目錄
使用Spark Streaming
(1). 创建StreamingContext
(2). 创建InputDStream
(3). 操作DStream
Output操作:
(4). 启动Spark Streaming
Window Operations
性能调优(摘自参考2)
优化运行时间
优化内存使用
参考
首頁 資料庫 mysql教程 Spark Note – Introduction to Streaming

Spark Note – Introduction to Streaming

Jun 07, 2016 pm 04:38 PM
Note spark stream

Spark Streaming基于Spark处理流式数据的框架,在MapReduce中,由于其分布式特性——所有数据需要读写磁盘、启动job耗时较大,难以满足时效性要求。而Streaming能够在Spark上生根发芽的原因是因为其内存特性、低延时的执行引擎和高速的执行效率。 Streaming

Spark Streaming基于Spark处理流式数据的框架,在MapReduce中,由于其分布式特性——所有数据需要读写磁盘、启动job耗时较大,难以满足时效性要求。而Streaming能够在Spark上生根发芽的原因是因为其内存特性、低延时的执行引擎和高速的执行效率。
Streaming的原理是将Stream数据分成小的时间间隔(比如几秒),即将其离散化(Discretized)并转换成一个一个数据集(RDD),然后分批处理处理这小的RDD。所以Streaming很容易很mlib,Spark SQL等进行结合,做到实时的数据分析处理。此外,Streaming也继承了RDD的容错特性。如果RDD 的某些 partition 丢失了 , 可以通过 lineage 信息重新计算恢复。
Streaming的数据源主要分下面两类:
· 外部文件系统 , 如 HDFS,Streaming可以监控一个目录中新产生的数据,并及时处理。如果出现fail,可以通过重新读取数据来恢复 , 绝对不会有数据丢失。
· 网络系统:如MQ系统(Kafka、ZeroMQ、Flume等)。Streaming会默认会在两个不同节点加载数据到内存 , 一个节点 fail 了 , 系统可以通过另一个节点的数据重算。假设正在运行 InputReceiver 的节点 fail 了 , 可能会丢失一部分数据。
streaming-arch

使用Spark Streaming

一个简单的基于Streaming的workCount代码如下:

/**
 * Created by Administrator on 2014/9/1.
 */
