九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
如何使用Redis流和Apache Spark處理實時數(shù)據(jù)?
AI 前線導(dǎo)讀:將 Redis 流作為流數(shù)據(jù)庫,Apache Spark 作為數(shù)據(jù)處理引擎,兩者怎樣共同部署才能做到最佳搭配?

本文要點
  • Apache Spark 的流框架(Structured Streaming)為數(shù)據(jù)流帶來了 SQL 查詢功能,讓用戶可以實時、可擴展地處理數(shù)據(jù)。

  • Redis 流(Redis Stream)是 Redis 5.0 新引入的數(shù)據(jù)結(jié)構(gòu),能夠以亞毫秒級的延遲高速收集、保存和分發(fā)數(shù)據(jù)。

  • 用戶集成 Redis 流和流框架后就能簡化連續(xù)應(yīng)用程序(continuous application)的擴展工作。

  • 開源的 Spark-Redis 庫將 Apache Spark 與 Redis 連接起來。該庫為 Redis 數(shù)據(jù)結(jié)構(gòu)提供 RDD 和數(shù)據(jù)幀 API,使用戶可以將 Redis 流用作流框架的數(shù)據(jù)源。

流框架是 Apache Spark 2.0 新引入的一項功能,在業(yè)界和數(shù)據(jù)工程社區(qū)中引起了很大關(guān)注。流框架 API 構(gòu)建于 Spark SQL 引擎之上,為流數(shù)據(jù)提供類似 SQL 的界面。

早期的 Apache Spark 以微批處理方式處理流框架查詢,延遲大約為 100 毫秒。

去年的 2.3 版本引入了低延遲(1 毫秒)的“連續(xù)處理”,進一步推動了流框架的應(yīng)用。

為了讓 Spark 保持高速的連續(xù)處理狀態(tài),你需要使用像 Redis 這樣的高速流數(shù)據(jù)庫來支持它。

Redis 開源內(nèi)存數(shù)據(jù)庫以其高速度和亞毫秒級延遲聞名于世。最近 Redis 5.0 新推出了一種名為 Redis 流的數(shù)據(jù)結(jié)構(gòu),使 Redis 能夠在多個生產(chǎn)者和消費者之間消費、保存和分發(fā)流數(shù)據(jù)。

現(xiàn)在的問題是,將 Redis 流作為流數(shù)據(jù)庫,Apache Spark 作為數(shù)據(jù)處理引擎,兩者共同部署,怎樣才能做到最佳搭配?

用 Scala 編寫的 Spark-Redis 庫就集成了 Apache Spark 和 Redis,使用它可以: 

  • 在 Redis 中以 RDD 的形式讀寫數(shù)據(jù)

  • 在 Redis 中以數(shù)據(jù)幀的形式讀寫數(shù)據(jù)(例如,它允許將 Spark SQL 表映射到 Redis 數(shù)據(jù)結(jié)構(gòu))

  • 使用 Redis 流作為流框架的數(shù)據(jù)源

  • 在流框架之后將 Redis 實現(xiàn)為接收器

本文中我將介紹一個真實場景,并指導(dǎo)你如何使用 Redis 和 Apache Spark 實時處理流數(shù)據(jù)。

模擬場景:計算實時點擊

假設(shè)我們是一家廣告公司,在熱門網(wǎng)站上投放廣告。我們根據(jù)社交媒體上的熱門圖片制作包含流行話題梗的動圖,并將其作為廣告投放出去。為了最大化利潤,我們必須識別出能獲得病毒式傳播或贏得更多點擊次數(shù)的資產(chǎn),這樣就能加大它們的投放力度了。

我們的大部分資產(chǎn)傳播期很短,所以能實時處理點擊的話,我們就能快速生成傳播趨勢圖,這對業(yè)務(wù)至關(guān)重要。我們理想中的流數(shù)據(jù)解決方案必須記錄所有廣告點擊并實時處理,然后計算每項資產(chǎn)的實時點擊次數(shù)。以下是設(shè)計思路:

輸入

對于每次點擊,我們的數(shù)據(jù)提取方案(圖 1 中的方框 1)將資產(chǎn) ID 和廣告費用放在 Redis 流中: 

XADD clicks * asset [asset id] cost [actual cost]

