Spark Learning Examples (Python): Loading Data Sources (Load Data Source)

We mainly use Spark to process large volumes of data quickly, so which data sources actually come up in development and production? Grouped by category, my summary is:

  • text
  • csv
  • json
  • parquet
  • jdbc
  • hive
  • kafka
  • elasticsearch

接下來(lái)所有的測(cè)試是基于spark local模式,因?yàn)閘ocal模式便于測(cè)試不依賴spark集群環(huán)境。有一點(diǎn)要注意將代碼運(yùn)行在spark集群上時(shí)要將.master("local[*]")這行去掉,同時(shí)需要修改相應(yīng)的路徑名才能訪問(wèn)本地機(jī)器文件,以/tmp/people.txt文件為例:

Local mode: /tmp/people.txt

Cluster mode: file:///tmp/people.txt (equivalent to /tmp/people.txt in local mode)

Cluster mode: hdfs://master:8020/tmp/people.txt (a file on the distributed file system)
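For example, here is a minimal sketch of what a cluster-side read of that HDFS path could look like (the appName loadHdfsData is illustrative and master:8020 is the namenode address from the path above; note that the .master("local[*]") line is gone):

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadHdfsData") \
    .getOrCreate()

# In cluster mode, read from HDFS (or prefix local files with file://)
lines = spark.sparkContext.textFile("hdfs://master:8020/tmp/people.txt")
print(lines.count())
spark.stop()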

在學(xué)習(xí)各種數(shù)據(jù)來(lái)源前先了解一種最基本的數(shù)據(jù)源,那就是數(shù)據(jù)集,也就是我們根據(jù)自身開(kāi)發(fā)需求制造出來(lái)的數(shù)據(jù),常常用在開(kāi)發(fā)和測(cè)試一些簡(jiǎn)單功能上面。

開(kāi)始編寫代碼制造數(shù)據(jù)集并形成dataframe顯示出來(lái)

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadDatas") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    datas = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
    df = spark.createDataFrame(datas, ['name', 'age'])
    df.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
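Note that .enableHiveSupport() is not actually required for this in-memory example; it only matters when reading or writing Hive tables, as in the hive section further down.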

text

The data source people.txt contains:

Jack 27
Rose 24
Andy 32

Write code to load people.txt and query it via SQL:

from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadTextData") \
        .master("local[*]") \
        .getOrCreate()
    lines = spark.sparkContext.textFile("/home/llh/data/people.txt")
    parts = lines.map(lambda line: line.split(" "))
    people = parts.map(lambda p: Row(name=p[0], age=p[1]))
    peopledf = spark.createDataFrame(people)
    peopledf.show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # | 24|Rose|
    # | 32|Andy|
    # +---+----+
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()

csv

The data source people.csv contains:

Jack,27
Rose,24
Andy,32

Write code to load the CSV data and display it:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadCsvData") \
        .master("local[*]") \
        .getOrCreate()
    # Approach 1: build the header with an explicit schema, another form of what the text example did
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    peopledf = spark.read.csv("/home/llh/data/people.csv", schema=schema)
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    # Approach 2: this one does not use Spark at all
    data = pd.read_csv("/home/llh/data/people.csv", names=['name', 'age'])
    print(data.head())
    #    name  age
    # 0  Jack   27
    # 1  Rose   24
    # 2  Andy   32
    spark.stop()

json

The data source people.json contains:

{"name":"Jack", "age":27}
{"name":"Rose", "age":24}
{"name":"Andy"}

Write code to load the JSON data and explore it through the DataFrame API:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJsonData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.json("/home/llh/data/people.json")
    peopledf.show()
    # +----+----+
    # | age|name|
    # +----+----+
    # |  27|Jack|
    # |  24|Rose|
    # |null|Andy|
    # +----+----+
    peopledf.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)
    peopledf.select('name').show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # |Andy|
    # +----+
    peopledf.select(peopledf['name'], peopledf['age'] + 1).show()
    # +----+---------+
    # |name|(age + 1)|
    # +----+---------+
    # |Jack|       28|
    # |Rose|       25|
    # |Andy|     null|
    # +----+---------+
    peopledf.filter(peopledf['age'] > 25).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # +---+----+
    peopledf.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |null|    1|
    # |  27|    1|
    # |  24|    1|
    # +----+-----+
    spark.stop()

parquet

Data in this format is usually stored on HDFS; opening it in an ordinary editor just shows binary garbage.

Write code to load the Parquet data and display it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadParquetData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.parquet("/home/llh/data/people.parquet")
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
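Since a Parquet file cannot be written by hand in an editor, one simple way to produce the people.parquet test file is to write an existing DataFrame out in Parquet format; a minimal sketch (the path matches the load example above):

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("writeParquetData") \
    .master("local[*]") \
    .getOrCreate()
df = spark.createDataFrame([('Jack', 27), ('Rose', 24), ('Andy', 32)], ['name', 'age'])
# Write the DataFrame in Parquet format so the load example above has something to read
df.write.mode("overwrite").parquet("/home/llh/data/people.parquet")
spark.stop()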

jdbc

JDBC covers MySQL, Oracle, TiDB and so on. Here we take MySQL as an example; the database is test and the table is people.

Write code to load the MySQL table and display it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJdbcData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/test") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "(select * from people) tmp") \
        .option("user", "root") \
        .option("password", "1") \
        .load()
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()

At runtime you may get an error that the MySQL driver cannot be found: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver. The fix is to download the MySQL driver and drop its jar into pyspark's jars directory, which by default is /usr/local/lib/python3.7/site-packages/pyspark/jars/.
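Alternatively, instead of copying the jar into pyspark's jars directory, the driver jar can be handed to the session when it is built. A sketch, assuming the connector jar lives at /home/llh/jars/mysql-connector-java.jar (an illustrative path):

from pyspark.sql import SparkSession

# Pass the MySQL connector jar via spark.jars instead of copying it into pyspark/jars/
spark = SparkSession \
    .builder \
    .appName("loadJdbcData") \
    .master("local[*]") \
    .config("spark.jars", "/home/llh/jars/mysql-connector-java.jar") \
    .getOrCreate()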

hive

Hive stores its data files with a special field delimiter, "^A". Spark is usually configured with the Hive metastore information, so it can read Hive tables directly.
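For reference, Hive's default field delimiter is the control character \x01 (displayed as ^A), so a small people.hive test file can be generated with plain Python; a sketch using the path from the example below:

# Write /home/llh/data/people.hive using Hive's default \x01 field delimiter
rows = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
with open('/home/llh/data/people.hive', 'w') as f:
    for name, age in rows:
        f.write('{}\x01{}\n'.format(name, age))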

Write code to load people.hive into the people table and display it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadHiveData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql("create table if not exists people (name string, age int) using hive")
    spark.sql("load data local inpath '/home/llh/data/people.hive' into table people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()

kafka

Kafka combined with Spark is typically used in real-time projects, i.e. Spark Streaming, which will be covered separately later.
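For completeness, here is a minimal Structured Streaming read sketch; the broker address localhost:9092 and topic name people are placeholders, and the spark-sql-kafka connector package must be available on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadKafkaData") \
    .master("local[*]") \
    .getOrCreate()

# Subscribe to a Kafka topic as a streaming DataFrame (requires the spark-sql-kafka package)
kafkadf = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "people") \
    .load()

# Kafka records arrive as binary key/value columns; cast the value and print it to the console
query = kafkadf.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
query.awaitTermination()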

elasticsearch

Elasticsearch is handled much like MySQL and the other databases.

Write code to load the data from Elasticsearch and display it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadEsData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    peopledf = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", 9200) \
        .option("es.resource", "people/data") \
        .load()
    peopledf.registerTempTable("people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
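As with the JDBC example, this read needs the elasticsearch-hadoop (elasticsearch-spark) connector on the classpath, for instance by dropping its jar into pyspark's jars directory or passing it via spark.jars; the exact artifact to use depends on your Spark and Elasticsearch versions.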

Those are the most commonly used data sources. There are of course others, such as HBase, Phoenix and so on; once you have mastered the ones above, adapting to the rest is not a big problem.


來(lái)源:https://www.icode9.com/content-1-384101.html