九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Spark 初體驗

Spark 初體驗

概覽

Spark 是什么? 官方描述如下:

Apache Spark is a fast and general-purpose cluster computing system

我覺得下面這張圖描述的很好,展示了Spark 的組成,我暫時用到的局限于SparkSQL

DataFrame,DataSet,RDD 這些是Spark 的核心概念,網(wǎng)上資料很多可以去找找。

由于Spark2.0 有很大的變化,在開始之前有必要了解下Spark2.0 的變化,尋找資料的時候看到不同的寫法不至于不知道為什么.具體參考 Spark Release 2.0.0 對只用到SparkSQL 的來說,主要了解下面這幾個變化:

  • ifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface.
  • arkSession: new entry point that replaces the old SQLContext and HiveContext for DataFrame and Dataset APIs. SQLContext and HiveContext are kept for backward compatibility.
  • new, streamlined configuration API for SparkSession
  • mpler, more performant accumulator API
  • new, improved Aggregator API for typed aggregation in Datasets

開發(fā)環(huán)境

操作系統(tǒng)Ubuntu14.04 選擇Scala-2.11 ,Spark-2.01 ,IDE : IntelliJ IDEA 本地環(huán)境具體搭建步驟

  1. 安裝Scala

  2. 下載Spark Spark下載

  3. IDE 有了上面的環(huán)境 還需要一個IDE, IntelliJ IDEA 需要安裝 Scala 插件 使用 sbt 來管理依賴和打包代碼 在國內(nèi)sbt 實在太慢 暫時發(fā)現(xiàn)有人 Repox公服 可以試一下。

最終是要打包代碼到EMR 或者其他集群上跑的(暫時開發(fā)在本地)。為了打包的時候打包所有依賴,使用 sbt-assembly

好了有了上面的環(huán)境下面就可以寫代碼了。 在 build.sbt 文件添加依賴

libraryDependencies ++= Seq(  "org.apache.spark" %% "spark-core" % "2.0.1",  "org.apache.spark" %% "spark-sql" % "2.0.1",)

可以寫個簡單的測試

