由于數(shù)據(jù)量太大而不能在一臺(tái)機(jī)器上進(jìn)行處理這樣的情況已經(jīng)越來越常見了。幸運(yùn)的是,已經(jīng)有Apache Spark、Hadoop等技術(shù)被開發(fā)出來,去解決這個(gè)確切的問題。這些系統(tǒng)的強(qiáng)大功能可以直接在Python中使用PySpark來發(fā)掘!
有效地處理GB及以上級(jí)別的數(shù)據(jù)集是任何Python開發(fā)者都應(yīng)該會(huì)的,無論你是一個(gè)數(shù)據(jù)科學(xué)家、web開發(fā)人員還是介于兩者之間的任何人員。
在本教程中,你將學(xué)習(xí):
免費(fèi)福利: 點(diǎn)擊這里訪問《Python技巧》中的一章:這本書向你展示了Python的最佳實(shí)踐和一些簡(jiǎn)單的例子,你可以立即應(yīng)用這些例子來編寫更漂亮的Python式代碼。(https://realpython.com/pyspark-intro/ )
盡管Python只是一種流行的腳本語(yǔ)言,但是它提供了幾種編程范式,比如面向數(shù)組編程、面向?qū)ο缶幊?、異步編程等等。?duì)于有抱負(fù)的大數(shù)據(jù)專業(yè)人士來說,函數(shù)式編程是一個(gè)特別有趣的范式。
在處理大數(shù)據(jù)時(shí),函數(shù)式編程是一種常見的范例。以函數(shù)的方式進(jìn)行編寫會(huì)生成高度并行的代碼。這意味著將你的代碼在多個(gè)CPU上或者甚至完全不同的機(jī)器上運(yùn)行要容易的多。你可以同時(shí)在多個(gè)系統(tǒng)上運(yùn)行你的代碼,這樣就可以繞過單個(gè)工作站的物理內(nèi)存和CPU限制。
這就是PySpark生態(tài)系統(tǒng)的強(qiáng)大功能,它允許你獲取函數(shù)代碼并將其自動(dòng)分布到整個(gè)計(jì)算機(jī)集群中。
幸運(yùn)的是,Python程序員可以在Python的標(biāo)準(zhǔn)庫(kù)和內(nèi)置程序中使用函數(shù)式編程的許多核心思想。你可以學(xué)習(xí)大數(shù)據(jù)處理所需的許多概念,而不必離開Python的舒適環(huán)境。
函數(shù)式編程的核心思想是數(shù)據(jù)應(yīng)該由函數(shù)進(jìn)行操作,而不需要維護(hù)任何外部狀態(tài)。這意味著你的代碼避免了全局變量,并且總是會(huì)返回新數(shù)據(jù),而不是原地操作數(shù)據(jù)。
函數(shù)編程中的另一個(gè)常見概念是匿名函數(shù)。Python使用lambda關(guān)鍵字定義匿名函數(shù),不要與AWS lambda函數(shù)相混淆。
現(xiàn)在你已經(jīng)了解了一些術(shù)語(yǔ)和概念,你可以探索這些思想在Python生態(tài)系統(tǒng)中是如何體現(xiàn)的。
Python中的lambda函數(shù)是內(nèi)聯(lián)定義的,并且僅限于一個(gè)表達(dá)式。在使用內(nèi)置的sorted()函數(shù)時(shí),你可能已經(jīng)見過lambda函數(shù)了:
sorted函數(shù)的key參數(shù)會(huì)被調(diào)用來獲取iteranle(迭代)中的每個(gè)項(xiàng)。這使得排序不區(qū)分大小寫,方法是在排序之前將所有字符串變?yōu)樾憽?/p>
這是lambda函數(shù)的一個(gè)常見用例,它是一些小的匿名函數(shù),不維護(hù)任何外部狀態(tài)。
Python中還存在其它常見的函數(shù)式編程函數(shù),如filter()、map()和reduce()。所有這些函數(shù)都可以使用lambda函數(shù)或使用def以類似的方式定義的標(biāo)準(zhǔn)函數(shù)。
內(nèi)置的filter()、map()和reduce()函數(shù)在函數(shù)式編程中都很常見。你很快就會(huì)看到,這些概念可以構(gòu)成一個(gè)PySpark程序功能的重要部分。
在一個(gè)核心Python上下文中理解這些函數(shù)非常重要。然后,你就可以將這些知識(shí)轉(zhuǎn)換為PySpark程序和Spark API。
filter()根據(jù)條件過濾一個(gè)iterable,通常會(huì)被表示為一個(gè)lambda函數(shù):
filter()接受一個(gè)iterable,對(duì)每個(gè)項(xiàng)調(diào)用lambda函數(shù),并返回lambda返回True的項(xiàng)。
注意: 調(diào)用list()是必需的,因?yàn)閒ilter()也是一個(gè)迭代. filter()只在你循環(huán)遍歷它們時(shí)才給出值。list()會(huì)將所有項(xiàng)強(qiáng)制載入內(nèi)存,而不是必須使用一個(gè)循環(huán)。
你可以想象使用filter()去替換一個(gè)常見的for循環(huán)模式,如下所示:
這段代碼會(huì)收集所有少于8個(gè)字符的字符串。代碼比filter()示例更冗長(zhǎng),但是它執(zhí)行的是相同的函數(shù),得到是相同的結(jié)果。
filter()的另一個(gè)不太明顯的好處是,它會(huì)返回一個(gè)iterable。這意味著filter()并不需要你的計(jì)算機(jī)有足夠的內(nèi)存來同時(shí)保存iterable中的所有項(xiàng)。隨著大數(shù)據(jù)集快速增長(zhǎng)成幾GB大小,這一點(diǎn)變得越來越重要。
map()類似于filter(),它也會(huì)對(duì)一個(gè)iterable中的每個(gè)項(xiàng)應(yīng)用一個(gè)函數(shù),只不過它總是生成原始項(xiàng)的一個(gè)1對(duì)1映射。map()返回的新iterable總是具有與原始iterable相同的元素?cái)?shù)量,而filter()則不是這樣:
map()會(huì)在所有的項(xiàng)上自動(dòng)調(diào)用lambda函數(shù),有效地替換了一個(gè)for循環(huán),如下所示:
for循環(huán)的結(jié)果與map()示例相同,后者中收集了所有項(xiàng)的大寫形式。但是,與filter()示例一樣,map()會(huì)返回一個(gè)iterable,這再次使得它可以處理無法完全裝入內(nèi)存的大數(shù)據(jù)集。
最后,Python標(biāo)準(zhǔn)庫(kù)中的函數(shù)三人組的最后一個(gè)函數(shù)是reduce()。與filter()和map()一樣,reduce()會(huì)將一個(gè)函數(shù)應(yīng)用于一個(gè)可迭代對(duì)象中的元素。
同樣,所應(yīng)用的函數(shù)可以是使用def關(guān)鍵字創(chuàng)建的標(biāo)準(zhǔn)Python函數(shù),也可以是一個(gè)lambda函數(shù)。
但是,reduce()不會(huì)返回一個(gè)新的iterable。相反,reduce()會(huì)使用所調(diào)用的函數(shù)將該iterable縮減為單個(gè)值:
這段代碼會(huì)將iterable中的所有項(xiàng)(從左到右)組合成一個(gè)單獨(dú)的項(xiàng)。這里沒有調(diào)用list(),因?yàn)閞educe()已經(jīng)返回了一個(gè)單獨(dú)項(xiàng)目。
注意: Python 3.x將內(nèi)置的reduce()函數(shù)移動(dòng)到了functools包中。
lambda、map()、filter()和reduce()是存在于許多語(yǔ)言中的概念,可以在常規(guī)Python程序中使用。很快,你將看到這些概念會(huì)擴(kuò)展到PySpark API來處理大量數(shù)據(jù)。
Set是標(biāo)準(zhǔn)Python中存在的另一種常見功能,并在大數(shù)據(jù)處理中廣泛使用。集合與列表非常相似,只是它們沒有任何順序,并且不能包含重復(fù)的值。你可以將集合看作類似于Python字典中的鍵。
與在任何優(yōu)秀的編程教程中一樣,你都希望從一個(gè)Hello World示例開始。以下是等效的PySpark示例:
不要擔(dān)心所有的細(xì)節(jié)。其主要思想是要記住,PySpark程序與常規(guī)的Python程序并沒有太大的不同。
注意: 如果你還沒有安裝PySpark或沒有指定的copyright文件,這個(gè)程序可能會(huì)在你的系統(tǒng)上引發(fā)一個(gè)異常,稍后你將看到如何處理它。
你很快就會(huì)了解到這個(gè)項(xiàng)目的所有細(xì)節(jié),但是要好好看看。該程序會(huì)計(jì)算一個(gè)名為copyright的文件的總行數(shù)和包含單詞python的行數(shù)。
請(qǐng)記住,一個(gè)PySpark程序與一個(gè)常規(guī)Python程序并沒有太大的不同,但是執(zhí)行模型可能與常規(guī)Python程序非常不同,特別是當(dāng)你在集群上運(yùn)行時(shí)。
如果你是在一個(gè)集群上,可能會(huì)有很多事情在幕后發(fā)生,這些事情會(huì)將處理過程分布到多個(gè)節(jié)點(diǎn)。但是,現(xiàn)在來說,你可以將該程序看作是一個(gè)使用了PySpark庫(kù)的Python程序。
既然你已經(jīng)了解了Python中存在的一些常見函數(shù)性概念,以及一個(gè)簡(jiǎn)單的PySpark程序,現(xiàn)在是深入了解Spark和PySpark的時(shí)候了。
Apache Spark由幾個(gè)組件組成,因此我們很難描述它。從其核心來說,Spark一個(gè)是處理大量數(shù)據(jù)的通用引擎。
Spark是用Scala編寫的,運(yùn)行在JVM上。Spark內(nèi)置了處理流數(shù)據(jù)、機(jī)器學(xué)習(xí)、圖形處理甚至通過SQL與數(shù)據(jù)交互的組件。
在本指南中,你將只了解處理大數(shù)據(jù)的核心Spark組件。然而,所有其它組件,如機(jī)器學(xué)習(xí)、SQL等,也都可以通過PySpark在Python項(xiàng)目中使用。
PySpark是什么?
Spark是用Scala實(shí)現(xiàn)的,Scala是一種運(yùn)行在JVM上的語(yǔ)言,那么,我們?nèi)绾瓮ㄟ^Python來訪問所有這些功能?
PySpark就是答案。
PySpark的當(dāng)前版本是2.4.3,可以用于Python 2.7、3.3及以上版本。
你可以將PySpark看作是在Scala API之上的一個(gè)基于Python的包裝器。這意味著你有兩套文檔可以參考:
PySpark API文檔中有一些示例,但通常你希望參考Scala文檔并將你的PySpark程序的代碼轉(zhuǎn)換為Python語(yǔ)法。幸運(yùn)的是,Scala是一種非常易讀的基于函數(shù)的編程語(yǔ)言。
PySpark通過Py4J庫(kù)與Spark 基于Scala的API進(jìn)行通信。Py4J不是特定于PySpark或Spark的。Py4J允許任何Python程序與基于JVM的代碼進(jìn)行對(duì)話。
PySpark基于函數(shù)式范式有兩個(gè)原因:
你還可以將PySpark看作是一個(gè)允許在單個(gè)機(jī)器或一組機(jī)器上處理大量數(shù)據(jù)的庫(kù)。
在一個(gè)Python上下文中,PySpark可以處理并行進(jìn)程,而不需要threading 或者 multiprocessing模塊。所有線程、進(jìn)程甚至不同CPU之間的復(fù)雜通信和同步都由Spark處理。
要與PySpark進(jìn)行交互,你需要?jiǎng)?chuàng)建被稱為彈性分布式數(shù)據(jù)集(RDDs)的專用數(shù)據(jù)結(jié)構(gòu)。
如果你的程序運(yùn)行在集群上,RDDs將通過一個(gè)調(diào)度程序在多個(gè)節(jié)點(diǎn)上自動(dòng)轉(zhuǎn)換和分發(fā)數(shù)據(jù),從而隱藏所有的復(fù)雜性。
為了更好地理解PySpark的API和數(shù)據(jù)結(jié)構(gòu),請(qǐng)回憶一下我們前面提到的Hello World程序:
任何PySpark程序的入口點(diǎn)都是一個(gè)SparkContext對(duì)象。此對(duì)象允許你連接到一個(gè)Spark集群并創(chuàng)建RDDs。local[*]字符串是一個(gè)特殊的字符串,表示你正在使用一個(gè)本地集群,這是告訴你你是在單機(jī)模式下運(yùn)行的另一種方式。這個(gè)*會(huì)告訴Spark在你的機(jī)器上創(chuàng)建與邏輯核心一樣多的工人線程。
當(dāng)你正在使用一個(gè)集群時(shí),創(chuàng)建SparkContext可能會(huì)更加復(fù)雜。要連接到一個(gè)Spark集群,你可能需要處理身份驗(yàn)證和一些特定于集群的其它信息。你可以設(shè)置類似于下面的這些細(xì)節(jié):
有了SparkContext之后,你就可以開始創(chuàng)建RDDs了。
你可以以多種方式來創(chuàng)建RDDs,但是一種普遍的方式是使用PySpark的 parallelize()函數(shù)。parallelize()可以將一些Python數(shù)據(jù)結(jié)構(gòu)(如列表和元組)轉(zhuǎn)換為RDDs,這為你提供了容錯(cuò)和分布式的功能。
為了更好地理解RDDs,我們考慮另一個(gè)例子。下面的代碼會(huì)創(chuàng)建一個(gè)包含10,000個(gè)元素的迭代器,然后使用parallelize()將數(shù)據(jù)分布到2個(gè)分區(qū)中:
parallelize()將該迭代器轉(zhuǎn)換為一組分布式數(shù)字,并為你提供Spark基礎(chǔ)設(shè)施的所有功能。
注意,這段代碼使用了RDD的filter()方法,而不是你前面看到的Python的內(nèi)置filter()方法。結(jié)果是一樣的,但幕后發(fā)生的事情卻截然不同。通過使用RDD的filter()方法,該操作以分布式方式跨多個(gè)CPU或計(jì)算機(jī)進(jìn)行。
同樣,假設(shè)這是Spark正在為你執(zhí)行multiprocessing工作,所有這些工作都封裝在RDD數(shù)據(jù)結(jié)構(gòu)中。
take()是一種查看你的RDD內(nèi)容的方法,但只能看到一個(gè)小子集。take()會(huì)將該數(shù)據(jù)子集從這個(gè)分布式系統(tǒng)拖向一臺(tái)機(jī)器。
take()對(duì)于調(diào)試非常重要,因?yàn)樵谝慌_(tái)機(jī)器上檢查你的整個(gè)數(shù)據(jù)集可能是不可能的。RDDs被優(yōu)化為用于大數(shù)據(jù),因此在實(shí)際情況中,一臺(tái)機(jī)器可能沒有足夠的RAM來保存你的整個(gè)數(shù)據(jù)集。
注意: 在shell中運(yùn)行這樣的示例時(shí),Spark會(huì)臨時(shí)將信息打印到stdout,稍后你將看到如何處理它。你的stdout可能會(huì)臨時(shí)顯示類似于 [Stage 0:>(0 + 1) / 1]的內(nèi)容。
stdout文本展示了Spark如何分割RDDs并將你的數(shù)據(jù)處理為跨不同CPU和機(jī)器的多個(gè)階段。
創(chuàng)建RDDs的另一種方法是使用textFile()讀入一個(gè)文件,你在前面的示例中已經(jīng)看到了該方法。RDDs是使用PySpark的基本數(shù)據(jù)結(jié)構(gòu)之一,因此API中的許多函數(shù)都會(huì)返回RDDs。
RDDs與其它數(shù)據(jù)結(jié)構(gòu)之間的一個(gè)關(guān)鍵區(qū)別是,它的處理過程會(huì)被延遲到結(jié)果被請(qǐng)求時(shí)才進(jìn)行。這類似于一個(gè)Python生成器。Python生態(tài)系統(tǒng)中的開發(fā)人員通常使用術(shù)語(yǔ)延遲計(jì)算來解釋這種行為。
你可以在同一個(gè)RDD上疊加多個(gè)轉(zhuǎn)換,而不需要進(jìn)行任何處理。這個(gè)功能是可能的,因?yàn)镾park維護(hù)了這些轉(zhuǎn)換的一個(gè)有向無環(huán)圖。只有在最終結(jié)果被請(qǐng)求時(shí),底層圖才會(huì)被激活。在前面的示例中,只有在你通過調(diào)用take()請(qǐng)求結(jié)果時(shí)才進(jìn)行計(jì)算。
有多種方法可以從一個(gè)RDD中請(qǐng)求結(jié)果。通過在一個(gè)RDD上使用collect(),你可以顯式地請(qǐng)求將結(jié)果進(jìn)行計(jì)算并收集到單個(gè)集群節(jié)點(diǎn)中。你還可以通過各種方式隱式地請(qǐng)求結(jié)果,其中之一就是使用前面看到的count()。
注意: 使用這些方法時(shí)要小心,因?yàn)樗鼈儠?huì)將整個(gè)數(shù)據(jù)集拖放到內(nèi)存中,如果數(shù)據(jù)集太大而無法放入一臺(tái)機(jī)器的RAM中時(shí),那么這些方法將無法使用。
同樣,請(qǐng)參考PySpark API文檔以獲得關(guān)于所有可能的功能的更多細(xì)節(jié)。
通常,你會(huì)在一個(gè)Hadoop集群上運(yùn)行PySpark程序,但是選擇在其它集群上進(jìn)行部署也是支持的。你可以閱讀《Spark的集群模式概述》來獲取更多信息。
注意: 設(shè)置這些集群中的一個(gè)可能很困難,并且超出了本指南的范圍。理想情況下,你的團(tuán)隊(duì)需要一些向?qū)evOps工程師來幫助實(shí)現(xiàn)這一點(diǎn)。如果沒有,Hadoop發(fā)布的一個(gè)指南會(huì)幫助你。
在本指南中,你將看到在本地機(jī)器上運(yùn)行PySpark程序的幾種方法。這對(duì)于測(cè)試和學(xué)習(xí)是非常有用的,但是你很快就會(huì)希望將你的新程序運(yùn)行在一個(gè)集群上來真正地處理大數(shù)據(jù)。
有時(shí)候,由于所有必需的依賴項(xiàng),單獨(dú)設(shè)置PySpark也很有挑戰(zhàn)性。
PySpark運(yùn)行在JVM之上,并需要許多底層Java基礎(chǔ)設(shè)施才能運(yùn)行。也就是說,我們生活在Docker時(shí)代,這使得使用PySpark進(jìn)行實(shí)驗(yàn)變得更加容易。
更有甚者,Jupyter背后的優(yōu)秀開發(fā)人員已經(jīng)為你完成了所有繁重的工作。他們發(fā)布了一個(gè)Dockerfile,其中包括所有的PySpark依賴項(xiàng)以及Jupyter。因此,你可以直接在Jupyternotebook上進(jìn)行實(shí)驗(yàn)!
注意:Jupyter notebook有很多功能。請(qǐng)查看《Jupyter notebook介紹》來獲取更多有關(guān)如何有效使用notebook的詳細(xì)信息。
首先,你需要安裝Docker。如果你還沒有設(shè)置好Docker,請(qǐng)查看《Docker 實(shí)戰(zhàn) – 更輕松、更愉快、更高效》。
注意: Docker鏡像可能非常大,所以請(qǐng)確保你可以使用大約5GB的磁盤空間來使用PySpark和Jupyter。
接下來,你可以運(yùn)行以下命令來下載并自動(dòng)運(yùn)行一個(gè)帶有預(yù)置PySpark單節(jié)點(diǎn)設(shè)置的Docker容器。這個(gè)命令運(yùn)行可能需要幾分鐘的時(shí)間,因?yàn)樗苯訌腄ockerHub下載鏡像,以及Spark、PySpark和Jupyter的所有需求:
一旦該命令停止打印輸出,你就有了一個(gè)正在運(yùn)行的容器,其中包含了在一個(gè)單節(jié)點(diǎn)環(huán)境中測(cè)試PySpark程序所需的所有東西。
要停止容器,請(qǐng)?jiān)谀沔I入docker run命令的同一窗口中按下Ctrl+C。
現(xiàn)在我們終于可以運(yùn)行一些程序了!
有很多方法可以執(zhí)行PySpark程序,這取決于你喜歡命令行還是更直觀的界面。對(duì)于一個(gè)命令行界面,你可以使用spark-submit命令、標(biāo)準(zhǔn)Python shell或?qū)iT的PySpark shell。
首先,你將看到帶有一個(gè)Jupyter notebook的更直觀的界面。
Jupyter Notebook
你可以在一個(gè)Jupyter notebook中運(yùn)行你的程序,方法是運(yùn)行以下命令來啟動(dòng)你之前下載的Docker容器(如果它還沒有運(yùn)行):
現(xiàn)在你有一個(gè)容器來運(yùn)行PySpark了。注意,docker run命令輸出的末尾提到了一個(gè)本地URL。
注意: docker命令在每臺(tái)機(jī)器上的輸出會(huì)略有不同,因?yàn)榱钆啤⑷萜鱅D和容器名稱都是隨機(jī)生成的。
你需要使用該URL連接到Docker容器來在一個(gè)web瀏覽器中運(yùn)行Jupyter。將URL從你的輸出中直接復(fù)制并粘貼到你的web瀏覽器中。下面是一個(gè)你可能會(huì)看到的URL的例子:
下面命令中的URL在你的機(jī)器上可能會(huì)略有不同,但是一旦你在你的瀏覽器中連接到該URL,你就可以訪問一個(gè)Jupyter notebook環(huán)境了,該環(huán)境應(yīng)該類似如下:
從Jupyter notebook頁(yè)面,你可以使用最右邊的New按鈕來創(chuàng)建一個(gè)新的python3 shell。然后你可以測(cè)試一些代碼,就像之前的Hello World例子一樣:
以下是在Jupyter notebook中運(yùn)行該代碼的樣子:
這里的幕后發(fā)生了很多事情,所以可能需要幾秒鐘的時(shí)間才能顯示結(jié)果。在你單擊單元格后,答案并不會(huì)立即出現(xiàn)。
命令行界面提供了多種提交PySpark程序的方法,包括PySpark shell和spark-submit命令。要使用這些CLI方法,你首先需要連接到安裝了PySpark的系統(tǒng)的CLI。
要連接到Docker設(shè)置的CLI,你需要像以前那樣啟動(dòng)容器,然后附加到該容器。同樣,要啟動(dòng)容器,你可以運(yùn)行以下命令:
運(yùn)行Docker容器后,你需要通過shell連接到它,而不是使用一個(gè)Jupyter notebook。為此,請(qǐng)運(yùn)行以下命令來查找容器名稱:
這個(gè)命令將顯示所有正在運(yùn)行的容器。找到運(yùn)行jupyter/pyspark-notebook鏡像的容器的CONTAINER ID,并使用它連接到容器內(nèi)的bash shell:
現(xiàn)在你應(yīng)該連接到容器內(nèi)部的一個(gè)bash提示符了。你可以確認(rèn)一切正常,因?yàn)槟鉺hell的提示符將變?yōu)轭愃朴趈ovyan@4d5ab7a93902的東西,但是使用的是你的容器的唯一ID。
注意: 用你機(jī)器上使用的CONTAINER ID來替換4d5ab7a93902。
你可以通過命令行使用與Spark一起安裝的spark -submit命令將PySpark代碼提交給一個(gè)集群。該命令接受一個(gè)PySpark或Scala程序,并在集群上執(zhí)行它。這很可能就是你執(zhí)行真正的大數(shù)據(jù)處理工作的方式。
注意: 這些命令的路徑取決于Spark安裝在何處,并且可能只有在使用引用的Docker容器時(shí)才能工作。
要使用正在運(yùn)行的Docker容器運(yùn)行Hello World示例(或任何PySpark程序),首先,你得像前邊描述的那樣訪問shell。一旦進(jìn)入容器的shell環(huán)境,你就可以使用nano文本編輯器創(chuàng)建文件。
要在你的當(dāng)前文件夾中創(chuàng)建文件,只需帶上要?jiǎng)?chuàng)建的文件的名稱來啟動(dòng)nano:
輸入Hello World示例的內(nèi)容,然后按下Ctrl+X保存文件,并遵循保存提示:
最后,你可以使用pyspark-submit命令通過Spark來運(yùn)行代碼:
默認(rèn)情況下,該命令會(huì)產(chǎn)生大量輸出,因此可能很難看到你的程序的輸出。通過更改SparkContext變量上的級(jí)別,你可以在你的PySpark程序中控制日志的詳細(xì)程度。要做到這一點(diǎn),把這一行代碼放在你的腳本頂部附近:
這將忽略spark-submit的一些輸出,因此,你將更清楚地看到你的程序的輸出。但是,在一個(gè)實(shí)際場(chǎng)景中,你會(huì)希望將任何輸出放入一個(gè)文件、數(shù)據(jù)庫(kù)或其它存儲(chǔ)機(jī)制中,以便稍后更容易地進(jìn)行調(diào)試。
幸運(yùn)的是,PySpark程序仍然可以訪問所有的Python標(biāo)準(zhǔn)庫(kù),所以將你的結(jié)果保存到一個(gè)文件中并不是問題:
現(xiàn)在你的結(jié)果在一個(gè)名為results.txt的單獨(dú)文件中,方便以后參考。
注意: 上面的代碼使用了f-strings,它是在Python 3.6中被引入的。
運(yùn)行你的程序的另一種特定于PySpark的方法是使用PySpark本身提供的shell。同樣,使用Docker設(shè)置,你可以像上面描述的那樣連接到容器的CLI。然后,你可以使用以下命令來運(yùn)行專門的Python shell:
現(xiàn)在你已經(jīng)處于你的Docker容器中的Pyspark shell環(huán)境中了,你可以測(cè)試與Jupyter notebook示例類似的代碼:
現(xiàn)在你可以使用Pyspark shell了,就像使用普通Python shell一樣。
注意: 你不必在Pyspark shell示例中創(chuàng)建一個(gè)SparkContext變量。PySpark shell會(huì)自動(dòng)創(chuàng)建一個(gè)變量sc,并以單節(jié)點(diǎn)模式將你連接到Spark引擎。
當(dāng)你使用spark-submit或一個(gè)Jupyter notebook提交真正的PySpark程序時(shí),你必須創(chuàng)建自己的SparkContext。
你還可以使用標(biāo)準(zhǔn)的Python shell來執(zhí)行你的程序,只要該P(yáng)ython環(huán)境中安裝了PySpark。你一直在使用的這個(gè)Docker容器沒有為標(biāo)準(zhǔn)Python環(huán)境啟用PySpark。因此,你必須使用前面的方法之一才能在該Docker容器中使用PySpark。
正如你已經(jīng)看到的,PySpark附帶了額外的庫(kù)來完成像機(jī)器學(xué)習(xí)和類SQL大型數(shù)據(jù)集操作這樣的事情。不過,你也可以使用其它常見的科學(xué)庫(kù),如NumPy和Pandas。
你必須在每個(gè)集群節(jié)點(diǎn)上的相同環(huán)境中安裝這些庫(kù),然后你的程序就可以像往常一樣使用它們了。之后,你就可以自由地使用你已經(jīng)知道的所有熟悉的慣用Pandas技巧了。
記住: Pandas DataFrame需要被迅速計(jì)算,因此所有數(shù)據(jù)都將需要在一臺(tái)機(jī)器上放入內(nèi)存。
在學(xué)習(xí)了PySpark基礎(chǔ)知識(shí)后不久,你肯定想要開始分析在你使用單機(jī)模式時(shí)可能無法工作的大量數(shù)據(jù)。安裝和維護(hù)一個(gè)Spark集群遠(yuǎn)遠(yuǎn)超出了本指南的范圍,而且很可能它本身就是一項(xiàng)全職工作。
因此,可能是時(shí)候去拜訪你辦公室的IT部門或研究一個(gè)托管的Spark集群解決方案。一個(gè)潛在的托管解決方案是Databricks。
Databricks允許你使用Microsoft Azure或AWS托管數(shù)據(jù),并提供14天的免費(fèi)試用。
在你擁有了一個(gè)工作的Spark集群之后,你會(huì)想要將所有數(shù)據(jù)放入該集群進(jìn)行分析。Spark有很多導(dǎo)入數(shù)據(jù)的方法:
你甚至可以直接從一個(gè)網(wǎng)絡(luò)文件系統(tǒng)中讀取數(shù)據(jù),這就是前面示例的工作方式。
訪問你的所有數(shù)據(jù)的方法并不缺乏,不管你使用的是像Databricks這樣的一個(gè)托管解決方案,還是你自己的機(jī)器集群。
PySpark是大數(shù)據(jù)處理的一個(gè)很好的切入點(diǎn)。
在本教程中,如果你熟悉一些函數(shù)式編程概念,如map()、filter()和基本Python,那么你就不必花費(fèi)大量的時(shí)間來預(yù)先學(xué)習(xí)。實(shí)際上,你可以在你的PySpark程序中直接使用你已經(jīng)知道的所有Python知識(shí),包括熟悉的工具,如NumPy和Pandas。
你現(xiàn)在可以:
英文原文:https://realpython.com/pyspark-intro/
譯者:浣熊君( ????? )
聯(lián)系客服