九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開(kāi)通VIP
大數(shù)據(jù)入門與實(shí)戰(zhàn)-PySpark的使用教程

1 PySpark簡(jiǎn)介

Apache Spark是用Scala編程語(yǔ)言編寫的。為了用Spark支持Python,Apache Spark社區(qū)發(fā)布了一個(gè)工具PySpark。使用PySpark,您也可以使用Python編程語(yǔ)言處理RDD。正是由于一個(gè)名為Py4j的庫(kù),他們才能實(shí)現(xiàn)這一目標(biāo)。
這里不介紹PySpark的環(huán)境設(shè)置,主要介紹一些實(shí)例,以便快速上手。

2 PySpark - SparkContext

SparkContext是任何spark功能的入口點(diǎn)。當(dāng)我們運(yùn)行任何Spark應(yīng)用程序時(shí),會(huì)啟動(dòng)一個(gè)驅(qū)動(dòng)程序,它具有main函數(shù),并且此處啟動(dòng)了SparkContext。然后,驅(qū)動(dòng)程序在工作節(jié)點(diǎn)上的執(zhí)行程序內(nèi)運(yùn)行操作。

SparkContext使用Py4J啟動(dòng)JVM并創(chuàng)建JavaSparkContext。默認(rèn)情況下,PySpark將SparkContext作為'sc'提供,因此創(chuàng)建新的SparkContext將不起作用。



以下代碼塊包含PySpark類的詳細(xì)信息以及SparkContext可以采用的參數(shù)。

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

以下是SparkContext的參數(shù)具體含義:

  • Master- 它是連接到的集群的URL。

  • appName- 您的工作名稱。

  • sparkHome - Spark安裝目錄。

  • pyFiles - 要發(fā)送到集群并添加到PYTHONPATH的.zip或.py文件。

  • environment - 工作節(jié)點(diǎn)環(huán)境變量。

  • batchSize - 表示為單個(gè)Java對(duì)象的Python對(duì)象的數(shù)量。設(shè)置1以禁用批處理,設(shè)置0以根據(jù)對(duì)象大小自動(dòng)選擇批處理大小,或設(shè)置為-1以使用無(wú)限批處理大小。

  • serializer- RDD序列化器。

  • Conf - L {SparkConf}的一個(gè)對(duì)象,用于設(shè)置所有Spark屬性。

  • gateway - 使用現(xiàn)有網(wǎng)關(guān)和JVM,否則初始化新JVM。

  • JSC - JavaSparkContext實(shí)例。

  • profiler_cls - 用于進(jìn)行性能分析的一類自定義Profiler(默認(rèn)為pyspark.profiler.BasicProfiler)。
    在上述參數(shù)中,主要使用master和appname。任何PySpark程序的會(huì)使用以下兩行:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

2.1 SparkContext示例 - PySpark Shell

現(xiàn)在你對(duì)SparkContext有了足夠的了解,讓我們?cè)赑ySpark shell上運(yùn)行一個(gè)簡(jiǎn)單的例子。在這個(gè)例子中,我們將計(jì)算README.md文件中帶有字符“a”或“b”的行數(shù)。那么,讓我們說(shuō)如果一個(gè)文件中有5行,3行有字符'a',那么輸出將是→ Line with a:3。字符'b'也是如此。

注 - 我們不會(huì)在以下示例中創(chuàng)建任何SparkContext對(duì)象,因?yàn)槟J(rèn)情況下,當(dāng)PySpark shell啟動(dòng)時(shí),Spark會(huì)自動(dòng)創(chuàng)建名為sc的SparkContext對(duì)象。如果您嘗試創(chuàng)建另一個(gè)SparkContext對(duì)象,您將收到以下錯(cuò)誤 - “ValueError:無(wú)法一次運(yùn)行多個(gè)SparkContexts”。

在終端輸入pyspark 啟動(dòng)PySpark Shell:

>>> logFile="file:////opt/modules/hadoop-2.8.5/README.txt"
>>> logData=sc.textFile(logFile).cache()
>>> numAs=logData.filter(lambda s:'a' in s).count()
>>> numBs=logData.filter(lambda s:'b' in s).count()                             
>>> print("Line with a:%i,line with b:%i" % (numAs,numBs))
Line with a:25,line with b:7

2.2 SparkContext示例 - Python程序

讓我們使用Python程序運(yùn)行相同的示例。創(chuàng)建一個(gè)名為demo.py的Python文件,并在該文件中輸入以下代碼。

from pyspark import SparkContext
logFile = "file:////opt/modules/hadoop-2.8.5/README.txt"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,lines with b :%i" % (numAs, numBs))

然后我們將在終端中執(zhí)行以下命令來(lái)運(yùn)行此Python文件。我們將得到與上面相同的輸出。

