【版權(quán)聲明】博客內(nèi)容由廈門大學(xué)數(shù)據(jù)庫實(shí)驗(yàn)室擁有版權(quán)!
Spark SQL可以支持Parquet、JSON、Hive等數(shù)據(jù)源,并且可以通過JDBC連接外部數(shù)據(jù)源。前面的介紹中,我們已經(jīng)涉及到了JSON、文本格式的加載,這里不再贅述。這里介紹Parquet,下一節(jié)會(huì)介紹JDBC數(shù)據(jù)庫連接。
Parquet是一種流行的列式存儲(chǔ)格式,可以高效地存儲(chǔ)具有嵌套字段的記錄。Parquet是語言無關(guān)的,而且不與任何一種數(shù)據(jù)處理框架綁定在一起,適配多種語言和組件,能夠與Parquet配合的組件有:
* 查詢引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
* 計(jì)算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
* 數(shù)據(jù)模型: Avro, Thrift, Protocol Buffers, POJOs
Spark已經(jīng)為我們提供了parquet樣例數(shù)據(jù),就保存在“/usr/local/spark/examples/src/main/resources/”這個(gè)目錄下,有個(gè)users.parquet文件,這個(gè)文件格式比較特殊,如果你用vim編輯器打開,或者用cat命令查看文件內(nèi)容,肉眼是一堆亂七八糟的東西,是無法理解的。只有被加載到程序中以后,Spark會(huì)對這種格式進(jìn)行解析,然后我們才能理解其中的數(shù)據(jù)。
下面代碼演示了如何從parquet文件中加載數(shù)據(jù)生成DataFrame。
scala> import org.apache.spark.sql.SQLContextimport org.apache.spark.sql.SQLContext scala> val sqlContext = new SQLContext(sc)sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@35f18100 scala> val users = sqlContext.read.load("file:///usr/local/spark/examples/src/main/resources/users.parquet")users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string, favorite_numbers: array<int>] scala> users.registerTempTable("usersTempTab") scala> val usersRDD =sqlContext.sql("select * from usersTempTab").rddusersRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[13] at rdd at <console>:34 scala> usersRDD.foreach(t=>println("name:"+t(0)+" favorite color:"+t(1)))name:Alyssa favorite color:nullname:Ben favorite color:red
下面介紹如何將DataFrame保存成parquet文件。
進(jìn)入spark-shell執(zhí)行下面命令:
scala> import org.apache.spark.sql.SQLContextimport org.apache.spark.sql.SQLContext scala> val sqlContext = new SQLContext(sc)sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a65c40 scala> val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df.select("name","age").write.format("parquet").save("file:////usr/local/spark/examples/src/main/resources/newpeople.parquet")
上述過程執(zhí)行結(jié)束后,可以打開第二個(gè)終端窗口,在Shell命令提示符下查看新生成的newpeople.parquet:
cd /usr/local/spark/examples/src/main/resources/ls
上面命令執(zhí)行后,可以看到”/usr/local/spark/examples/src/main/resources/”這個(gè)目錄下多了一個(gè)newpeople.parquet,不過,注意,這不是一個(gè)文件,而是一個(gè)目錄(不要被newpeople.parquet中的圓點(diǎn)所迷惑,文件夾名稱也可以包含圓點(diǎn)),也就是說,df.select(“name”,”age”).write.format(“parquet”).save()括號(hào)里面的參數(shù)是文件夾,不是文件名。下面我們可以進(jìn)入newpeople.parquet目錄,會(huì)發(fā)現(xiàn)下面4個(gè)文件:
_common_metadata _metadata part-r-00000-ad565c11-d91b-4de7-865b-ea17f8e91247.gz.parquet _SUCCESS
這4個(gè)文件都是剛才保存生成的?,F(xiàn)在問題來了,如果我們要再次把這個(gè)剛生成的數(shù)據(jù)又加載到DataFrame中,應(yīng)該加載哪個(gè)文件呢?很簡單,只要加載newpeople.parquet目錄即可,而不是加載這4個(gè)文件,語句如下:
val users = sqlContext.read.load("file:///usr/local/spark/examples/src/main/resources/newpeople.parquet")
聯(lián)系客服