목차
1. Spark中的基本概念
2. Spark应用框架
3. RDD的创造
4. RDD操作
— Transformations —
(1). map(func)
(2). filter(func)
(3). flatMap(func)
(4). mapPartitions(func)
(5). mapPartitionsWithIndex(func)
(6). sample(withReplacement,fraction, seed)
(7). union(otherDataset)
(8). intersection(otherDataset)
(9). distinct([numTasks]))
(10.)groupByKey([numTasks])
(11).reduceByKey(func, [numTasks])
(12).sortByKey([ascending], [numTasks])
(13). join(otherDataset, [numTasks])
(14).cogroup(otherDataset, [numTasks])
(15).cartesian(otherDataset)
(16). pipe(command, [envVars])
(17).coalesce(numPartitions)
(18).repartition(numPartitions)
— Actions —
(19). reduce(func)
(20). collect()
(21). count()
(22). first()
(23). take(n)
(24). countByKey()
(25). foreach(func)
(26). takeSample(withReplacement,num, seed)
(27). takeOrdered(n, [ordering])
(28). saveAsTextFile(path)
(29). saveAsSequenceFile(path)
(30). saveAsObjectFile(path)
5. RDD缓存
6. RDD的共享变量
参考
데이터 베이스 MySQL 튜토리얼 Spark Note – Programming Guide

Spark Note – Programming Guide

Jun 07, 2016 pm 04:38 PM
Note programming s spark

1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor Driver Program:运行Application的main()函数并创建SparkContext。通常SparkContext代表driver program Executor:为

1. Spark中的基本概念

在Spark中,有下面的基本概念。
Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor
Driver Program:运行Application的main()函数并创建SparkContext。通常SparkContext代表driver program
Executor:为某Application运行在worker node上的饿一个进程。该进程负责运行Task,并负责将数据存在内存或者磁盘上。每个Application都有自己独立的executors
Cluster Manager: 在集群上获得资源的外部服务(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集群中任何可运行Application代码的节点
Task:被送到executor上执行的工作单元。
Job:可以被拆分成Task并行计算的工作单元,一般由Spark Action触发的一次执行作业。
Stage:每个Job会被拆分成很多组Task,每组任务被称为stage,也可称TaskSet。该术语可以经常在日志中看打。
RDD:Spark的基本计算单元,通过Scala集合转化、读取数据集生成或者由其他RDD经过算子操作得到。

2. Spark应用框架

1
客户Spark程序(Driver Program)来操作Spark集群是通过SparkContext对象来进行,SparkContext作为一个操作和调度的总入口,在初始化过程中集群管理器会创建DAGScheduler作业调度和TaskScheduler任务调度(For Spark Standalone,而在Spark On Yarn中,TaskScheduler会被YARN代替)。
DAGScheduler作业调度模块是基于Stage的高层调度模块(参考:Spark分析之DAGScheduler),DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。它为每个Spark Job计算具有依赖关系的多个Stage任务阶段(通常根据Shuffle来划分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就会产生新的stage),然后将每个Stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来具体执行。其中,不同stage之前的RDD为宽依赖关系。 TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况。
创建SparkContext一般要经过下面几个步骤:
a). 导入Spark的类和隐式转换

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
로그인 후 복사


b). 构建Spark应用程序的应用信息对象SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master_url)
로그인 후 복사

c). 利用SparkConf对象来初始化SparkContext
val sc = new SparkContext(conf)
d). 创建RDD、并执行相应的Transformation和action并得到最终结果。
e). 关闭Context
在完成应用的设计和编写后,使用spark-submit来提交应用的jar包。spark-submit的命令行参考如下:
Submitting Applications

./bin/spark-submit \
  --class 
  --master  \
  --deploy-mode  \
  ... # other options
   \
  [application-arguments]
로그인 후 복사

Spark的运行模式取决于传递给SparkContext的MASTER环境变量的值。master URL可以是以下任一种形式:
Master URL 含义
local 使用一个Worker线程本地化运行SPARK(完全不并行)
local[*] 使用逻辑CPU个数数量的线程来本地化运行Spark
local[K] 使用K个Worker线程本地化运行Spark(理想情况下,K应该根据运行机器的CPU核数设定)
spark://HOST:PORT 连接到指定的Spark standalone master。默认端口是7077.
yarn-client 以客户端模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。
yarn-cluster 以集群模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。
mesos://HOST:PORT 连接到指定的Mesos集群。默认接口是5050.
而spark-shell会在启动的时候自动构建SparkContext,名称为sc。