object Parser {  def main(args: Array[String]) {    val sourcePath = AppConfig.sourcePath    val spark = SparkSession.builder()      .master(AppConfig.sparkMaster)      .appName(AppConfig.sparkAppName)      .getOrCreate()    // todo what you want todo    val testDf = spark.read.json("test.json")    testDf.show()    spark.stop()  }}

在命令行 sbt clean assembly 可以打包一個 *.jar 文件,然后通過 spark-submmit 來運行。 這個過程中可能會遇到一些問題。比如依賴之間的 沖突,有了sbt-assembly 可以在built.sbt 文件添加如下:

assemblyMergeStrategy in assembly := {    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last    case PathList("javax", "inject", xs @ _*) => MergeStrategy.last    case PathList("org", "apache", xs @ _*) => MergeStrategy.last    case PathList("org", "glassfish", xs @ _*) => MergeStrategy.last    case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last    case PathList("com", "google", xs @ _*) => MergeStrategy.last    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last    case "about.html" => MergeStrategy.rename    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last    case "META-INF/mailcap" => MergeStrategy.last    case "META-INF/mimetypes.default" => MergeStrategy.last    case "plugin.properties" => MergeStrategy.last    case "log4j.properties" => MergeStrategy.last    case x =>        val oldStrategy = (assemblyMergeStrategy in assembly).value        oldStrategy(x)}

具體的選項看自己需要,可以增加可以刪減。這個的目的是發(fā)生沖突的時候就選擇最新版本。

API

工作完成的內(nèi)容本來不是很復(fù)雜,但是很多東西也不是簡單Copy 就能解決的,多讀文檔有的時候就能有很多想法。下面記錄幾個操作。 主要是Dataset 的API, 因為工作主要的目的還只是數(shù)據(jù)的ETL 。RDD 的操作被分為兩種 transformation 和action 。transformation是lazy的意思是Spark 不會理解去執(zhí)行,只會遇到action 的時候才會去執(zhí)行,這樣是Spark 比較高性能的一個原因。

對Dataset 中的Column 的操作

主要的工作是對Json 做解析,所以有時候需要對每個Json 產(chǎn)生很多字段,或者對已有的字段做修改。我使用比較多的是使用

def withColumn(colName: String, col: Column): DataFrame
def withColumnRenamed(existingName: String, newName: String): DataFrame

withColumn 可以修改一個Column 如果colName不存在會新增一個, 第二個參數(shù)我習(xí)慣使用 UDF 舉個例子 定義了一個function

def parseUrl(docId: String): String = {    val url = "http://fdfda=" + docId    url  }  val urlUdf = udf((docId: String) => parseUrl(docId))  testDF = testDf.withColumn("url", testDF("doc_id"))

上面這個例子是使用testDF 原有的Column ”doc_id“ 產(chǎn)生了一個新的叫url 其中testDF 是DataFrame。這中方式我不想大概只有Scala支持,因為UDF 應(yīng)該是使用了Scala 的隱士轉(zhuǎn)換特性。

使用map, flatMap, filter

其實都很相似,這里舉個flatMap 解析嵌套Json 的例子。網(wǎng)上有中方法解決是使用 explode 但是發(fā)現(xiàn)這個方法有很多局限,有篇 blog 就是使用 explode 他使用的例子:

{	"user": "gT35Hhhre9m",	"dates": ["2016-01-29", "2016-01-28"],	"status": "OK",	"reason": "some reason",	"content": [{		"foo": 123,		"bar": "val1"	}, {		"foo": 456,		"bar": "val2"	}, {		"foo": 789,		"bar": "val3"	}, {		"foo": 124,		"bar": "val4"	}, {		"foo": 126,		"bar": "val5"	}]}

explode 可以把content 拉平,但是當我想把content 某個元素取出來關(guān)聯(lián)上 user 產(chǎn)生一個新的DataFrame 這個DataFrame 有content 里面的每個dict 和user 這個字段組成一個Row .我的真實場景會比這個Json 稍微復(fù)雜一點,我需要把dict 的某個字段(字符串類型) 分詞然后和user 拼接組合起來成為一個Row 這時候就該用到flatMap 就感覺比較好實現(xiàn)。 大概的實現(xiàn)框架如下:

def parseLitigant(row: Row): Array[Row] = {    var relateInfo = row.getAs[mutable.WrappedArray[GenericRowWithSchema]]("relateInfo")    relateInfo = relateInfo.filter((row: GenericRowWithSchema) => row.getAs[String]("key") == "appellor")    if (relateInfo.length!=0){      val docId = row.getAs[String]("doc_id")      var appellor = relateInfo(0).getAs[String]("value")      var appellorList = appellor.split(",")      for (name <- appellorList) yield Row(docId, name, "")    }    else{      Array[Row]()    }  }var testRDD = testDF.rdd.flatMap(row => parseLitigant(row))var newDF = spark.createDataFrame(testRDD, schema)

flatMap的結(jié)果是RDD ,最后一行需要轉(zhuǎn)換成DataFrame ,其中第二個參數(shù) schema 是自字定義的 StructType StructType 就是告訴Spark 最后產(chǎn)生數(shù)據(jù)的結(jié)構(gòu)。通過如果讀入的Json 固定,就最好指定schema 這樣會減少Spark 掃描的時間。

數(shù)據(jù)存儲到PostgreSQL

build.sbt 中添驅(qū)動

libraryDependencies ++= Seq(  "org.apache.spark" %% "spark-core" % "2.0.1",  "org.apache.spark" %% "spark-sql" % "2.0.1",  "org.postgresql" % "postgresql" % "9.4.1211.jre7")
def getJdbcProperties(): java.util.Properties = {    val prop = new java.util.Properties    prop.setProperty("user", jdbcUser)    prop.setProperty("password", jdbcPassword)    prop.setProperty("driver", jdbcDriver)    prop  } testDF.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, table, prop)

可以先在數(shù)據(jù)庫建表,也可以不建。如果你需要一個自增的Id,就可以先建表,當然字段需要一樣,不建Spark 會直接把所有寫入數(shù)據(jù)庫。

問題

Scala 應(yīng)該也不是使用Spark 的門檻,了解基礎(chǔ)語法,基本就可以看文檔,完成簡單統(tǒng)計絕對不是問題。 但是要深入的使用,就一定要能看懂Scala ,并深入研究Spark 的API 。Spark 的API 和豐富,但是有時候可以通過實現(xiàn)繼承Spark的接口來修改,比如Spark 導(dǎo)出成Json 的時候,沒個Row 導(dǎo)出成一個文件,且文件名是 Row 中的內(nèi)容。

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
談?wù)凴DD、DataFrame、Dataset的區(qū)別和各自的優(yōu)勢
【Spark 2.0系列】: Spark Session API和Dataset API
SparkSession簡單介紹
Spark計算引擎之SparkSQL詳解
深入理解XGBoost:分布式實現(xiàn)
大數(shù)據(jù)開發(fā)技術(shù)之Spark SQL的多種使用方法
更多類似文章 >>
生活服務(wù)
熱點新聞
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服