首頁 資料庫 mysql教程 Spark Streaming编程指南

Spark Streaming编程指南

Jun 07, 2016 pm 04:36 PM
spark Streaming 指南 程式設計

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。

它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。

它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。

它支持的数据流叫Dstream,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs,下面是一个例子帮助大家理解Dstream。

A Quick Example

// 创建StreamingContext,1秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 穿件一个DStream来连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 输出结果
wordCount.print();
ssc.start();             // 开始
ssc.awaitTermination();  // 计算完毕退出
登入後複製

具体的代码可以访问这个页面:

https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala

如果已经装好Spark的朋友,我们可以通过下面的例子试试。

首先,启动Netcat,这个工具在Unix-like的系统都存在,是个简易的数据服务器。

使用下面这句命令来启动Netcat:

$ nc -lk 9999
接着启动example

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
在Netcat这端输入hello world,看Spark这边的

复制代码

# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
登入後複製

Basics
下面这块是如何编写代码的啦,哇咔咔!

首先我们要在SBT或者Maven工程添加以下信息:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating

//需要使用一下数据源的,还要添加相应的依赖
Source    Artifact
Kafka     spark-streaming-kafka_2.10
Flume     spark-streaming-flume_2.10
Twitter     spark-streaming-twitter_2.10
ZeroMQ     spark-streaming-zeromq_2.10
MQTT     spark-streaming-mqtt_2.10
登入後複製

接着就是实例化

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
这是之前的例子对DStream的操作。

Input Sources
除了sockets之外,我们还可以这样创建Dstream

streamingContext.fileStream(dataDirectory)

这里有3个要点:

(1)dataDirectory下的文件格式都是一样

(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的

(3)一旦文件进去之后就不能再改变

假设我们要创建一个Kafka的Dstream。

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, …)

如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html

Operations
对于Dstream,我们可以进行两种操作,transformations 和 output

Transformations

Transformation                          Meaning
map(func)                        对每一个元素执行func方法
flatMap(func)                    类似map函数,但是可以map到0+个输出
filter(func)                     过滤
repartition(numPartitions)       增加分区,提高并行度     
union(otherStream)               合并两个流
count()                    统计元素的个数
reduce(func)                     对RDDs里面的元素进行聚合操作,2个输入参数,1个输出参数
countByValue()                   针对类型统计,当一个Dstream的元素的类型是K的时候,调用它会返回一个新的Dstream,包含键值对,Long是每个K出现的频率。
reduceByKey(func, [numTasks])    对于一个(K, V)类型的Dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也可以指定numTasks 
join(otherStream, [numTasks])    把(K, V)和(K, W)的Dstream连接成一个(K, (V, W))的新Dstream 
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream连接成一个(K, Seq[V], Seq[W])的新Dstream 
transform(func)                  转换操作,把原来的RDD通过func转换成一个新的RDD
updateStateByKey(func)           针对key使用func来更新状态和值,可以将state该为任何值
登入後複製

UpdateStateByKey Operation
使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:

(1)定义状态,这个状态可以是任意的数据类型

(2)定义状态更新函数,从前一个状态更改新的状态

下面展示一个例子:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
登入後複製

它可以用在包含(word, 1) 的Dstream当中,比如前面展示的example

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

它会针对里面的每个word调用一下更新函数,newValues是最新的值,runningCount是之前的值。

Transform Operation
和transformWith一样,可以对一个Dstream进行RDD->RDD操作,比如我们要对Dstream流里的RDD和另外一个数据集进行join操作,但是Dstream的API没有直接暴露出来,我们就可以使用transform方法来进行这个操作,下面是例子:

val spamInfoRDD = sparkContext.hadoopFile(…) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
rdd.join(spamInfoRDD).filter(…) // join data stream with spam information to do data cleaning

})

另外,我们也可以在里面使用机器学习算法和图算法。

Window Operations

先举个例子吧,比如前面的word count的例子,我们想要每隔10秒计算一下最近30秒的单词总数。

我们可以使用以下语句:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

这里面提到了windows的两个参数:

(1)window length:window的长度是30秒,最近30秒的数据

(2)slice interval:计算的时间间隔

通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。

下面是window的一些操作函数,还是有点儿理解不了window的概念,Meaning就不翻译了,直接删掉

Transformation                                                                              Meaning
window(windowLength, slideInterval)     
countByWindow(windowLength, slideInterval)     
reduceByWindow(func, windowLength, slideInterval)     
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])     
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])    
countByValueAndWindow(windowLength, slideInterval, [numTasks])  
登入後複製

Output Operations
Output Operation Meaning
print() 打印到控制台
foreachRDD(func) 对Dstream里面的每个RDD执行func,保存到外部系统
saveAsObjectFiles(prefix, [suffix]) 保存流的内容为SequenceFile, 文件名 : “prefix-TIME_IN_MS[.suffix]“.
saveAsTextFiles(prefix, [suffix]) 保存流的内容为文本文件, 文件名 : “prefix-TIME_IN_MS[.suffix]“.
saveAsHadoopFiles(prefix, [suffix]) 保存流的内容为hadoop文件, 文件名 : “prefix-TIME_IN_MS[.suffix]“.