3. RDD的创造

Spark所有的操作都围绕弹性分布式数据集(RDD)进行,这是一个有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。目前有两种类型的基础RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。 这两种类型的RDD都可以通过相同的方式进行操作,从而获得子RDD等一系列拓展,形成lineage血统关系图。
(1). 并行化集合
并行化集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组创建一个并行集合。
例如:val rdd = sc.parallelize(Array(1 to 10)) 根据能启动的executor的数量来进行切分多个slice,每一个slice启动一个Task来进行处理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的数量
(2). Hadoop数据集
Spark可以将任何Hadoop所支持的存储资源转化成RDD,如本地文件(需要网络文件系统,所有的节点都必须能访问到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
a). 使用textFile()方法可以将本地文件或HDFS文件转换成RDD
支持整个文件目录读取,文件可以是文本或者压缩文件(如gzip等,自动执行解压缩并加载数据)。如textFile(”file:///dfs/data”)
支持通配符读取,例如:

val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......
로그인 후 복사

textFile()可选第二个参数slice,默认情况下为每一个block分配一个slice。用户也可以通过slice指定更多的分片,但不能使用少于HDFS block的分片数。
b). 使用wholeTextFiles()读取目录里面的小文件,返回(用户名、内容)对
c). 使用sequenceFile[K,V]()方法可以将SequenceFile转换成RDD。SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
d). 使用SparkContext.hadoopRDD方法可以将其他任何Hadoop输入类型转化成RDD使用方法。一般来说,HadoopRDD中每一个HDFS block都成为一个RDD分区。
此外,通过Transformation可以将HadoopRDD等转换成FilterRDD(依赖一个父RDD产生)和JoinedRDD(依赖所有父RDD)等。

4. RDD操作

2
RDD支持两类操作:
转换(transformation)现有的RDD通关转换生成一个新的RDD,转换是延时执行(lazy)的。
动作(actions)在RDD上运行计算后,返回结果给驱动程序或写入文件系统。
例如,map就是一种transformation,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。reduce则是一种action,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。

— Transformations —

(1). map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成

(2). filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成
Test:

val num=sc.parallelize(1 to 100)
val num2 = num.map(_*2)
val num3 = num2.filter(_ % 3 == 0)
......
num3.collect
//res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)
num3.toDebugString
//res5: String = 
//FilteredRDD[20] at filter at :16 (48 partitions)
//  MappedRDD[19] at map at :14 (48 partitions)
//    ParallelCollectionRDD[18] at parallelize at :12 (48 partitions)
로그인 후 복사

(3). flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
Test:

val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))
kv.flatMap(x=>x.map(_+1)).collect
//res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)
//Word Count
sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
로그인 후 복사

(4). mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.
类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。mapPartitions将会被每一个数据集分区调用一次。各个数据集分区的全部内容将作为顺序的数据流传入函数func的参数中,func必须返回另一个Iterator[T]。被合并的结果自动转换成为新的RDD。下面的测试中,元组(3,4)和(6,7)将由于我们选择的分区策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose
Test:

val nums = sc . parallelize (1 to 9 , 3)
def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = {
    var res = List [(T , T) ]()
    var pre = iter.next
    while ( iter.hasNext )
    {
        val cur = iter . next ;
        res .::= ( pre , cur )
        pre = cur ;
    }
    res . iterator
}
//myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
nums.mapPartitions(myfunc).collect
//res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
로그인 후 복사

(5). mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) ==> Iterator when running on an RDD of type T.
类似于mapPartitions, 其函数原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。下面测试中,将分区索引和分区数据一起输出。
Test:

val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)
def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {
iter . toList . map ( x => index + "-" + x ) . iterator
}
//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
x . mapPartitionsWithIndex ( myfunc ) . collect()
res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)
로그인 후 복사

(6). sample(withReplacement,fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。

val a = sc . parallelize (1 to 10000 , 3)
a . sample ( false , 0.1 , 0) . count
res0 : Long = 960
a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count
res1: Long = 7073
로그인 후 복사

(7). union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成。

(8). intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.
返回一个包含源数据集中所有不重复元素的新数据集
Test:

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv2=sc.parallelize(List(("A",4),("A", 2),("C",3),("A",4),("B",5)))
kv2.distinct.collect
res0: Array[(String, Int)] = Array((A,4), (C,3), (B,5), (A,2))
kv1.union(kv2).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2), (C,3), (A,4), (B,5))
kv1.union(kv2).collect.distinct
res2: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,2))
kv1.intersection(kv2).collect
res43: Array[(String, Int)] = Array((A,4), (C,3), (B,5))
로그인 후 복사

