Spark Note – Introduction to Streaming
Spark Streaming基于Spark处理流式数据的框架,在MapReduce中,由于其分布式特性——所有数据需要读写磁盘、启动job耗时较大,难以满足时效性要求。而Streaming能够在Spark上生根发芽的原因是因为其内存特性、低延时的执行引擎和高速的执行效率。 Streaming
Spark Streaming基于Spark处理流式数据的框架,在MapReduce中,由于其分布式特性——所有数据需要读写磁盘、启动job耗时较大,难以满足时效性要求。而Streaming能够在Spark上生根发芽的原因是因为其内存特性、低延时的执行引擎和高速的执行效率。
Streaming的原理是将Stream数据分成小的时间间隔(比如几秒),即将其离散化(Discretized)并转换成一个一个数据集(RDD),然后分批处理处理这小的RDD。所以Streaming很容易很mlib,Spark SQL等进行结合,做到实时的数据分析处理。此外,Streaming也继承了RDD的容错特性。如果RDD 的某些 partition 丢失了 , 可以通过 lineage 信息重新计算恢复。
Streaming的数据源主要分下面两类:
· 外部文件系统 , 如 HDFS,Streaming可以监控一个目录中新产生的数据,并及时处理。如果出现fail,可以通过重新读取数据来恢复 , 绝对不会有数据丢失。
· 网络系统:如MQ系统(Kafka、ZeroMQ、Flume等)。Streaming会默认会在两个不同节点加载数据到内存 , 一个节点 fail 了 , 系统可以通过另一个节点的数据重算。假设正在运行 InputReceiver 的节点 fail 了 , 可能会丢失一部分数据。
使用Spark Streaming
一个简单的基于Streaming的workCount代码如下:
/** * Created by Administrator on 2014/9/1. */ package com.debugo.example import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf object WordCountStreaming { def main(args: Array[String]): Unit ={ val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077") //create the streaming context val ssc = new StreamingContext(sparkConf, Seconds(30)) //process file when new file be found. val lines = ssc.textFileStream("file:///home/spark/data") val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
这段代码实现了当指定的路径有新文件生成时,就会对这些文件执行wordcount,并把结果print。具体流程如下:
(1). 创建StreamingContext
使用Spark Streaming就需要创建StreamingContext对象(类似SparkContext)。创建StreamingContext对象所需的参数与SparkContext基本一致,包括设定Master节点(setMaster),设定应用名称(setAppName)。第二个参数Seconds(30),指定了Spark Streaming处理数据的时间间隔为30秒。需要根据具体应用需要和集群处理能力进行设置。
(2). 创建InputDStream
Spark Streaming它支持并使用的数据流叫Dstream,类似于Spark使用RDD,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs。Spark Streaming需要指明数据源。如上例所示的textFileStream,Spark Streaming以文件系统作为输入流。这里有3个要点:
(1)dataDirectory下的文件格式都是一样
(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的
(3)一旦文件进去之后就不能再改变
Spark Streaming支持多种不同数据源,它们对应的DStream同样不同(如kafkaStream,flumeStream, networkStream等)。
(3). 操作DStream
对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的word count执行流程:对于当前时间窗口内从数据源得到的数据首先进行 flatMap,然后进行map和reduceByKey,并在最后使用print()输出。
Streaming中的Dstream支持两种操作:Transformation和output。Streaming中的Transformation和RDD的Transformation相类似,都是对其离散化数据集进行处理。
Transformation | Meaning |
---|---|
window(windowLength,?slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength,slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func,?windowLength,slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using?func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func,windowLength,?slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function?func?over batches in a sliding window.?Note:?By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism ) to do the grouping. You can pass an optional?numTasks ?argument to set a different number of tasks. |
reduceByKeyAndWindow(func,?invFunc,windowLength,?slideInterval, [numTasks]) | A more efficient version of the above?reduceByKeyAndWindow() ?where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and “inverse reducing” the old data that leave the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable to only “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameterinvFunc. Like in?reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in?reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
Output操作:
Output Operation | Meaning |
---|---|
print() | Prints first ten elements of every batch of data in a DStream on the driver. |
foreachRDD(func) | The fundamental output operator. Applies a function,?func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream’s contents as a?SequenceFile ?of serialized objects. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream’s contents as a text files. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream’s contents as a Hadoop file. The file name at each batch interval is generated based on?prefix?and?suffix:?“prefix-TIME_IN_MS[.suffix]“. |
(4). 启动Spark Streaming
上述只是定义好了应用拓扑结构,当启动Spark Streaming (ssc.start())时,当ssc.start()启动后程序才真正进行所有预期的操作。才会按既定的interval来执行这一系列job。
运行这个spark-streaming代码:
spark-submit --master spark://debugo:7077 --class MyExample.WordCountStreaming spark0_2.10-1.0.jar ...... (discretized,1) (stream,3) (created,1) (are,1) (this,2) (Kafka,1) (is,1) (provides,1) (guide,2) (Internally,,1) ... 14/09/03 10:37:31 INFO MapPartitionsRDD: Removing RDD 20 from persistence list 14/09/03 10:37:31 INFO BlockManager: Removing RDD 20 14/09/03 10:37:31 INFO MappedRDD: Removing RDD 17 from persistence list 14/09/03 10:37:31 INFO BlockManager: Removing RDD 17 14/09/03 10:37:31 INFO FlatMappedRDD: Removing RDD 16 from persistence list 14/09/03 10:37:31 INFO BlockManager: Removing RDD 16 14/09/03 10:37:31 INFO MappedRDD: Removing RDD 15 from persistence list 14/09/03 10:37:31 INFO BlockManager: Removing RDD 15 14/09/03 10:37:31 INFO UnionRDD: Removing RDD 14 from persistence list 14/09/03 10:37:31 INFO BlockManager: Removing RDD 14
stateful操作
上面的例子是一种最简单的应用场景——无状态更新。大多数时候,这种流式处理模型很难满足需求——比如我们需要累计统计单词的数量,或者计算一个信号的频域参数(需要以滑动窗口的方式处理一个间隔的数据),这就要使用另外两种应用场景——状态操作和窗口操作。状态(stateful)操作中,主要使用到下面一个transformation方法:
UpdateStateByKey
我们希望保存它状态的信息时,使用这个操作,并持续更新状态数值。使用它有两个步骤:
(1)定义状态(state),这个状态可以是任意的数据类型
(2)定义状态更新函数,从前一个状态更改新的状态
下面展示一个例子:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = … // add the new values with the previous running count to get the new count Some(newCount)
}
它会针对里面的每个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是之前的值。
下面是一个stateful的wordCount实例(修改自rg.apache.spark.examples.streaming.kafkaWordCount):
/** * Created by debugo on 2014/9/4. */ package com.debugo.example import java.util.Properties import kafka.producer._ object WordCountProducer { def main(args: Array[String]) { if (args.length val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ") new KeyedMessage[String, String](topic, str)}.toArray producer.send(messages: _*) Thread.sleep(100) } } } /** * Created by debugo on 2014/9/3. */ package com.debugo.example import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ object WordCountStateful { def main(args: Array[String]): Unit = { val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) //None to 0 Some(currentCount + previousCount) } if (args.length (x, 1)) //using updateStateByKey to update state. val stateDstream = wordCounts.updateStateByKey[Int](updateFunc) stateDstream.print() ssc.start() ssc.awaitTermination() } }
完成后compile&package。下面是启动kafka服务的流程。
下载并配置kafka
wget http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz tar xzvf kafka*.tgz vim conf/server.properties broker.id=0 # [0-2] for hdp0[2-4] host.name=hdp03 advertised.host.name=hdp03 zookeeper.connect=hdp02:2181,hdp03:2181,hdp04:2181
此时,zookeeper服务已经在hdp02,hdp03,hdp04上启动。然后在hdp02,hdp03,hdp04上分别启动kafka服务
/root/kafka_2.10-0.8.1.1/bin/kafka-server-start.sh /root/kafka_2.10-0.8.1.1/config/server.properties [2014-09-04 15:15:39,784] INFO Opening socket connection to server hdp03/172.19.17.233:2181 (org.apache.zookeeper.ClientCnxn) [2014-09-04 15:15:39,795] INFO Socket connection established to hdp03/172.19.17.233:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2014-09-04 15:15:39,819] INFO Session establishment complete on server hdp03/172.19.17.233:2181, sessionid = 0x3483b071fa006a7, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2014-09-04 15:15:39,825] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
新建一个TOPIC(replication-factor=num of brokers)
/root/kafka_2.10-0.8.1.1/bin/kafka-topics.sh --create --topic test --replication-factor 2 --partitions 2 --zookeeper hdp03:2181 Created topic "test".
console producer测试
/root/kafka_2.10-0.8.1.1/bin/kafka-console-producer.sh --broker-list hdp03:9092 --sync --topic test
输入一系列文本消息:
Hello Kafka Hi Kafka
console consumer测试
/root/kafka_2.10-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper hdp03:2181 --topic test --from-beginning Hello Kafka Hi Kafka
OK, kafka一切就绪。
运行example的producer
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer hdp02:9092 test 3 5
再submit刚刚编译的jar,这里需要添加kafka和streaming-kafka的jar
spark-submit --master=spark://hdp02:7077 --class=com.debugo.example.WordCountStateful --jars=/usshare/java/spark-streaming-kafka_2.10-1.0.2.jar,/usr/share/java/kafka_2.10-0.8.0.jar spark0_2.10-1.0.jar hdp02:2181,hdp03:2181,hdp04:2181 test-consumer-group test 1
这样就得到了累计统计的效果。需要注意的是,consumer-group要和配置文件config/consumer.properties中的配置一致。
Window Operations
前面的word count的例子,当我们想要每隔10秒计算一下最近30秒的单词总数。可以使用reduceByKeyAndWindow方法
// Reduce last 30 seconds of data, every 10 secondsval windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
这里面提到了windows的两个参数:
(1)window length:window的长度是30秒,最近30秒的数据
(2)slice interval:计算的时间间隔
通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。
除此之外,Window操作还有下面操作:
Transformation | Meaning |
---|---|
window(windowLength,?slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength,slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func,?windowLength,slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using?func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func,windowLength,?slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function?func?over batches in a sliding window.?Note:?By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism ) to do the grouping. You can pass an optional?numTasks ?argument to set a different number of tasks. |
reduceByKeyAndWindow(func,?invFunc,windowLength,?slideInterval, [numTasks]) | A more efficient version of the above?reduceByKeyAndWindow() ?where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and “inverse reducing” the old data that leave the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable to only “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameterinvFunc. Like in?reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in?reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
性能调优(摘自参考2)
优化运行时间
增加并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。
减少数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减少内存的使用。但序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的序列化接口可以更高效地使用CPU。
设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的batch窗口,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此,设置一个合理的batch窗口确保Job能够在这个batch窗口中结束是必须的。
减少任务提交和分发所带来的负担。通常情况下Akka框架能够高效地确保任务及时分发,但当batch窗口非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常会比使用Fine-Grained Mesos模式有更小的延迟。
优化内存使用
控制batch size。Spark Streaming会把batch窗口内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存至少能够容纳这个batch窗口内所有的数据,否则必须增加新的资源以提高集群的处理能力。
及时清理不再使用的数据。上面说到Spark Streaming会将接收到的数据全部存储于内部的可用内存区域中,因此对于处理过的不再需要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。
观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采取不同的GC策略以进一步减小内存回收对Job运行的影响。
参考
http://spark.apache.org/docs/latest/streaming-programming-guide.html
http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data
http://jerryshao.me/architecture/2013/04/02/spark-streaming-introduction/
原文地址:Spark Note – Introduction to Streaming, 感谢原作者分享。

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











ChatGPT는 올해 반년 넘게 인기를 끌었고, 그 인기는 전혀 줄어들지 않았습니다. 딥러닝과 NLP도 모두의 관심을 끌었습니다. 회사의 몇몇 친구들이 자바 개발자인 나에게 인공지능을 어떻게 시작해야 하는지 묻는다. 이제 AI 학습을 위해 숨겨진 자바 라이브러리를 꺼내서 모두에게 소개할 차례다. 이러한 라이브러리와 프레임워크는 기계 학습, 딥 러닝, 자연어 처리 등을 위한 광범위한 도구와 알고리즘을 제공합니다. AI 프로젝트의 특정 요구 사항에 따라 가장 적합한 라이브러리 또는 프레임워크를 선택하고 다양한 알고리즘을 실험하여 AI 솔루션을 구축할 수 있습니다. 1.Deeplearning4j Java 및 Scala용 오픈소스 분산 딥러닝 라이브러리입니다. 딥러닝

스트림 작업은 Java8의 하이라이트입니다! java.util.stream은 매우 강력하지만 실제 작업에서 이를 거의 사용하지 않는 개발자가 여전히 많습니다. 가장 불만이 많은 이유 중 하나는 스트리밍 작업과 같은 작업이 초기에는 실제로 그랬기 때문입니다. DEBUG에서는 스트림을 사용할 수 없기 때문에 한 줄의 코드일 경우 다음 단계로 넘어갈 때 실제로는 많은 작업이 한꺼번에 전달되기 때문에 어느 줄이 문제인지 판단하기 어렵습니다. 플러그인: JavaStreamDebugger 사용 중인 IDEA 버전이 비교적 새로운 버전인 경우 이 플러그인은 이미 포함되어 있으므로 설치할 필요가 없습니다. 아직 설치되지 않은 경우 수동으로 설치한 후 아래 단계를 계속 진행하세요.

java8의 스트림은 maxpublicstaticvoidmain(String[]args){Listlist=Arrays.asList(1,2,3,4,5,6);Integermax=list.stream().max((a,b)->{if ( a>b){return1;}elsereturn-1;}).get();System.out.println(max);}참고: 여기서 크기는 양수, 음수 및 0 값을 통해 결정됩니다. if(a>b){returna;}elseretur를 직접 작성하는 대신

빅데이터 시대가 도래하면서 데이터 처리의 중요성이 더욱 커지고 있습니다. 다양한 데이터 처리 작업을 위해 다양한 기술이 등장했습니다. 그 중 스파크(Spark)는 대규모 데이터 처리에 적합한 기술로 다양한 분야에서 널리 활용되고 있다. 또한 효율적인 프로그래밍 언어인 Go 언어도 최근 몇 년간 점점 더 많은 주목을 받고 있습니다. 이 기사에서는 Go 언어에서 Spark를 사용하여 효율적인 데이터 처리를 달성하는 방법을 살펴보겠습니다. 먼저 스파크의 기본 개념과 원리를 소개하겠습니다.

Java 빅데이터 기술 스택: Hadoop, Spark, Kafka 등 빅데이터 분야에서 Java의 응용을 이해합니다. 데이터의 양이 지속적으로 증가함에 따라 오늘날 인터넷 시대에 빅데이터 기술이 화두가 되고 있습니다. 빅데이터 분야에서 우리는 하둡(Hadoop), 스파크(Spark), 카프카(Kafka) 등의 기술 이름을 자주 듣습니다. 이러한 기술은 매우 중요한 역할을 하며, 널리 사용되는 프로그래밍 언어인 Java는 빅데이터 분야에서도 큰 역할을 합니다. 이 기사에서는 Java의 대규모 애플리케이션에 중점을 둘 것입니다.

머리말 JavaStream은 개발자가 데이터 스트림을 빠르고 효율적으로 처리하고 변환하는 데 도움이 되는 강력한 데이터 처리 도구입니다. Stream 작업을 사용하면 코드가 크게 단순화되어 읽기 쉽고 유지 관리가 쉬워져 개발 효율성이 향상됩니다. filter(): 지정된 조건자를 기반으로 조건을 충족하는 요소를 유지합니다. map(): 지정된 함수에 따라 각 요소를 매핑하고 새 스트림을 생성합니다. flatMap(): 각 요소를 스트림에 매핑한 다음 이러한 스트림을 스트림으로 연결합니다. distinct(): 중복 제거된 스트림을 반환합니다. sorted(): Stre의 경우

Infinix Note 40s는 Note 40 라인업에 새로 추가된 제품입니다. 비밀은 많지 않습니다. PassionateGeekz가 발견한 것처럼 이제 이 전화기는 공식 웹페이지에 모든 기능과 함께 나열됩니다. 현재 Infinix Note 시리즈에서 볼 수 있는 다른 휴대폰(

Linux에서 스트림은 데이터 흐름을 의미하며, 이는 특정 순서로 읽혀진 데이터 문자열이므로 데이터 흐름의 방향은 데이터 흐름의 읽기 순서가 됩니다. Linux 시스템이 데이터를 다른 파일로 읽은 후 출력 결과를 가져오는 프로세스를 리디렉션된 데이터 흐름이라고 합니다. Linux에서 명령을 입력하고 실행하면 화면에 두 가지 결과가 표시됩니다. 성공적인 작업 결과는 표준 출력이고, 실패한 작업 결과는 처리되지 않은 경우 표준 오류 출력입니다. 화면에 표시되고 데이터 흐름을 통해 리디렉션될 수 있습니다.