spark-submit demo.py

3 PySpark - RDD

在介紹PySpark處理RDD操作之前,我們先了解下RDD的基本概念:

RDD代表Resilient Distributed Dataset,它們是在多個(gè)節(jié)點(diǎn)上運(yùn)行和操作以在集群上進(jìn)行并行處理的元素。RDD是不可變?cè)?,這意味著一旦創(chuàng)建了RDD,就無(wú)法對(duì)其進(jìn)行更改。RDD也具有容錯(cuò)能力,因此在發(fā)生任何故障時(shí),它們會(huì)自動(dòng)恢復(fù)。您可以對(duì)這些RDD應(yīng)用多個(gè)操作來(lái)完成某項(xiàng)任務(wù)。

要對(duì)這些RDD進(jìn)行操作,有兩種方法 :

  • Transformation

  • Action

轉(zhuǎn)換 - 這些操作應(yīng)用于RDD以創(chuàng)建新的RDD。Filter,groupBy和map是轉(zhuǎn)換的示例。

操作 - 這些是應(yīng)用于RDD的操作,它指示Spark執(zhí)行計(jì)算并將結(jié)果發(fā)送回驅(qū)動(dòng)程序。

要在PySpark中應(yīng)用任何操作,我們首先需要?jiǎng)?chuàng)建一個(gè)PySpark RDD。以下代碼塊具有PySpark RDD類的詳細(xì)信息 :

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

接下來(lái)讓我們看看如何使用PySpark運(yùn)行一些基本操作,用以下代碼創(chuàng)建存儲(chǔ)一組單詞的RDD(spark使用parallelize方法創(chuàng)建RDD),我們現(xiàn)在將對(duì)單詞進(jìn)行一些操作。

3.1 count()

返回RDD中的元素個(gè)數(shù)

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)

執(zhí)行spark-submit count.py,將會(huì)輸出以下結(jié)果

Number of elements in RDD → 8

3.2 collect()

返回RDD中的所有元素

----------------------------------------collect.py - --------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

執(zhí)行spark-submit collect.py 輸出以下結(jié)果

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

3.3 foreach(func)

僅返回滿足foreach內(nèi)函數(shù)條件的元素。在下面的示例中,我們?cè)趂oreach中調(diào)用print函數(shù),該函數(shù)打印RDD中的所有元素。

# foreach.py
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)

執(zhí)行spark-submit foreach.py,然后輸出:

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

3.4 filter(f)

返回一個(gè)包含元素的新RDD,它滿足過(guò)濾器內(nèi)部的功能。在下面的示例中,我們過(guò)濾掉包含''spark'的字符串。

# filter.py
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

執(zhí)行spark-submit filter.py:

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

3.5 map(f, preservesPartitioning = False)

通過(guò)將該函數(shù)應(yīng)用于RDD中的每個(gè)元素來(lái)返回新的RDD。在下面的示例中,我們形成一個(gè)鍵值對(duì),并將每個(gè)字符串映射為值1

# map.py
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

執(zhí)行spark-submit map.py

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]

3.6 reduce(f)

執(zhí)行指定的可交換和關(guān)聯(lián)二元操作后,將返回RDD中的元素。在下面的示例中,我們從運(yùn)算符導(dǎo)入add包并將其應(yīng)用于'num'以執(zhí)行簡(jiǎn)單的加法運(yùn)算。說(shuō)白了和Python的reduce一樣:假如有一組整數(shù)[x1,x2,x3],利用reduce執(zhí)行加法操作add,對(duì)第一個(gè)元素執(zhí)行add后,結(jié)果為sum=x1,然后再將sum和x2執(zhí)行add,sum=x1+x2,最后再將x2和sum執(zhí)行add,此時(shí)sum=x1+x2+x3。

# reduce.py
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

執(zhí)行spark-submit reduce.py:

Adding all the elements -> 15

3.7 join(other, numPartitions = None)

它返回RDD,其中包含一對(duì)帶有匹配鍵的元素以及該特定鍵的所有值。

from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

執(zhí)行spark-submit join.py:

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

reference:https://www.tutorialspoint.com/pyspark/index.htm

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開(kāi)APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
搭建Spark計(jì)算平臺(tái) python操作Spark
譯文:Spark編程指南(Python版)
SparkR:數(shù)據(jù)科學(xué)家的新利器
Spark 單機(jī)環(huán)境配置
大數(shù)據(jù)IMF傳奇行動(dòng)絕密課程第16課:RDD實(shí)戰(zhàn)
通過(guò) --py-files 可以在pyspark中可以順利導(dǎo)入
更多類似文章 >>
生活服務(wù)
熱點(diǎn)新聞
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服