Understanding Spark SQL (Part 3): Spark SQL Program Examples

As mentioned in the previous article, SQLContext and HiveContext are effectively obsolete in Spark 2.x; instead, SQL statements are executed through the sql function of a SparkSession object. Before executing a SQL statement with this function, you need to call createOrReplaceTempView on a DataFrame to register a temporary view, so the key step is to first convert an RDD into a DataFrame. In fact, Spark declares:

type DataFrame = Dataset[Row]

So DataFrame is simply an alias for Dataset[Row]. RDDs provide the low-level API, while DataFrame/Dataset provide the high-level API, which is well suited to structured data and to scenarios such as SQL.
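
To make that difference concrete, here is a minimal sketch (not from the original examples: the Person case class and the /tmp/people.txt path are made up for illustration) that performs the same filter once with the low-level RDD API and once with the high-level Dataset API:

import org.apache.spark.sql.SparkSession

object ApiLevels {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ApiLevels").getOrCreate()
    import spark.implicits._

    // Low-level RDD API: parsing and filtering are spelled out field by field
    val rdd = spark.sparkContext
      .textFile("/tmp/people.txt")   // hypothetical tab-separated input
      .map(_.split("\t"))
      .map(p => Person(p(0), p(1).trim.toInt))
    rdd.filter(_.age > 30).collect.foreach(println)

    // High-level Dataset API: declarative operations over named columns
    val ds = rdd.toDS()
    ds.filter($"age" > 30).show()

    spark.stop()
  }
}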

Below are some examples of Spark SQL programs.

Example 1: SparkSQLExam.scala

package bruce.bigdata.spark.example

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkSQLExam {

    case class offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

    def main(args: Array[String]) {

        val spark = SparkSession
          .builder
          .appName("SparkSQLExam")
          .getOrCreate()

        runSparkSQLExam1(spark)
        runSparkSQLExam2(spark)

        spark.stop()
    }

    // Build the DataFrame from an RDD of case-class instances;
    // the schema is inferred from the case class via reflection
    private def runSparkSQLExam1(spark: SparkSession): Unit = {

        import spark.implicits._

        val rddOffices = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
          .map(_.split("\t"))
          .map(p => offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
        val officesDataFrame = spark.createDataFrame(rddOffices)

        officesDataFrame.createOrReplaceTempView("offices")
        spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)
    }

    // Build the DataFrame from an RDD[Row] plus an explicitly constructed schema
    private def runSparkSQLExam2(spark: SparkSession): Unit = {

        import spark.implicits._
        import org.apache.spark.sql._
        import org.apache.spark.sql.types._

        val schema = new StructType(Array(
          StructField("office", IntegerType, false),
          StructField("city", StringType, false),
          StructField("region", StringType, false),
          StructField("mgr", IntegerType, true),
          StructField("target", DoubleType, true),
          StructField("sales", DoubleType, false)))
        val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
          .map(_.split("\t"))
          .map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
        val dataFrame = spark.createDataFrame(rowRDD, schema)

        dataFrame.createOrReplaceTempView("offices2")
        spark.sql("select city from offices2 where region='Western'").map(t => "City: " + t(0)).collect.foreach(println)
    }
}
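
As an aside: because spark.implicits._ is imported, the DataFrame in runSparkSQLExam1 could equally be built with toDF() instead of spark.createDataFrame. A minimal standalone sketch of that variant (ToDFVariant is a made-up object name; the input path and case class are the same as above):

import org.apache.spark.sql.SparkSession

object ToDFVariant {
  case class offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ToDFVariant").getOrCreate()
    import spark.implicits._

    // toDF() comes from spark.implicits._ and infers the schema from
    // the case class, just like spark.createDataFrame(rdd) does
    val officesDataFrame = spark.sparkContext
      .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split("\t"))
      .map(p => offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
      .toDF()

    officesDataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").show()
    spark.stop()
  }
}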

Compile SparkSQLExam.scala with the following command:

[root@BruceCentOS4 scala]# scalac SparkSQLExam.scala

Note that before compiling, the following paths need to be added to the CLASSPATH:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

Then package it into a jar file:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

Then submit the program to the YARN cluster for execution via spark-submit. To make it convenient to view the results from the client, it is run here in yarn client mode.

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar
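
If the output does not need to be watched from the client, the same jar can also be submitted in yarn cluster mode, in which the driver itself runs inside the cluster and its stdout ends up in the YARN container logs. Only the --deploy-mode flag changes:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode cluster spark_exam_scala.jar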

Screenshot of the run results:

Example 2: SparkHiveExam.scala (requires the Hive metastore to be running)

package bruce.bigdata.spark.example

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkHiveExam {

    def main(args: Array[String]) {

        val spark = SparkSession
          .builder()
          .appName("Spark Hive Exam")
          .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
          .enableHiveSupport()
          .getOrCreate()

        import spark.implicits._

        // Browse Hive data with HQL
        spark.sql("show databases").collect.foreach(println)
        spark.sql("use orderdb")
        spark.sql("show tables").collect.foreach(println)
        spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

        // Save the result of an HQL query into a newly created Hive table:
        // find the products whose order amount exceeds 10,000 dollars
        spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
                   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
        spark.sql("""select mfr_id,product_id,description
                   from products a inner join orders b
                   on a.mfr_id=b.mfr and a.product_id=b.product
                   where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

        // Load HDFS file data into a Hive table
        spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double)
                   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
        spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2")

        spark.stop()
    }
}
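
After the program has run, it can be worth double-checking that saveAsTable really created and populated the new table. A minimal sketch of such a check, run for example from spark-shell (which predefines spark; with Hive support enabled it talks to the same metastore):

// Confirm the table exists and inspect a few rows
spark.sql("show tables").collect.foreach(println)
spark.sql("select * from products_high_sales limit 10").show()
println("rows: " + spark.table("products_high_sales").count())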

Compile SparkHiveExam.scala with the following command:

[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

Package it with the following command:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

Run it with the following command:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

Result of the run:

In addition, after this program runs, Hive contains two new tables: products_high_sales and offices2.

Example 3: spark_sql_exam.py

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.types import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL exam") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    # Explicit schema for the tab-separated offices file
    schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
        StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
        StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

    rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt") \
        .map(lambda p: p.split("\t")) \
        .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))

    dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").show()

    spark.stop()

Run the program with the following command:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

Result of the run:

Example 4: JavaSparkSQLExam.java

package bruce.bigdata.spark.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.AnalysisException;


public class JavaSparkSQLExam {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
          .builder()
          .appName("Java Spark SQL exam")
          .config("spark.some.config.option", "some-value")
          .getOrCreate();

        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<String> officesRDD = spark.sparkContext()
          .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
          .toJavaRDD();

        JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
          String[] attributes = record.split("\t");
          return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2],
              Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()),
              Double.valueOf(attributes[5].trim()));
        });

        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

        dataFrame.createOrReplaceTempView("offices");
        Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
        results.collectAsList().forEach(r -> System.out.println(r));

        spark.stop();
    }
}

After compiling and packaging, run it with the following command:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

Result of the run:

The above are some examples of Spark SQL programs, written in Scala, Python, and Java respectively. Besides these three languages, Spark also supports writing programs in R, but since I am not familiar with it myself, no example is given here. Whichever language is used, the APIs are essentially the same: the high-level DataFrame and Dataset APIs are used to invoke and execute SQL. With these APIs, structured data can easily be manipulated through SQL, and data in Hive can be worked with just as conveniently.
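
As a final illustration of the DataFrame = Dataset[Row] relationship, a DataFrame can be turned into a typed Dataset with as[T], after which fields are checked at compile time instead of being accessed by position. A minimal sketch reusing the offices schema from Example 1 (TypedDatasetExam is a made-up object name; it assumes the same tab-separated offices.txt):

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types._

object TypedDatasetExam {
  case class offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TypedDatasetExam").getOrCreate()
    import spark.implicits._

    val schema = StructType(Array(
      StructField("office", IntegerType, false), StructField("city", StringType, false),
      StructField("region", StringType, false), StructField("mgr", IntegerType, true),
      StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))

    // A DataFrame is just Dataset[Row]
    val df: Dataset[Row] = spark.read.option("sep", "\t").schema(schema)
      .csv("/user/hive/warehouse/orderdb.db/offices/offices.txt")

    // as[offices] yields a typed Dataset whose fields are compile-time checked
    val ds: Dataset[offices] = df.as[offices]
    ds.filter(_.region == "Eastern").map(_.city).show()

    spark.stop()
  }
}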