Persistence
Dstream中的RDD也可以调用persist()方法保存在内存当中,但是基于window和state的操作,reduceByWindow,reduceByKeyAndWindow,updateStateByKey它们就是隐式的保存了,系统已经帮它自动保存了。

从网络接收的数据(such as, Kafka, Flume, sockets, etc.),默认是保存在两个节点来实现容错性,以序列化的方式保存在内存当中。

RDD Checkpointing
状态的操作是基于多个批次的数据的。它包括基于window的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。

ssc.checkpoint(hdfsPath) //设置检查点的保存位置
dstream.checkpoint(checkpointInterval) //设置检查点间隔

对于必须设置检查点的Dstream,比如通过updateStateByKey和reduceByKeyAndWindow创建的Dstream,默认设置是至少10秒。

Performance Tuning
对于调优,可以从两个方面考虑:

(1)利用集群资源,减少处理每个批次的数据的时间

(2)给每个批次的数据量的设定一个合适的大小

Level of Parallelism
像一些分布式的操作,比如reduceByKey和reduceByKeyAndWindow,默认的8个并发线程,可以通过对应的函数提高它的值,或者通过修改参数spark.default.parallelism来提高这个默认值。

Task Launching Overheads
通过进行的任务太多也不好,比如每秒50个,发送任务的负载就会变得很重要,很难实现压秒级的时延了,当然可以通过压缩来降低批次的大小。

Setting the Right Batch Size
要使流程序能在集群上稳定的运行,要使处理数据的速度跟上数据流入的速度。最好的方式计算这个批量的大小,我们首先设置batch size为5-10秒和一个很低的数据输入速度。确实系统能跟上数据的速度的时候,我们可以根据经验设置它的大小,通过查看日志看看Total delay的多长时间。如果delay的小于batch的,那么系统可以稳定,如果delay一直增加,说明系统的处理速度跟不上数据的输入速度。

24/7 Operation
Spark默认不会忘记元数据,比如生成的RDD,处理的stages,但是Spark Streaming是一个24/7的程序,它需要周期性的清理元数据,通过spark.cleaner.ttl来设置。比如我设置它为600,当超过10分钟的时候,Spark就会清楚所有元数据,然后持久化RDDs。但是这个属性要在SparkContext 创建之前设置。

但是这个值是和任何的window操作绑定。Spark会要求输入数据在过期之后必须持久化到内存当中,所以必须设置delay的值至少和最大的window操作一致,如果设置小了,就会报错。

Monitoring
除了Spark内置的监控能力,还可以StreamingListener这个接口来获取批处理的时间, 查询时延, 全部的端到端的试验。

Memory Tuning
Spark Stream默认的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY。

默认的,所有持久化的RDD都会通过被Spark的LRU算法剔除出内存,如果设置了spark.cleaner.ttl,就会周期性的清理,但是这个参数设置要很谨慎。一个更好的方法是设置spark.streaming.unpersist为true,这就让Spark来计算哪些RDD需要持久化,这样有利于提高GC的表现。

推荐使用concurrent mark-and-sweep GC,虽然这样会降低系统的吞吐量,但是这样有助于更稳定的进行批处理。

Fault-tolerance Properties
Failure of a Worker Node
下面有两种失效的方式:

1.使用hdfs上的文件,因为hdfs是可靠的文件系统,所以不会有任何的数据失效。

2.如果数据来源是网络,比如Kafka和Flume,为了防止失效,默认是数据会保存到2个节点上,但是有一种可能性是接受数据的节点挂了,那么数据可能会丢失,因为它还没来得及把数据复制到另外一个节点。

Failure of the Driver Node
为了支持24/7不间断的处理,Spark支持驱动节点失效后,重新恢复计算。Spark Streaming会周期性的写数据到hdfs系统,就是前面的检查点的那个目录。驱动节点失效之后,StreamingContext可以被恢复的。

为了让一个Spark Streaming程序能够被回复,它需要做以下操作:

(1)第一次启动的时候,创建 StreamingContext,创建所有的streams,然后调用start()方法。

(2)恢复后重启的,必须通过检查点的数据重新创建StreamingContext。

下面是一个实际的例子:

通过StreamingContext.getOrCreate来构造StreamingContext,可以实现上面所说的。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}
// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
登入後複製

在stand-alone的部署模式下面,驱动节点失效了,也可以自动恢复,让别的驱动节点替代它。这个可以在本地进行测试,在提交的时候采用supervise模式,当提交了程序之后,使用jps查看进程,看到类似DriverWrapper就杀死它,如果是使用YARN模式的话就得使用其它方式来重新启动了。

这里顺便提一下向客户端提交程序吧,之前总结的时候把这块给落下了。

./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
      \
   [application-options]
cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,带上hdfs://...否则要所有的节点的目录下都有这个jar的 
main-class: 要发布的程序的main函数所在类. 
Client Options: 
--memory  (驱动程序的内存,单位是MB) 
--cores  (为你的驱动程序分配多少个核心) 
--supervise (节点失效的时候,是否重新启动应用) 
--verbose (打印增量的日志输出)
登入後複製

在未来的版本,会支持所有的数据源的可恢复性。

为了更好的理解基于HDFS的驱动节点失效恢复,下面用一个简单的例子来说明:

Time     Number of lines in input file     Output without driver failure     Output with driver failure
1      10                     10                    10
2      20                     20                    20
3      30                     30                    30
4      40                     40                    [DRIVER FAILS] no output
5      50                     50                    no output
6      60                     60                    no output
7      70                     70                    [DRIVER RECOVERS] 40, 50, 60, 70
8      80                     80                    80
9      90                     90                    90
10     100                     100                   100
登入後複製

在4的时候出现了错误,40,50,60都没有输出,到70的时候恢复了,恢复之后把之前没输出的一下子全部输出。

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

使用正規表示式去除 PHP 數組中的重複值 使用正規表示式去除 PHP 數組中的重複值 Apr 26, 2024 pm 04:33 PM

使用正規表示式從PHP數組中移除重複值的方法:使用正規表示式/(.*)(.+)/i匹配並取代重複項。遍歷數組元素,使用preg_match檢查匹配情況。如果匹配,請跳過值;否則,將其添加到無重複值的新數組中。

程式設計是乾啥的,學了有什麼用 程式設計是乾啥的,學了有什麼用 Apr 28, 2024 pm 01:34 PM

1、程式設計可用於開發各種軟體和應用程序,包括網站、手機應用程式、遊戲和數據分析工具等。它的應用領域非常廣泛,幾乎涵蓋了所有行業,包括科學研究、醫療保健、金融、教育、娛樂等。 2.學習程式設計可以幫助我們提升問題解決能力和邏輯思考能力。在程式設計過程中,我們需要分析和理解問題,找出解決方案,並將其轉換為程式碼。這種思維方式能夠培養我們的分析和抽象能力,提升我們解決實際問題的能力。

使用 Python 解決問題:作為初學者,解鎖強大的解決方案 使用 Python 解決問題:作為初學者,解鎖強大的解決方案 Oct 11, 2024 pm 08:58 PM

Python 讓初學者能夠解決問題。

C++ 程式設計謎題片段:激發思維,提升程式設計水平 C++ 程式設計謎題片段:激發思維,提升程式設計水平 Jun 01, 2024 pm 10:26 PM

C++程式設計謎題涵蓋斐波那契數列、階乘、漢明距離、陣列最大值和最小值等演算法和資料結構概念,透過解決這些謎題,可以鞏固C++知識,提升演算法理解和程式設計技巧。

釋放你內心的程式設計師:C 絕對初學者 釋放你內心的程式設計師:C 絕對初學者 Oct 11, 2024 pm 03:50 PM

C語言是初學者學習程式設計的理想選擇,其優點包括效率、多功能性和可移植性。學習C語言需要:安裝C編譯器(如MinGW或Cygwin)了解變數、資料型別、條件語句和迴圈語句編寫包含主函數和printf()函數的第一個程式透過實戰案例(如計算平均數)練習C語言知識

編碼的關鍵:為初學者釋放 Python 的力量 編碼的關鍵:為初學者釋放 Python 的力量 Oct 11, 2024 pm 12:17 PM

Python透過其易學性和​​強大功能,是初學者的理想程式設計入門語言。其基礎包括:變數:用於儲存資料(數字、字串、列表等)。資料型態:定義變數中資料的型態(整數、浮點數等)。運算符:用於數學運算和比較。控制流程:控製程式碼執行流程(條件語句、迴圈)。

Java函數比較的實用指南 Java函數比較的實用指南 Apr 19, 2024 pm 09:12 PM

在Java中,函數比較用於檢查兩個函數是否相等。相等條件:相同參數列表和函數主體。 Object類別的equals方法可用來比較函數相等性。實戰範例:使用equals方法比較兩個函數f1和f2,它們具有相同參數列表和函數主體,因此相等。其他注意事項:匿名函數和lambda表達式也可以比較。重載的函數不能透過equals方法進行比較。

Python 的力量,簡單:一種適合初學者的程式設計方法 Python 的力量,簡單:一種適合初學者的程式設計方法 Oct 11, 2024 pm 04:53 PM

Python程式設計入門安裝Python:從官方網站下載並安裝。 HelloWorld!:使用print("HelloWorld!")列印第一行程式碼。實戰案例:計算圓面積:使用π(3.14159)和半徑計算圓面積。變數和資料類型:使用變數儲存數據,Python中的資料類型包括整數、浮點數、字串和布林值。表達式與賦值:使用運算子將變數、常數和函數連接起來,並使用賦值運算子(=)將值賦給變數。控制流程:if-else語句:根據條件執行不同的程式碼區塊,確定奇

See all articles