例如: 

XADD clicks * asset aksh1hf98qw7tt9q7 cost 29
輸出

在圖 1 中的方框 2 部分處理數(shù)據(jù)之后,我們的結(jié)果會存儲在數(shù)據(jù)存儲區(qū)中。數(shù)據(jù)查詢方案(圖 1 中的方框 3)為數(shù)據(jù)提供了一個 SQL 接口,我們可以用它查詢最近幾分鐘的最高點擊次數(shù): 

select asset, count from clicks order by count desc

asset count
----------------- -----
aksh1hf98qw7tt9q7 2392
i2dfb8fg023714ins 2010
jsg82t8jasvdh2389 1938
構(gòu)建解決方案

現(xiàn)在我們已經(jīng)定義好了業(yè)務(wù)需求,接下來探討如何使用 Redis 5.0 和 Apache Spark 2.4 構(gòu)建其解決方案。在本文中我用的是 Scala 編程語言,但你也可以在 Java 或 Python 中使用 Spark-Redis 庫。

這張流程圖看起來非常簡單:首先系統(tǒng)將數(shù)據(jù)提取到 Redis 流,然后 Redis 流將數(shù)據(jù)作為 Spark 進程消費,并將結(jié)果聚合傳回 Redis,最后使用 Spark-SQL 接口在 Redis 中查詢結(jié)果。 

  1. 數(shù)據(jù)提取:我選擇用 Redis 流提取數(shù)據(jù),因為它是 Redis 中的內(nèi)置數(shù)據(jù)結(jié)構(gòu),每秒可處理超過一百萬次讀寫操作。此外它還可以根據(jù)時間自動對數(shù)據(jù)排序,并支持簡化數(shù)據(jù)讀取方式的消費者組。Spark-Redis 庫支持將 Redis 流作為數(shù)據(jù)源,因此它完全符合我們對流式數(shù)據(jù)庫使用 Apache Spark 引擎的需求。 

  2. 數(shù)據(jù)處理:Apache Spark 中的流框架 API 是我們處理數(shù)據(jù)的絕佳選擇,而 Spark-Redis 庫使我們能夠?qū)⒌竭_ Redis 流的數(shù)據(jù)轉(zhuǎn)換為數(shù)據(jù)幀。使用流框架時,我們可以用微批處理或 Spark 的連續(xù)處理模式運行查詢。我們還可以開發(fā)一個自定義的“編寫器”來將數(shù)據(jù)寫入指定目的地。如圖 2 所示,我們將使用哈希數(shù)據(jù)結(jié)構(gòu)將輸出寫入 Redis。 

  3. 數(shù)據(jù)查詢:Spark-Redis 庫允許你將本機 Redis 數(shù)據(jù)結(jié)構(gòu)映射為數(shù)據(jù)幀。我們可以聲明一個將列映射到哈希數(shù)據(jù)結(jié)構(gòu)特定鍵的“臨時表”,并且由于 Redis 的速度非???,延遲在亞毫秒級別,我們可以使用 Spark-SQL 獲得實時查詢能力。

之后我將逐個介紹如何開發(fā)并運行解決方案的各個組件。在那之前,我們先用適當?shù)墓ぞ邅沓跏蓟_發(fā)環(huán)境。

尋找合適的開發(fā)工具

在我們的示例中,我們將使用 Homebrew 包管理器在 macOS 上下載和安裝軟件,你也可以根據(jù)你操作系統(tǒng)的情況選擇其他包管理器。 

  1. Redis 5.0或更高版本: 首先,我們需要在環(huán)境中下載并安裝 Redis 5.x。舊版本的 Redis 不支持 Redis 流。

在 Homebrew 上,我們用下面的命令安裝并啟動 Redis 5.0: 

$ brew install Redis
$ brew services start Redis

如果你用的還是舊版 Redis,可以用下面的命令升級它: 

