Spark SQL支持多種數(shù)據(jù)源,如JDBC、HDFS、HBase。它的內(nèi)部組件,如SQL的語法解析器、分析器等支持重定義進行擴展,能更好的滿足不同的業(yè)務(wù)場景。與Spark Core無縫集成,提供了DataSet/DataFrame的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。
DataSet/DataFrame
DataSet/DataFrame都是Spark SQL提供的分布式數(shù)據(jù)集,相對于RDD而言,除了記錄數(shù)據(jù)以外,還記錄表的schema信息。
DataFrame是DataSet以命名列方式組織的分布式數(shù)據(jù)集,類似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame變成類型為Row的Dataset:
type DataFrame = Dataset[Row]。
DataFrame在編譯期不進行數(shù)據(jù)中字段的類型檢查,在運行期進行檢查。但DataSet則與之相反,因為它是強類型的。此外,二者都是使用catalyst進行sql的解析和優(yōu)化。為了方便,以下統(tǒng)一使用DataSet統(tǒng)稱。
DataSet創(chuàng)建
DataSet通常通過加載外部數(shù)據(jù)或通過RDD轉(zhuǎn)化創(chuàng)建。
1.加載外部數(shù)據(jù)
以加載json和mysql為例:
..() ..() .(( , , , , )).()
2.RDD轉(zhuǎn)換為DataSet
通過RDD轉(zhuǎn)化創(chuàng)建DataSet,關(guān)鍵在于為RDD指定schema,通常有兩種方式(偽代碼):
) .().(.()) ) (:, :, :) ) .( (()., (), ().)) ) . (.().( (, , ))) .(((),())) .(,)
操作DataSet的兩種風(fēng)格語法
DSL語法
1.查詢DataSet部分列中的內(nèi)容
personDS.select(col("name"))
personDS.select(col("name"), col("age"))
2.查詢所有的name和age和salary,并將salary加1000
personDS.select(col("name"), col("age"), col("salary") + 1000)
personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)
3.過濾age大于18的
personDS.filter(col("age") > 18)
4.按年齡進行分組并統(tǒng)計相同年齡的人數(shù)
personDS.groupBy("age").count()
注意:直接使用col方法需要import org.apache.spark.sql.functions._
SQL語法
如果想使用SQL風(fēng)格的語法,需要將DataSet注冊成表
personDS.registerTempTable("person")
//查詢年齡最大的前兩名
val result = sparkSession.sql("select * from person order by age desc limit 2")
//保存結(jié)果為json文件。注意:如果不指定存儲格式,則默認(rèn)存儲為parquet
result.write.format("json").save("hdfs://ip:port/res2")
Spark SQL的幾種使用方式
1.sparksql-shell交互式查詢
就是利用Spark提供的shell命令行執(zhí)行SQL
2.編程
首先要獲取Spark SQL編程"入口":SparkSession(當(dāng)然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive則為HiveContext)。這里以讀取parquet為例:
val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();
val df = sparkSession.read.format("parquet").load("/路徑/parquet文件")
然后就可以針對df進行業(yè)務(wù)處理了。
3.Thriftserver
beeline客戶端連接操作
啟動spark-sql的thrift服務(wù),sbin/start-thriftserver.sh,啟動腳本中配置好Spark集群服務(wù)資源、地址等信息。然后通過beeline連接thrift服務(wù)進行數(shù)據(jù)處理。
hive-jdbc驅(qū)動包來訪問spark-sql的thrift服務(wù)
在項目pom文件中引入相關(guān)驅(qū)動包,跟訪問mysql等jdbc數(shù)據(jù)源類似。示例:
.() .(, , ); { .() .() (.()) { (.()) } } { : .() } { () .() }
Spark SQL 獲取Hive數(shù)據(jù)
Spark SQL讀取hive數(shù)據(jù)的關(guān)鍵在于將hive的元數(shù)據(jù)作為服務(wù)暴露給Spark。除了通過上面thriftserver jdbc連接hive的方式,也可以通過下面這種方式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下內(nèi)容:
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
</property>
然后,啟動hive metastore
最后,將hive-site.xml復(fù)制或者軟鏈到$SPARK_HOME/conf/。如果hive的元數(shù)據(jù)存儲在mysql中,那么需要將mysql的連接驅(qū)動jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,啟動spark-sql即可操作hive中的庫和表。而此時使用hive元數(shù)據(jù)獲取SparkSession的方式為:
val spark = SparkSession.builder()
.config(sparkConf).enableHiveSupport().getOrCreate()
UDF、UDAF、Aggregator
UDF
UDF是最基礎(chǔ)的用戶自定義函數(shù),以自定義一個求字符串長度的udf為例:
{(:) .} ..(,) ..() .() .()
UDAF
定義UDAF,需要繼承抽象類UserDefinedAggregateFunction,它是弱類型的,下面的aggregator是強類型的。以求平均數(shù)為例:
....{, } ..... ..... ..... { : ((, ) :: ) : { ((, ) :: (, ) :: ) } : : (: ): { () () } (: , : ): { (.()) { () .() .() () .() } } (: , : ): { () .() .() () .() .() } (: ): .(). .() } ..(, ) ..() .() .() .() .()
Aggregator
....{, , } ..... (: , : ) ( : , : ) [, , ] { : (, ) (: , : ): { . . . } (: , : ): { . . . . } (: ): .. . : [] . : [] . } ..().[] .() ..() .() .()
聯(lián)系客服