Spark 是什么? 官方描述如下:
Apache Spark is a fast and general-purpose cluster computing system
我覺得下面這張圖描述的很好,展示了Spark 的組成,我暫時用到的局限于SparkSQL
DataFrame,DataSet,RDD 這些是Spark 的核心概念,網(wǎng)上資料很多可以去找找。
由于Spark2.0 有很大的變化,在開始之前有必要了解下Spark2.0 的變化,尋找資料的時候看到不同的寫法不至于不知道為什么.具體參考 Spark Release 2.0.0 對只用到SparkSQL 的來說,主要了解下面這幾個變化:
操作系統(tǒng)Ubuntu14.04 選擇Scala-2.11 ,Spark-2.01 ,IDE : IntelliJ IDEA 本地環(huán)境具體搭建步驟
安裝Scala
下載Spark Spark下載
IDE 有了上面的環(huán)境 還需要一個IDE, IntelliJ IDEA 需要安裝 Scala 插件 使用 sbt 來管理依賴和打包代碼 在國內(nèi)sbt 實在太慢 暫時發(fā)現(xiàn)有人 Repox公服 可以試一下。
最終是要打包代碼到EMR 或者其他集群上跑的(暫時開發(fā)在本地)。為了打包的時候打包所有依賴,使用 sbt-assembly
好了有了上面的環(huán)境下面就可以寫代碼了。 在 build.sbt
文件添加依賴
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-sql" % "2.0.1",)
可以寫個簡單的測試
object Parser { def main(args: Array[String]) { val sourcePath = AppConfig.sourcePath val spark = SparkSession.builder() .master(AppConfig.sparkMaster) .appName(AppConfig.sparkAppName) .getOrCreate() // todo what you want todo val testDf = spark.read.json("test.json") testDf.show() spark.stop() }}
在命令行 sbt clean assembly
可以打包一個 *.jar
文件,然后通過 spark-submmit
來運行。 這個過程中可能會遇到一些問題。比如依賴之間的 沖突,有了sbt-assembly 可以在built.sbt 文件添加如下:
assemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last case PathList("javax", "activation", xs @ _*) => MergeStrategy.last case PathList("javax", "inject", xs @ _*) => MergeStrategy.last case PathList("org", "apache", xs @ _*) => MergeStrategy.last case PathList("org", "glassfish", xs @ _*) => MergeStrategy.last case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last case PathList("com", "google", xs @ _*) => MergeStrategy.last case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last case PathList("com", "codahale", xs @ _*) => MergeStrategy.last case PathList("com", "yammer", xs @ _*) => MergeStrategy.last case "about.html" => MergeStrategy.rename case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last case "META-INF/mailcap" => MergeStrategy.last case "META-INF/mimetypes.default" => MergeStrategy.last case "plugin.properties" => MergeStrategy.last case "log4j.properties" => MergeStrategy.last case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x)}
具體的選項看自己需要,可以增加可以刪減。這個的目的是發(fā)生沖突的時候就選擇最新版本。
工作完成的內(nèi)容本來不是很復(fù)雜,但是很多東西也不是簡單Copy 就能解決的,多讀文檔有的時候就能有很多想法。下面記錄幾個操作。 主要是Dataset 的API, 因為工作主要的目的還只是數(shù)據(jù)的ETL 。RDD 的操作被分為兩種 transformation 和action 。transformation是lazy的意思是Spark 不會理解去執(zhí)行,只會遇到action 的時候才會去執(zhí)行,這樣是Spark 比較高性能的一個原因。
主要的工作是對Json 做解析,所以有時候需要對每個Json 產(chǎn)生很多字段,或者對已有的字段做修改。我使用比較多的是使用
def withColumn(colName: String, col: Column): DataFrame
def withColumnRenamed(existingName: String, newName: String): DataFrame
withColumn
可以修改一個Column 如果colName不存在會新增一個, 第二個參數(shù)我習(xí)慣使用 UDF 舉個例子 定義了一個function
def parseUrl(docId: String): String = { val url = "http://fdfda=" + docId url } val urlUdf = udf((docId: String) => parseUrl(docId)) testDF = testDf.withColumn("url", testDF("doc_id"))
上面這個例子是使用testDF 原有的Column ”doc_id“ 產(chǎn)生了一個新的叫url 其中testDF 是DataFrame。這中方式我不想大概只有Scala支持,因為UDF 應(yīng)該是使用了Scala 的隱士轉(zhuǎn)換特性。
其實都很相似,這里舉個flatMap 解析嵌套Json 的例子。網(wǎng)上有中方法解決是使用 explode
但是發(fā)現(xiàn)這個方法有很多局限,有篇 blog 就是使用 explode
他使用的例子:
{ "user": "gT35Hhhre9m", "dates": ["2016-01-29", "2016-01-28"], "status": "OK", "reason": "some reason", "content": [{ "foo": 123, "bar": "val1" }, { "foo": 456, "bar": "val2" }, { "foo": 789, "bar": "val3" }, { "foo": 124, "bar": "val4" }, { "foo": 126, "bar": "val5" }]}
explode 可以把content 拉平,但是當我想把content 某個元素取出來關(guān)聯(lián)上 user
產(chǎn)生一個新的DataFrame 這個DataFrame 有content 里面的每個dict 和user 這個字段組成一個Row .我的真實場景會比這個Json 稍微復(fù)雜一點,我需要把dict 的某個字段(字符串類型) 分詞然后和user 拼接組合起來成為一個Row 這時候就該用到flatMap 就感覺比較好實現(xiàn)。 大概的實現(xiàn)框架如下:
def parseLitigant(row: Row): Array[Row] = { var relateInfo = row.getAs[mutable.WrappedArray[GenericRowWithSchema]]("relateInfo") relateInfo = relateInfo.filter((row: GenericRowWithSchema) => row.getAs[String]("key") == "appellor") if (relateInfo.length!=0){ val docId = row.getAs[String]("doc_id") var appellor = relateInfo(0).getAs[String]("value") var appellorList = appellor.split(",") for (name <- appellorList) yield Row(docId, name, "") } else{ Array[Row]() } }var testRDD = testDF.rdd.flatMap(row => parseLitigant(row))var newDF = spark.createDataFrame(testRDD, schema)
flatMap的結(jié)果是RDD ,最后一行需要轉(zhuǎn)換成DataFrame ,其中第二個參數(shù) schema
是自字定義的 StructType
StructType
就是告訴Spark 最后產(chǎn)生數(shù)據(jù)的結(jié)構(gòu)。通過如果讀入的Json 固定,就最好指定schema 這樣會減少Spark 掃描的時間。
在 build.sbt
中添驅(qū)動
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-sql" % "2.0.1", "org.postgresql" % "postgresql" % "9.4.1211.jre7")
def getJdbcProperties(): java.util.Properties = { val prop = new java.util.Properties prop.setProperty("user", jdbcUser) prop.setProperty("password", jdbcPassword) prop.setProperty("driver", jdbcDriver) prop } testDF.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, table, prop)
可以先在數(shù)據(jù)庫建表,也可以不建。如果你需要一個自增的Id,就可以先建表,當然字段需要一樣,不建Spark 會直接把所有寫入數(shù)據(jù)庫。
Scala 應(yīng)該也不是使用Spark 的門檻,了解基礎(chǔ)語法,基本就可以看文檔,完成簡單統(tǒng)計絕對不是問題。 但是要深入的使用,就一定要能看懂Scala ,并深入研究Spark 的API 。Spark 的API 和豐富,但是有時候可以通過實現(xiàn)繼承Spark的接口來修改,比如Spark 導(dǎo)出成Json 的時候,沒個Row 導(dǎo)出成一個文件,且文件名是 Row 中的內(nèi)容。
聯(lián)系客服