$ brew upgrade Redis
  1. Apacke Spark 2.3或更高版本: 接下來我們從官方網(wǎng)站下載并安裝 Apache Spark,或者使用 Homebrew 安裝:

    $ brew install apache-spark
  2. Scala 2.12.8或更高版本:Scala 也是一樣的操作:

    $ brew install scala
  3. Apache Maven:我們需要用 Maven 來構(gòu)建 Spark-Redis 庫。

    $ brew install maven
  4. JDK 1.8或更高版本:我們可以使用下面的命令從甲骨文網(wǎng)站或 Homebrew 下載并安裝這個 JDK。對于最新版本的 JDK,我們需要用 java 替換 java8。

    $ brew cask install java8
  5. Spark-Redis:這是我們解決方案的核心部分,這里從 GitHub 下載庫并構(gòu)建軟件包,如下所示:

    $ git clone https://github.com/RedisLabs/spark-redis.git
    $ cd spark-redis
    $ mvn clean package -DskipTests

這會在./target/ 目錄下加入 spark-redis-。在我的設(shè)置中這個文件是 spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

  1. SBT 1.2.8或更高版本:SBT 是一個 Scala 構(gòu)建工具,可簡化管理和構(gòu)建 Scala 文件的工作。

    $ brew install sbt
  2. 開發(fā)環(huán)境:最后該設(shè)置文件夾結(jié)構(gòu)并構(gòu)建文件了。本示例中我們將把程序代碼放在“scala”目錄下。

    $ mkdir scala
    $ cd ./scala

使用以下內(nèi)容創(chuàng)建一個新文件 build.sbt: 

name := 'RedisExample'

version := '1.0'

scalaVersion := '2.12.8' 

val sparkVersion = '2.4.0'

libraryDependencies ++= Seq(
 'org.apache.spark' %% 'spark-core' % sparkVersion,
 'org.apache.spark' %% 'spark-sql' % sparkVersion,
 'org.apache.spark' %% 'spark-catalyst' % sparkVersion
)

初始化目錄。用以下命令初始化包目錄: 

$ mkdir ./src/main/scala/
$ mkdir ./lib
$ sbt package

spark-redis-復(fù)制到 lib 目錄。

構(gòu)建我們的點擊計數(shù)解決方案

如架構(gòu)部分所述,我們的解決方案包含三個部分:數(shù)據(jù)提取組件、Spark 引擎內(nèi)的數(shù)據(jù)處理器和數(shù)據(jù)查詢接口。在本節(jié)中我將詳細說明這三個部分并組合出一個有效的解決方案。

  1. 提取 Redis 流

Redis 流是一種僅附加數(shù)據(jù)結(jié)構(gòu)。假設(shè) Apache Spark 的連續(xù)處理單元將消費這些數(shù)據(jù),我們可以將消息數(shù)限制為一百萬。稍微修改一下前面提到的命令: 

XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29

大多數(shù)流行的 Redis 客戶端都支持 Redis 流,因此根據(jù)你的編程語言,你可以選擇適用 Python 的 redis-py、適用 Java 的 Jedis 或 Lettuce、適用 Node.js 的 node-redis 等等。

  1. 數(shù)據(jù)處理

這一部分分為三個小節(jié):

  • 從 Redis 流讀取和處理數(shù)據(jù)

  • 將結(jié)果存儲在 Redis 中

  • 運行程序

    1. 從 Redis 流讀取數(shù)據(jù)

要在 Spark 中從 Redis 流讀取數(shù)據(jù),我們需要明白怎樣連接到 Redis,以及 Redis 流中數(shù)據(jù)的 Schema 結(jié)構(gòu)。

為了連接到 Redis,我們必須為 Redis 創(chuàng)建一個帶有連接參數(shù)的新 Spark 會話(SparkSession): 

val spark = SparkSession
.builder()
.appName('redis-example')
.master('local[*]')
.config('spark.redis.host', 'localhost')
.config('spark.redis.port', '6379')
.getOrCreate()

設(shè)置 Schema 結(jié)構(gòu)時,我們用“clicks”命名流,并為“stream.keys”設(shè)置一個“clicks”的選項。由于每個流元素都包含一項資產(chǎn)以及與之相關(guān)的成本,因此我們將創(chuàng)建一個包含兩個 StructField 的數(shù)組的 StructType——一個用于“asset”,另一個用于“cost”,如下所示: 

val clicks = spark
    .readStream
    .format('redis')
    .option('stream.keys','clicks')
    .schema(StructType(Array(
    StructField('asset', StringType),
    StructField('cost', LongType)
    )))
    .load()

