我們?cè)谑褂肧park的時(shí)候主要是用來(lái)快速處理大批量的數(shù)據(jù),那么實(shí)際開(kāi)發(fā)和生產(chǎn)中會(huì)有哪些數(shù)據(jù)來(lái)源呢,我歸類總結(jié)有:
All the tests that follow run in Spark local mode, because local mode is convenient for testing and does not depend on a Spark cluster environment. One thing to note: when running the code on a Spark cluster, remove the .master("local[*]") line, and change the paths accordingly so that files on the local machine can still be reached. Taking the file /tmp/people.txt as an example:
local mode:   /tmp/people.txt
cluster mode: file:///tmp/people.txt (a local file, equivalent to /tmp/people.txt in local mode)
              hdfs://master:8020/tmp/people.txt (a file on the distributed file system)
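A minimal sketch of what that looks like in code (the hdfs:// host and port follow the example above and are assumptions about your cluster):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pathDemo").master("local[*]").getOrCreate()
# local mode: a bare path resolves against the local file system
df1 = spark.read.text("/tmp/people.txt")
# cluster mode: name the file system explicitly
df2 = spark.read.text("file:///tmp/people.txt")             # local file on the node
df3 = spark.read.text("hdfs://master:8020/tmp/people.txt")  # file on HDFS
spark.stop()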
Before going through the various data sources, let's look at the most basic one: an in-memory dataset, i.e. data we make up ourselves to match development needs, commonly used for developing and testing simple functionality.
Let's write code that builds such a dataset, turns it into a DataFrame, and displays it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadDatas") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    datas = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
    df = spark.createDataFrame(datas, ['name', 'age'])
    df.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
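As an aside, since pandas appears again in the CSV example below: spark.createDataFrame also accepts a pandas DataFrame directly. A quick sketch with the same data:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loadPandasData").master("local[*]").getOrCreate()
# build the same three rows as a pandas DataFrame, then hand it to Spark
pdf = pd.DataFrame({'name': ['Jack', 'Rose', 'Andy'], 'age': [27, 24, 32]})
df = spark.createDataFrame(pdf)
df.show()
spark.stop()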
The data source people.txt contains:
Jack 27
Rose 24
Andy 32
Write code to load people.txt and query it through SQL:
from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadTextData") \
        .master("local[*]") \
        .getOrCreate()
    lines = spark.sparkContext.textFile("/home/llh/data/people.txt")
    parts = lines.map(lambda line: line.split(" "))
    # cast age to int so the SQL comparison below works on numbers
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    peopledf = spark.createDataFrame(people)
    peopledf.show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # | 24|Rose|
    # | 32|Andy|
    # +---+----+
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
The data source people.csv contains:
Jack,27
Rose,24
Andy,32
Write code to load the CSV data and display it:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadCsvData") \
        .master("local[*]") \
        .getOrCreate()
    # Option 1: supply an explicit schema, another way of building the header the Text example created by hand
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    peopledf = spark.read.csv("/home/llh/data/people.csv", schema=schema)
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    # Option 2: plain pandas, no Spark involved at all
    data = pd.read_csv("/home/llh/data/people.csv", names=['name', 'age'])
    print(data.head())
    #    name  age
    # 0  Jack   27
    # 1  Rose   24
    # 2  Andy   32
    spark.stop()
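If you would rather not spell out the schema by hand, Spark can also infer the column types from the data. A minimal sketch against the same file (the appName is arbitrary; since people.csv has no header row, the column names are supplied with toDF):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loadCsvInfer").master("local[*]").getOrCreate()
# inferSchema samples the data to pick types; toDF supplies the missing column names
peopledf2 = spark.read.csv("/home/llh/data/people.csv", inferSchema=True).toDF("name", "age")
peopledf2.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
spark.stop()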
The data source people.json contains:
{"name":"Jack", "age":27}
{"name":"Rose", "age":24}
{"name":"Andy"}
Write code to load the JSON data and explore it through the DataFrame API:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJsonData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.json("/home/llh/data/people.json")
    peopledf.show()
    # +----+----+
    # | age|name|
    # +----+----+
    # |  27|Jack|
    # |  24|Rose|
    # |null|Andy|
    # +----+----+
    peopledf.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)
    peopledf.select('name').show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # |Andy|
    # +----+
    peopledf.select(peopledf['name'], peopledf['age'] + 1).show()
    # +----+---------+
    # |name|(age + 1)|
    # +----+---------+
    # |Jack|       28|
    # |Rose|       25|
    # |Andy|     null|
    # +----+---------+
    peopledf.filter(peopledf['age'] > 25).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # +---+----+
    peopledf.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |null|    1|
    # |  27|    1|
    # |  24|    1|
    # +----+-----+
    spark.stop()
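Note that spark.read.json expects one JSON object per line, exactly the shape of people.json above. If a file instead holds a single pretty-printed object or array, the multiLine option is needed. A small sketch (people_array.json is a hypothetical file name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loadMultiLineJson").master("local[*]").getOrCreate()
# people_array.json is hypothetical: a single JSON array rather than one object per line
df = spark.read.option("multiLine", True).json("/home/llh/data/people_array.json")
df.show()
spark.stop()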
Parquet-format data is usually stored on HDFS, and opening it in an ordinary editor shows mostly gibberish.
Write code to load the Parquet data and display it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadParquetData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.parquet("/home/llh/data/people.parquet")
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
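If there is no people.parquet to hand yet, the simplest way to get one is to write a DataFrame out first. A small sketch reusing the in-memory dataset from the beginning of this article:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("writeParquetData").master("local[*]").getOrCreate()
df = spark.createDataFrame([('Jack', 27), ('Rose', 24), ('Andy', 32)], ['name', 'age'])
# mode("overwrite") replaces any previous output directory
df.write.mode("overwrite").parquet("/home/llh/data/people.parquet")
spark.stop()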
JDBC covers MySQL, Oracle, TiDB, and so on; here we take MySQL as the example, with database test and table people.
Write code to load data from the MySQL database and display it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJdbcData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/test") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "(select * from people) tmp") \
        .option("user", "root") \
        .option("password", "1") \
        .load()
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
At runtime you may hit a missing-MySQL-driver error: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver. The fix is to download a MySQL driver (the mysql-connector jar) and put it into the jars directory of the pyspark installation, by default /usr/local/lib/python3.7/site-packages/pyspark/jars/.
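Alternatively, instead of copying the jar into the pyspark installation, you can point the session at the driver jar when building it. A sketch, where the jar path and file name are assumptions based on wherever you saved the download:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadJdbcData") \
    .master("local[*]") \
    .config("spark.jars", "/home/llh/jars/mysql-connector-java-5.1.47.jar") \
    .getOrCreate()
# ... the spark.read.format("jdbc") code above then works unchanged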
Hive stores its data with a special field separator, "^A" (the control character \x01), and Spark is generally configured with the Hive database information, so it can read Hive databases directly.
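To make the "^A" separator concrete, here is a small sketch that generates the people.hive file used below, with \x01 between the fields:

# "^A" is the control character \x01, Hive's default field delimiter
rows = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
with open('/home/llh/data/people.hive', 'w') as f:
    for name, age in rows:
        f.write(name + '\x01' + str(age) + '\n')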
Write code to load people.hive into the people table and display it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadHiveData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql("create table if not exists people (name string, age int) using hive")
    spark.sql("load data local inpath '/home/llh/data/people.hive' into table people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
Kafka combined with Spark is commonly used in real-time projects, i.e. Spark Streaming, which I will cover separately later.
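Just to give a flavour before that write-up, here is a minimal Structured Streaming sketch; the broker address and topic name are assumptions, and the spark-sql-kafka connector package must be on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loadKafkaData").master("local[*]").getOrCreate()
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "people") \
    .load()
# key and value arrive as binary, so cast them to strings before use
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
query.awaitTermination()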
Elasticsearch works much like databases such as MySQL.
Write code to load the ES data and display it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadEsData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    peopledf = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", "9200") \
        .option("es.resource", "people/data") \
        .load()
    # createOrReplaceTempView replaces the deprecated registerTempTable
    peopledf.createOrReplaceTempView("people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
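Like the JDBC example, this read needs a connector on the classpath, here the elasticsearch-hadoop (elasticsearch-spark) jar. One way is to pull it in when building the session; the exact Maven coordinates depend on your Spark and ES versions and are an assumption here:

from pyspark.sql import SparkSession

# the package coordinates below are an assumption; match them to your Spark/Scala/ES versions
spark = SparkSession \
    .builder \
    .appName("loadEsData") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.2") \
    .getOrCreate()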
The above are the most common data sources. There are of course others, such as HBase, Phoenix, and so on; once you have mastered the ones above, extending to the rest should not be much of a problem.
Source: https://www.icode9.com/content-1-384101.html