package com.debugo.example
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
object WordCountStreaming {
  def main(args: Array[String]): Unit ={
    val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077")
    //create the streaming context
    val  ssc = new StreamingContext(sparkConf, Seconds(30))
    //process file when new file be found.
    val lines = ssc.textFileStream("file:///home/spark/data")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
登入後複製

这段代码实现了当指定的路径有新文件生成时,就会对这些文件执行wordcount,并把结果print。具体流程如下:
wordcount

(1). 创建StreamingContext

使用Spark Streaming就需要创建StreamingContext对象(类似SparkContext)。创建StreamingContext对象所需的参数与SparkContext基本一致,包括设定Master节点(setMaster),设定应用名称(setAppName)。第二个参数Seconds(30),指定了Spark Streaming处理数据的时间间隔为30秒。需要根据具体应用需要和集群处理能力进行设置。

(2). 创建InputDStream

Spark Streaming它支持并使用的数据流叫Dstream,类似于Spark使用RDD,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs。Spark Streaming需要指明数据源。如上例所示的textFileStream,Spark Streaming以文件系统作为输入流。这里有3个要点:
(1)dataDirectory下的文件格式都是一样
(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的
(3)一旦文件进去之后就不能再改变
Spark Streaming支持多种不同数据源,它们对应的DStream同样不同(如kafkaStream,flumeStream, networkStream等)。

(3). 操作DStream

对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的word count执行流程:对于当前时间窗口内从数据源得到的数据首先进行 flatMap,然后进行map和reduceByKey,并在最后使用print()输出。
Streaming中的Dstream支持两种操作:Transformation和output。Streaming中的Transformation和RDD的Transformation相类似,都是对其离散化数据集进行处理。

Transformation Meaning
window(windowLength,?slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength,slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func,?windowLength,slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using?func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func,windowLength,?slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function?func?over batches in a sliding window.?Note:?By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism) to do the grouping. You can pass an optional?numTasks?argument to set a different number of tasks.
reduceByKeyAndWindow(func,?invFunc,windowLength,?slideInterval, [numTasks]) A more efficient version of the above?reduceByKeyAndWindow()?where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and “inverse reducing” the old data that leave the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable to only “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameterinvFunc. Like in?reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
countByValueAndWindow(windowLength,slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in?reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

Output操作:

Output Operation Meaning
print() Prints first ten elements of every batch of data in a DStream on the driver.
foreachRDD(func) The fundamental output operator. Applies a function,?func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system.
saveAsObjectFiles(prefix, [suffix]) Save this DStream’s contents as a?SequenceFile?of serialized objects. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“.
saveAsTextFiles(prefix, [suffix]) Save this DStream’s contents as a text files. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream’s contents as a Hadoop file. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“.

(4). 启动Spark Streaming

上述只是定义好了应用拓扑结构,当启动Spark Streaming (ssc.start())时,当ssc.start()启动后程序才真正进行所有预期的操作。才会按既定的interval来执行这一系列job。
运行这个spark-streaming代码:

spark-submit --master spark://debugo:7077 --class MyExample.WordCountStreaming spark0_2.10-1.0.jar
......
(discretized,1)
(stream,3)
(created,1)
(are,1)
(this,2)
(Kafka,1)
(is,1)
(provides,1)
(guide,2)
(Internally,,1)
...
14/09/03 10:37:31 INFO MapPartitionsRDD: Removing RDD 20 from persistence list
14/09/03 10:37:31 INFO BlockManager: Removing RDD 20
14/09/03 10:37:31 INFO MappedRDD: Removing RDD 17 from persistence list
14/09/03 10:37:31 INFO BlockManager: Removing RDD 17
14/09/03 10:37:31 INFO FlatMappedRDD: Removing RDD 16 from persistence list
14/09/03 10:37:31 INFO BlockManager: Removing RDD 16
14/09/03 10:37:31 INFO MappedRDD: Removing RDD 15 from persistence list
14/09/03 10:37:31 INFO BlockManager: Removing RDD 15
14/09/03 10:37:31 INFO UnionRDD: Removing RDD 14 from persistence list
14/09/03 10:37:31 INFO BlockManager: Removing RDD 14
登入後複製

stateful操作
上面的例子是一种最简单的应用场景——无状态更新。大多数时候,这种流式处理模型很难满足需求——比如我们需要累计统计单词的数量,或者计算一个信号的频域参数(需要以滑动窗口的方式处理一个间隔的数据),这就要使用另外两种应用场景——状态操作和窗口操作。状态(stateful)操作中,主要使用到下面一个transformation方法:
UpdateStateByKey
我们希望保存它状态的信息时,使用这个操作,并持续更新状态数值。使用它有两个步骤:
(1)定义状态(state),这个状态可以是任意的数据类型
(2)定义状态更新函数,从前一个状态更改新的状态
下面展示一个例子:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = … // add the new values with the previous running count to get the new count Some(newCount)
}
它会针对里面的每个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是之前的值。
下面是一个stateful的wordCount实例(修改自rg.apache.spark.examples.streaming.kafkaWordCount):

/**
* Created by debugo on 2014/9/4.
*/
package com.debugo.example
import java.util.Properties
import kafka.producer._
object WordCountProducer {
def main(args: Array[String]) {
  if (args.length 
  val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
  new KeyedMessage[String, String](topic, str)}.toArray
  producer.send(messages: _*)
  Thread.sleep(100)
  }
}
}
/**
* Created by debugo on 2014/9/3.
*/
package com.debugo.example
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object WordCountStateful {
  def main(args: Array[String]): Unit = {
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0) //None to 0
      Some(currentCount + previousCount)
    }
    if (args.length  (x, 1))
    //using updateStateByKey to update state.
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
}
}
登入後複製

完成后compile&package。下面是启动kafka服务的流程。
下载并配置kafka

wget http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar xzvf kafka*.tgz
vim conf/server.properties
broker.id=0        # [0-2] for hdp0[2-4]
host.name=hdp03
advertised.host.name=hdp03
zookeeper.connect=hdp02:2181,hdp03:2181,hdp04:2181
登入後複製

此时,zookeeper服务已经在hdp02,hdp03,hdp04上启动。然后在hdp02,hdp03,hdp04上分别启动kafka服务

/root/kafka_2.10-0.8.1.1/bin/kafka-server-start.sh /root/kafka_2.10-0.8.1.1/config/server.properties
[2014-09-04 15:15:39,784] INFO Opening socket connection to server hdp03/172.19.17.233:2181 (org.apache.zookeeper.ClientCnxn)
[2014-09-04 15:15:39,795] INFO Socket connection established to hdp03/172.19.17.233:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-09-04 15:15:39,819] INFO Session establishment complete on server hdp03/172.19.17.233:2181, sessionid = 0x3483b071fa006a7, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-09-04 15:15:39,825] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
登入後複製