在第一個程序中我們對每個資產(chǎn)的點擊次數(shù)感興趣。為此創(chuàng)建一個數(shù)據(jù)幀,其中包含按資產(chǎn)計數(shù)分組的數(shù)據(jù): 

val byasset = clicks.groupBy('asset').count

最后一步是啟動流框架查詢: 

val query = byasset
    .writeStream
    .outputMode('update')
    .foreach(clickWriter)
    .start()

注意這里我們使用自己的 ForeachWriter 將結(jié)果寫回 Redis。如果要將輸出轉(zhuǎn)到控制臺,可以將查詢寫成: 

val query = byasset
    .writeStream
    .outputMode('update')
    .format('console')
    .start()

對于 連續(xù)處理 而言,我們希望在查詢中添加'trigger'命令:.trigger(Trigger.Continuous('1 second'))。trigger 命令不適用于聚合查詢,因此我們無法把它插入這個示例。

下面是完整的程序代碼。它會從 Redis 流讀取新的點擊數(shù)據(jù)并使用 Spark 的流框架 API 處理。如果你想在自己的環(huán)境中嘗試,請將程序保存在 src/main/scala 下,命名為 ClickAnalysis.scala。(如果你的 Redis 服務(wù)器不是在端口 6379 上本地運行的,請根據(jù)具體情況設(shè)置連接參數(shù)。) 

// Program: ClickAnalysis.scala
//
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._

object ClickAnalysis {
def main(args: Array[String]): Unit = {
    val spark = SparkSession
    .builder()
    .appName('redis-example')
    .master('local[*]')
    .config('spark.redis.host', 'localhost')
    .config('spark.redis.port', '6379')
    .getOrCreate()

val clicks = spark
    .readStream
    .format('redis')
    .option('stream.keys','clicks')
    .schema(StructType(Array(
    StructField('asset', StringType),
    StructField('cost', LongType)
    )))
    .load()
val byasset = clicks.groupBy('asset').count

val clickWriter : ClickForeachWriter =
    new ClickForeachWriter('localhost','6379')

val query = byasset
    .writeStream
    .outputMode('update')
    .foreach(clickWriter)
    .start()

query.awaitTermination()

} // End main
} //End object
  1. 將結(jié)果存儲在 Redis 中

為了將結(jié)果寫回 Redis,我們可以開發(fā)一個名為 ClickForeachWriter 的自定義 ForeachWriter。它會擴展 ForeachWriter,并使用 Redis 的 Java 客戶端 Jedis 連接到 Redis 上。下面是完整的程序代碼,保存為 ClickForeachWriter.scala: 

// Program: ClickForeachWriter.scala
//
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
import redis.clients.jedis.Jedis

class ClickForeachWriter(p_host: String, p_port: String) extends 
ForeachWriter[Row]{

val host: String = p_host
val port: String = p_port

var jedis: Jedis = _

def connect() = {
    jedis = new Jedis(host, port.toInt)
}

    override def open(partitionId: Long, version: Long):
Boolean = {
    return true
}

override def process(record: Row) = {
    var asset = record.getString(0);
    var count = record.getLong(1);
    if(jedis == null){
    connect()
}

jedis.hset('click:'+asset, 'asset', asset)
jedis.hset('click:'+asset, 'count', count.toString)
jedis.expire('click:'+asset, 300)
    }

override def close(errorOrNull: Throwable) = {
    }
}

在這部分程序中有一點需要注意:它將結(jié)果存儲在哈希數(shù)據(jù)結(jié)構(gòu)中,其鍵遵循語法“click:

  1. 運行程序

在我們運行之前首先需要編譯程序。轉(zhuǎn)到主目錄(我們存儲 build.sbt 的目錄)運行命令: 

$ sbt package

我們的程序應(yīng)該能順利編譯通過,沒有錯誤。如果出現(xiàn)了錯誤,請修復(fù)它們并重新運行 sbt 包。編譯完成后,在同一目錄中運行以下命令來啟動程序: 

spark-submit --class ClickAnalysis --jars 
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar 
--master local[*] ./target/scala-2.12/redisexample_2.12-1.0.jar