(10.)groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集
注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它。如果分组是用来计算聚合操作(如sum或average),那么应该使用reduceByKey 或combineByKey 来提供更好的性能。
groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以这里引出两个概念宽依赖和窄依赖。
3
窄依赖(narrow dependencies)
子RDD的每个分区依赖于常数个父分区(与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap
输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
从输入中选择部分元素的算子,如filter、distinct、substract、sample
宽依赖(wide dependencies)
子RDD的每个分区依赖于所有的父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
对两个RDD基于key进行join和重组,如join
经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销。
注意:reduce是一个action,和reduceByKey完全不同。

(11).reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的

(12).sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定
Test:

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.sortByKey().collect //注意sortByKey的小括号不能省
res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.groupByKey().collect
res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(1, 4)), (B,ArrayBuffer(2, 5)), (C,ArrayBuffer(3)))
kv1.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))
로그인 후 복사

(13). join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集

(14).cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable, Iterable) tuples. This operation is also called groupWith.
在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为groupwith
Test:

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
res16: Array[(String, (Int, Int))] = Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20)))
kv1.cogroup(kv3).collect
res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(ArrayBuffer(1, 4),ArrayBuffer(10))), (B,(ArrayBuffer(2, 5),ArrayBuffer(20))), (C,(ArrayBuffer(3),ArrayBuffer())), (D,(ArrayBuffer(),ArrayBuffer(30))))
로그인 후 복사

(15).cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)

(16). pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
通过POSIX 管道来将每个RDD分区的数据传入一个shell命令(例如Perl或bash脚本)。RDD元素会写入到进程的标准输入,其标准输出会作为RDD字符串返回。

(17).coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
将RDD分区的数量降低为numPartitions,对于经过过滤后的大数据集的在线处理更加有效。

(18).repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
随机重新shuffle RDD中的数据,并创建numPartitions个分区。这个操作总会通过网络来shuffle全部数据。

— Actions —

(19). reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。

(20). collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。

(21). count()

Return the number of elements in the dataset.
返回数据集的元素的个数。

(22). first()

Return the first element of the dataset (similar to take(1)).
返回数据集的第一个元素(类似于take(1))

(23). take(n)

Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素

(24). countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数

(25). foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase.
Test:

val num=sc.parallelize(1 to 10)
num.reduce (_ + _)
res1: Int = 55
num.take(5)
res2: Array[Int] = Array(1, 2, 3, 4, 5)
num.first
res3: Int = 1
num.count
res4: Long = 10
num.take(5).foreach(println)
1
2
3
4
5
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7)))
val kv1_count=kv1.countByKey()
kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)
로그인 후 복사

(26). takeSample(withReplacement,num, seed)

Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子

(27). takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.
返回一个由数据集的前n个元素组成的有序数组,使用自然序或自定义的比较器。

(28). saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行

(29). saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)

(30). saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
将数据集元素写入Java序列化的可以被SparkContext.objectFile()加载的简单格式中。
当然,transformation和action的操作远远不止这些。其他请参考API文档:
RDD API

5. RDD缓存

Spark可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,可以通过构建它的 transformation自动重构。被缓存的 RDD 被使用的时,存取速度会被大大加速。一般的executor内存60%做 cache, 剩下的40%做task。
Spark中,RDD类可以使用cache() 和 persist() 方法来缓存。cache()是persist()的特例,将该RDD缓存到内存中。而persist可以指定一个StorageLevel。StorageLevel的列表可以在StorageLevel 伴生单例对象中找到:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
}
// 其中,StorageLevel 类的构造器参数如下:
class StorageLevel private(  private var useDisk_ : Boolean,  private var useMemory_ : Boolean,  private var useOffHeap_ : Boolean,  private var deserialized_ : Boolean,  private var replication_ : Int = 1)
로그인 후 복사

Spark的不同StorageLevel ,目的满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:
·如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。
·如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。
·尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。
·如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。
·如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法。
在不会使用cached RDD的时候,及时使用unpersist方法来释放它。