新建一个TOPIC(replication-factor=num of brokers)

/root/kafka_2.10-0.8.1.1/bin/kafka-topics.sh --create --topic test --replication-factor 2 --partitions 2 --zookeeper hdp03:2181
Created topic "test".
登入後複製

console producer测试

/root/kafka_2.10-0.8.1.1/bin/kafka-console-producer.sh --broker-list hdp03:9092 --sync --topic test
登入後複製

输入一系列文本消息:

Hello Kafka
Hi Kafka
登入後複製

console consumer测试

/root/kafka_2.10-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper hdp03:2181 --topic test --from-beginning 
Hello Kafka
Hi Kafka
登入後複製

OK, kafka一切就绪。
运行example的producer

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer hdp02:9092 test 3 5
登入後複製

再submit刚刚编译的jar,这里需要添加kafka和streaming-kafka的jar

spark-submit --master=spark://hdp02:7077 --class=com.debugo.example.WordCountStateful --jars=/usshare/java/spark-streaming-kafka_2.10-1.0.2.jar,/usr/share/java/kafka_2.10-0.8.0.jar spark0_2.10-1.0.jar hdp02:2181,hdp03:2181,hdp04:2181 test-consumer-group test 1
登入後複製

这样就得到了累计统计的效果。需要注意的是,consumer-group要和配置文件config/consumer.properties中的配置一致。

Window Operations

前面的word count的例子,当我们想要每隔10秒计算一下最近30秒的单词总数。可以使用reduceByKeyAndWindow方法

// Reduce last 30 seconds of data, every 10 secondsval 
windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
登入後複製

这里面提到了windows的两个参数:
(1)window length:window的长度是30秒,最近30秒的数据
(2)slice interval:计算的时间间隔
通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。
除此之外,Window操作还有下面操作:
 