如果你不喜歡調(diào)試消息,可以停止程序(按 ctrl 加 c)并編輯 /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/(或 log4j.properties 文件存儲的目錄)下的 log4j.properties,并將 log4j.rootCategory 更改為 WARN,如下所示: 

log4j.rootCategory=WARN, console

該程序?qū)⒆詣訌?Redis 流中提取消息。如果 Redis 流中沒有消息,它將異步偵聽新消息。我們可以在新的控制臺中啟動 redis-cli 并向 Redis 流添加一條消息,以測試它是否在正常消費消息: 

$ redis-cli
redis-cli> XADD clicks * asset test cost 100

一切順利的話,我們應(yīng)該能在哈希數(shù)據(jù)結(jié)構(gòu)中讀取結(jié)果:

redis-cli> hgetall click:test
1) 'asset'
2) 'test'
3) 'count'
4) '1'
  1. 查詢數(shù)據(jù):將 Redis 數(shù)據(jù)讀取為數(shù)據(jù)幀

我們解決方案的最后一個組件實際上為 Redis 數(shù)據(jù)提供了一個 SQL 接口。通過 SQL 命令讀取數(shù)據(jù)又是一個兩步過程:首先,我們?yōu)?Redis 數(shù)據(jù)定義 SQL schema;其次,我們運行 SQL 命令。

但在此之前,我們需要從主目錄上在控制臺運行 spark-sql,如下所示: 

$ spark-sql --jars 
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

然后會轉(zhuǎn)到 spark-sql 提示符下: 

spark-sql>

現(xiàn)在我們要為 Redis 哈希數(shù)據(jù)結(jié)構(gòu)中存儲的數(shù)據(jù)定義 SQL schema。如前所述,我們將每個資產(chǎn)的數(shù)據(jù)存儲在由鍵:click:

spark-sql> CREATE TABLE IF NOT EXISTS clicks(asset STRING, count 
INT) USING org.apache.spark.sql.redis OPTIONS (table 'click')

此命令創(chuàng)建一個名為“clicks”的新表視圖。它使用 Spark-Redis 庫中指定的指令將“asset”和“count”列映射到哈希結(jié)構(gòu)中的對應(yīng)字段?,F(xiàn)在我們可以運行查詢: 

spark-sql> select * from clicks;
test 1
Time taken: 0.088 seconds, Fetched 1 row(s)

如果要以編程方式運行 SQL 查詢,請參閱 Apache Spark 提供的有關(guān)如何使用 ODBC/JDBC 驅(qū)動程序連接到 Spark 引擎的文檔。

我們的成果是什么?

在本文中,我演示了如何使用 Redis 流作為 Apache Spark 引擎的數(shù)據(jù)源,介紹了 Redis 流是怎樣為流框架用例提供支持的。我還展示了如何使用 Apache Spark 中的數(shù)據(jù)幀 API 讀取 Redis 數(shù)據(jù),并融合流框架和數(shù)據(jù)幀的理念說明了 Spark-Redis 庫可以實現(xiàn)的功能。

Redis 流簡化了高速收集和分發(fā)數(shù)據(jù)的任務(wù)。將其與 Apache Spark 中的流框架相結(jié)合,可以支持需要實時計算的各種解決方案,包括物聯(lián)網(wǎng)、欺詐檢測、人工智能和機器學(xué)習(xí)、實時分析等。

作者介紹

Roshan Kumar 是 Redis Labs 的高級產(chǎn)品經(jīng)理。他在軟件開發(fā)和技術(shù)領(lǐng)域的產(chǎn)品管理方面擁有豐富的經(jīng)驗。他曾在惠普公司和一些成功的硅谷創(chuàng)業(yè)公司工作。他擁有計算機科學(xué)學(xué)士學(xué)位和美國加利福尼亞州圣克拉拉大學(xué)的 MBA 學(xué)位。

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
大數(shù)據(jù)需要用到的知識
用Spark做數(shù)據(jù)分析是怎樣一種體驗?
共筑Spark大數(shù)據(jù)引擎的七大工具
Apache 兩個開源項目比較:Flink vs Spark
【小白視角】大數(shù)據(jù)基礎(chǔ)實踐(七) Spark的基本操作
IBM專家深入淺出講解Spark2.0
更多類似文章 >>
生活服務(wù)
熱點新聞
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服