6. RDD的共享变量

在应用开发中,一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器。通常看来,在任务之间中,读写共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。
(1). 广播变量(Broadcast Variables)
– 广播变量缓存到各个节点的内存中,而不是每个 Task
– 广播变量被创建后,能在集群中运行的任何函数调用
– 广播变量是只读的,不能在被广播后修改
– 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本
使用方法:

val broadcastVar = sc.broadcast(Array(1, 2, 3))
로그인 후 복사

(2). 累加器
累加器只支持加法操作,可以高效地并行,用于实现计数器和变量求和。Spark 原生支持数值类型和标准可变集合的计数器,但用户可以添加新的类型。只有驱动程序才能获取累加器的值
使用方法:

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum  + = x)
accum.value
val num=sc.parallelize(1 to 100)
로그인 후 복사

参考

http://spark.apache.org/docs/latest/programming-guide.html

http://www.oschina.net/translate/spark-configuration

http://shiyanjun.cn/archives/744.html

《Apache Spark API By Example》

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

AI 알고리즘에 일반적으로 사용되는 10가지 라이브러리 Java 버전 AI 알고리즘에 일반적으로 사용되는 10가지 라이브러리 Java 버전 Jun 13, 2023 pm 04:33 PM

ChatGPT는 올해 반년 넘게 인기를 끌었고, 그 인기는 전혀 줄어들지 않았습니다. 딥러닝과 NLP도 모두의 관심을 끌었습니다. 회사의 몇몇 친구들이 자바 개발자인 나에게 인공지능을 어떻게 시작해야 하는지 묻는다. 이제 AI 학습을 위해 숨겨진 자바 라이브러리를 꺼내서 모두에게 소개할 차례다. 이러한 라이브러리와 프레임워크는 기계 학습, 딥 러닝, 자연어 처리 등을 위한 광범위한 도구와 알고리즘을 제공합니다. AI 프로젝트의 특정 요구 사항에 따라 가장 적합한 라이브러리 또는 프레임워크를 선택하고 다양한 알고리즘을 실험하여 AI 솔루션을 구축할 수 있습니다. 1.Deeplearning4j Java 및 Scala용 오픈소스 분산 딥러닝 라이브러리입니다. 딥러닝

업계 최초 AI 스마트 스크린 카메라 '스카이워스 스마트 스크린 카메라 S50' 최초 출시 업계 최초 AI 스마트 스크린 카메라 '스카이워스 스마트 스크린 카메라 S50' 최초 출시 Nov 18, 2023 pm 06:43 PM

Skyworth Security는 최근 연간 주력 신제품인 Skyworth Smart Screen Camera S50을 출시했습니다. 업계 최초로 인공 지능 기술을 탑재한 스마트 스크린 카메라인 Skyworth 스마트 스크린 카메라 S50의 가장 큰 특징은 컬러 스마트 스크린과 카메라를 교묘하게 결합하여 강력한 양방향 영상 통화 기능을 구현한다는 것입니다. 2T 컴퓨팅 성능으로 인공 지능 기능이 크게 향상되었습니다. 이 올해의 새로운 주력 제품은 미래 지향적인 외관, 고급 반투명 소재, 부드럽고 매끄러운 양방향 작동이 가능한 다채로운 터치형 고화질 화면을 갖추고 있습니다. 비디오 인터콤, WeChat 화상 통화, 매우 선명한 화질, 사각지대 없는 360° 모니터링, 아무리 어두워도 12배 스마트 줌으로 선명하게 볼 수 있습니다. 확대하면 모든 세부 사항이 명확하게 표시됩니다. 2

Go 언어에서 Spark를 사용하여 효율적인 데이터 처리 달성 Go 언어에서 Spark를 사용하여 효율적인 데이터 처리 달성 Jun 16, 2023 am 08:30 AM

빅데이터 시대가 도래하면서 데이터 처리의 중요성이 더욱 커지고 있습니다. 다양한 데이터 처리 작업을 위해 다양한 기술이 등장했습니다. 그 중 스파크(Spark)는 대규모 데이터 처리에 적합한 기술로 다양한 분야에서 널리 활용되고 있다. 또한 효율적인 프로그래밍 언어인 Go 언어도 최근 몇 년간 점점 더 많은 주목을 받고 있습니다. 이 기사에서는 Go 언어에서 Spark를 사용하여 효율적인 데이터 처리를 달성하는 방법을 살펴보겠습니다. 먼저 스파크의 기본 개념과 원리를 소개하겠습니다.

