Spark集群
Spark集群由兩種進程組成:一個驅(qū)動程序和多個執(zhí)行程序。在本地模式下,所有的進程都在同一個Java虛擬機中運行。在集群上,這些進程則通常在多個節(jié)點上運行。
比如,在單機環(huán)境下運行的集群有以下特征:
1、一個主節(jié)點作為spark單機模式的主進程和驅(qū)動程序。
2、一系列工作節(jié)點,每一個作為執(zhí)行進程。
Spark編程模型
SparkContext
SparkContext(Java中的JavaSparkContext)是spark編寫程序的出發(fā)點,SparkContext由SparkConf對象的實例初始化,它包含了一系列spark集群的配置信息,比如主節(jié)點的URL。
一旦被初始化,我們可以使用一系列的SparkContext對象內(nèi)置的方法、操作分布式數(shù)據(jù)集和全局變量。
SparkShell
Spark shell負責(zé)上下文的初始化工作
本地模式下Scala創(chuàng)建上下文的例子:
val conf = new SparkConf().setAppNme('Test Spark App').setMaster('local[4]')
val sc = new SparkContext(conf)
這在本地模式下創(chuàng)建了一個4線程的上下文,名字是Test Spark App。
Spark Shell
在Spark根目錄下輸入:./bin/spark-shell就能啟動spark shell,如下圖:
如果想在spark中使用Python shell,那么輸入./bin/pyspark,如下圖:
Resilient Distributed Datasets(RDD,彈性分布式數(shù)據(jù)集)
RDD是一系列記錄的集合,嚴格來說,是某一類型的對象,以分布式或者分段的方式分布在集群的諸多節(jié)點上。
Spark中的RDD具有容錯性,如果一個節(jié)點或者任務(wù)(task)運行失敗了,比如硬件故障,通訊丟失等,除了不正確的操作,RDD能夠在剩下的節(jié)點上自動重建,將這個任務(wù)(job)完成。
創(chuàng)建RDD
RDD可以通過集合創(chuàng)建,如下:
val collection = List('a','b','c','d','e')
val rddFromCollection = sc.parallelize(collection)
RDD同樣可以通過基于的Hadoop輸入源創(chuàng)建,包括本地文件系統(tǒng),HDFS等。
基于Hadoop的RDD可以利用任何實現(xiàn)了Hadoop InputFormat接口的數(shù)據(jù)格式,包括文本文件,其他Hadoop標(biāo)準(zhǔn)格式,HBase,Cassandra等。從本地文本文件創(chuàng)建如下:
val rddFromTextFile = sc.textFile('LICENSE')
Spark操作
一旦我們創(chuàng)建了一個RDD,我們就得到了一個可操作的分布式數(shù)據(jù)集。在spark編程模式下,操作分為轉(zhuǎn)換(transformations)和動作(actions)。大體來說,轉(zhuǎn)換對數(shù)據(jù)集提供了一些轉(zhuǎn)變數(shù)據(jù)的方法。動作則會進行一些計算或者聚合,然后把結(jié)果返回到SparkContext運行的驅(qū)動程序中。
Spark中最常見的操作是map,將輸入映射成另一種形式的輸出,如下:
val intsFromStringRDD = rddFromTextFile.map(line => line.size)
=>的左邊是輸入,右邊是輸出。
通常情況下,除了多數(shù)動作(actions)外,spark操作會返回一個新的RDD,所以我們可以把操作串起來,這樣可以使得程序簡單明了,比如:
val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).
sum / rddFromTextFile.count
Spark的轉(zhuǎn)換是lazy模式,在調(diào)用一個轉(zhuǎn)換方法的時候并不會立即觸發(fā)計算,而是將轉(zhuǎn)換操作串在一起,在動作(action)被調(diào)用的時候才觸發(fā)計算,這樣spark可以更高效的返回結(jié)果給驅(qū)動程序,所以大多數(shù)操作都是以并行的方式在集群上運行。
這意味著,在spark程序中如果沒有調(diào)用action,那么它永遠不會觸發(fā)實際的操作,也不會返回任何結(jié)果。
緩存RDD
Spark一個非常強大的功能是能夠在集群中將數(shù)據(jù)緩存在內(nèi)存中,可以通過調(diào)用cache方法來實現(xiàn)。
調(diào)用cache方法會告訴spark要把RDD保存在內(nèi)存中,第一次調(diào)用action的時候,會初始化計算,從數(shù)據(jù)源讀取數(shù)據(jù)并將它存入內(nèi)存中。所以,這樣的操作第一次被調(diào)用的時候,所花費的時間大部分取決于從數(shù)據(jù)源讀取數(shù)據(jù)的時間。然后這部分數(shù)據(jù)第二次被訪問的時候,比如機器學(xué)習(xí)中分析、迭代所用到的查詢,著部分數(shù)據(jù)可以直接從內(nèi)存中讀取,因此避免了費時的I/O操作,提高了計算速度
廣播變量(broadcast variables)和累加器(accumulators)
另一個Spark的核心功能是可以創(chuàng)建兩種類型的變量:廣播變量和累加器。
廣播變量是只讀變量,讓SparkContext對象所在的驅(qū)動程序上的變量可以傳到節(jié)點上進行計算。
在需要有效地將通一個數(shù)據(jù)變量傳到其他工作節(jié)點(worker nodes)上的情況下,這很有用,比如機器學(xué)習(xí)算法。在spark中,創(chuàng)建一個廣播變量和在SparkContext中調(diào)用一個方法一樣簡單,如下:
val broadcastAList = sc.broadcast(List('a', 'b', 'c', 'd', 'e'))
累加器也是一種可以廣播給工作節(jié)點的變量,與廣播變量不同的是廣播變量是只讀變量,而累加器可以在上面添加,這會有局限性:添加操作必須是聯(lián)合操作,這樣全局累加值可以正確地并行計算然后返回給驅(qū)動程序。每個工作節(jié)點只能訪問并且添加它自己本地的累加器變量,并且只有驅(qū)動程序可以訪問全局變量。
本文轉(zhuǎn)載自CSDN博主lxytsos的專欄文章,博客地址:http://blog.csdn.net/lxytsos/
聯(lián)系客服