Transformation Meaning
window(windowLength,?slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength,slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func,?windowLength,slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using?func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func,windowLength,?slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function?func?over batches in a sliding window.?Note:?By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism) to do the grouping. You can pass an optional?numTasks?argument to set a different number of tasks.
reduceByKeyAndWindow(func,?invFunc,windowLength,?slideInterval, [numTasks]) A more efficient version of the above?reduceByKeyAndWindow()?where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and “inverse reducing” the old data that leave the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable to only “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameterinvFunc. Like in?reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
countByValueAndWindow(windowLength,slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in?reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

性能调优(摘自参考2)

优化运行时间

增加并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。
减少数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减少内存的使用。但序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的序列化接口可以更高效地使用CPU。
设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的batch窗口,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此,设置一个合理的batch窗口确保Job能够在这个batch窗口中结束是必须的。
减少任务提交和分发所带来的负担。通常情况下Akka框架能够高效地确保任务及时分发,但当batch窗口非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常会比使用Fine-Grained Mesos模式有更小的延迟。

优化内存使用

控制batch size。Spark Streaming会把batch窗口内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存至少能够容纳这个batch窗口内所有的数据,否则必须增加新的资源以提高集群的处理能力。
及时清理不再使用的数据。上面说到Spark Streaming会将接收到的数据全部存储于内部的可用内存区域中,因此对于处理过的不再需要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。
观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采取不同的GC策略以进一步减小内存回收对Job运行的影响。

参考

http://spark.apache.org/docs/latest/streaming-programming-guide.html

http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data

http://jerryshao.me/architecture/2013/04/02/spark-streaming-introduction/

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

IntelliJ IDEA中如何調試Java Stream操作 IntelliJ IDEA中如何調試Java Stream操作 May 09, 2023 am 11:25 AM

Stream作業是Java8推出的一大亮點!雖然java.util.stream很強大,但還是有很多開發者在實際工作中很少使用,其中吐槽最多的一個原因就是不好調試,一開始確實是這樣,因為stream這樣的流式操作在DEBUG的時候,是一行程式碼,直接下一步的時候,其實一下就過了好多操作,這樣我們就很難判斷到底是裡面的哪一行出了問題。外掛:JavaStreamDebugger如果你用的IDEA版本比較新的話,這個插件已經是自備的了,就不需要安裝了。如果還沒安裝的話,就手動安裝一下,然後繼續下面

十個AI演算法常用函式庫Java版 十個AI演算法常用函式庫Java版 Jun 13, 2023 pm 04:33 PM

今年ChatGPT火了半年多,熱度絲毫沒有降下來。深度學習和NLP也重新回到了大家的視線中。公司裡有一些小夥伴都在問我,身為Java開發人員,如何入門人工智慧,是時候拿出壓箱底的私藏的學習AI的Java庫來介紹給大家。這些函式庫和框架為機器學習、深度學習、自然語言處理等提供了廣泛的工具和演算法。根據AI專案的具體需求,可以選擇最合適的函式庫或框架,並開始嘗試使用不同的演算法來建立AI解決方案。 1.Deeplearning4j它是一個用於Java和Scala的開源分散式深度學習函式庫。 Deeplearning

java8的stream怎麼取max java8的stream怎麼取max May 14, 2023 pm 03:43 PM

java8的stream取maxpublicstaticvoidmain(String[]args){Listlist=Arrays.asList(1,2,3,4,5,6);Integermax=list.stream().max((a,b)->{if (a>b){return1;}elsereturn-1;}).get();System.out.println(max);}注意點:這裡判斷大小是透過正負數和0值。而不是直接寫成if(a>b){returna;}elseretur

linux中stream什麼意思 linux中stream什麼意思 Mar 17, 2023 am 09:55 AM

在linux中,stream是資料流的意思,就是以一定順序讀取的一串數據,所以資料流的方向就是資料流的讀取順序。 Linux系統把資料讀取後輸出的結果導入到其他檔案的過程稱為重定向資料流。 Linux下輸入一段命令並運行以後,螢幕裡會顯示兩種結果:運行成功結果即標準輸出、運行失敗結果即標準錯誤輸出;如果不做處理,它們都會顯示在螢幕上,而透過資料流重定向就可將其儲存到其他的文件中。

Infinix Note 40s 線上上市,包含所有功能和規格 Infinix Note 40s 線上上市,包含所有功能和規格 Jun 30, 2024 pm 09:32 PM

Infinix Note 40s 是 Note 40 系列的最新成員。沒有太多秘密。據 PassionateGeekz 發現,這款手機現已在官方網頁上列出了其所有功能。目前 Infinix Note 系列中的其他手機(

Java Stream API讓程式碼更出色的操作方法是什麼 Java Stream API讓程式碼更出色的操作方法是什麼 May 14, 2023 pm 06:22 PM

前言JavaStream是一種強大的資料處理工具,可以幫助開發人員快速且有效率地處理和轉換資料流。使用Stream操作可以大大簡化程式碼,使其更具可讀性和可維護性,從而提高開發效率。 filter():依照指定的Predicate保留符合條件的元素。 map():根據指定的Function映射每個元素,產生一個新的Stream。 flatMap():將每個元素映射為一個Stream,然後將這些Stream連接成一個Stream。 distinct():傳回一個去重後的Stream。 sorted():對Stre

Microsoft 是否在 Microsoft Stream(在 SharePoint 上)中引入修剪影片?新的路線圖更新是這樣說的 Microsoft 是否在 Microsoft Stream(在 SharePoint 上)中引入修剪影片?新的路線圖更新是這樣說的 Nov 24, 2023 pm 11:13 PM

在Microsoft365Roadmap網站(功能ID:186956)上的更新條目中,這家總部位於雷德蒙德的科技巨頭表示,此功能將賦予用戶編輯權限,以修剪影片中的開頭、結尾和任何片段。 「當您修剪影片時,Stream不會更改原始視訊檔案本身。相反,它只是向觀眾隱藏了修剪的部分,」更新中寫道。然後,您可以透過在StreamWeb應用程式中啟動影片並在接下來的幾個月內完成推出後點擊編輯按鈕來試用此功能。路線圖更新指出,推出將於「2023年3月」開始。但是,鑑於路線圖條目是在2023年11月21日新增的

探索Java在大數據領域的應用:Hadoop、Spark、Kafka等技術堆疊的了解 探索Java在大數據領域的應用:Hadoop、Spark、Kafka等技術堆疊的了解 Dec 26, 2023 pm 02:57 PM

Java大數據技術堆疊:了解Java在大數據領域的應用,如Hadoop、Spark、Kafka等隨著資料量不斷增加,大數據技術成為了當今網路時代的熱門話題。在大數據領域,我們常聽到Hadoop、Spark、Kafka等技術的名字。這些技術起到了至關重要的作用,而Java作為一門廣泛應用的程式語言,也在大數據領域發揮著巨大的作用。本文將重點放在Java在大

See all articles