빅 데이터 분야에서 Java 적용 살펴보기: Hadoop, Spark, Kafka 및 기타 기술 스택에 대한 이해 빅 데이터 분야에서 Java 적용 살펴보기: Hadoop, Spark, Kafka 및 기타 기술 스택에 대한 이해 Dec 26, 2023 pm 02:57 PM

Java 빅데이터 기술 스택: Hadoop, Spark, Kafka 등 빅데이터 분야에서 Java의 응용을 이해합니다. 데이터의 양이 지속적으로 증가함에 따라 오늘날 인터넷 시대에 빅데이터 기술이 화두가 되고 있습니다. 빅데이터 분야에서 우리는 하둡(Hadoop), 스파크(Spark), 카프카(Kafka) 등의 기술 이름을 자주 듣습니다. 이러한 기술은 매우 중요한 역할을 하며, 널리 사용되는 프로그래밍 언어인 Java는 빅데이터 분야에서도 큰 역할을 합니다. 이 기사에서는 Java의 대규모 애플리케이션에 중점을 둘 것입니다.

Infinix Note 40s는 모든 기능과 사양과 함께 온라인에 나열됩니다. Infinix Note 40s는 모든 기능과 사양과 함께 온라인에 나열됩니다. Jun 30, 2024 pm 09:32 PM

Infinix Note 40s는 Note 40 라인업에 새로 추가된 제품입니다. 비밀은 많지 않습니다. PassionateGeekz가 발견한 것처럼 이제 이 전화기는 공식 웹페이지에 모든 기능과 함께 나열됩니다. 현재 Infinix Note 시리즈에서 볼 수 있는 다른 휴대폰(

PHP의 웹 서버 PHP의 웹 서버 May 23, 2023 am 11:31 AM

PHP는 웹 애플리케이션 개발에 널리 사용되는 널리 사용되는 서버 측 스크립팅 언어입니다. PHP는 Apache, Nginx 등 다양한 방식으로 웹 서버와 함께 사용할 수 있습니다. 이 기사에서는 PHP의 웹 서버, 작동 방식 및 PHP에서 웹 서버를 사용하는 방법에 중점을 둘 것입니다. 웹 서버는 HTTP 요청을 수신하고 HTTP 응답을 보내는 네트워크 애플리케이션입니다. 웹 서버는 정적 파일(예: HTM)도 처리할 수 있습니다.

PHP 시작하기: PHP와 스파크 PHP 시작하기: PHP와 스파크 May 20, 2023 am 08:41 AM

PHP는 배우기 쉽고 오픈 소스이며 크로스 플랫폼이기 때문에 매우 인기 있는 서버 측 프로그래밍 언어입니다. 현재 많은 대기업에서는 PHP 언어를 사용하여 Facebook 및 WordPress와 같은 애플리케이션을 구축하고 있습니다. Spark는 웹 애플리케이션 구축을 위한 빠르고 가벼운 개발 프레임워크입니다. JVM(Java Virtual Machine)을 기반으로 하며 PHP와 함께 작동합니다. 이 기사에서는 PHP와 Spark를 사용하여 웹 애플리케이션을 구축하는 방법을 소개합니다. PHP란 무엇입니까? PH

PHP를 사용하여 Hadoop, Spark, Flink 등 대규모 데이터 처리를 수행합니다. PHP를 사용하여 Hadoop, Spark, Flink 등 대규모 데이터 처리를 수행합니다. May 11, 2023 pm 04:13 PM

데이터의 양이 지속적으로 증가함에 따라 대규모 데이터 처리는 기업이 직면하고 해결해야 할 문제가 되었습니다. 기존의 관계형 데이터베이스는 더 이상 이러한 요구를 충족할 수 없습니다. 대규모 데이터의 저장 및 분석을 위해서는 Hadoop, Spark 및 Flink와 같은 분산 컴퓨팅 플랫폼이 최선의 선택이 되었습니다. 데이터 처리 도구를 선택하는 과정에서 PHP는 개발 및 유지 관리가 쉬운 언어로 개발자들 사이에서 점점 인기를 얻고 있습니다. 이 기사에서는 대규모 데이터 처리에 PHP를 활용하는 방법과 방법을 살펴보겠습니다.

See all articles