jjzjj

Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)

邵奈一shaonaiyi888 2023-03-28 原文
 

教程目录

 

0x00 教程内容
  1. 转换算子与行动算子的进阶操作
  2. RDD的缓存与持久化
0x01 进阶算子操作
1. 创建RDD
val rdd = sc.parallelize(List((1,1),(2,1),(3,1),(3,4)))
2. 转换算子
【1】reduceByKey(func)
含义:合并具有相同键的值。

rdd.reduceByKey((x,y) => x+y).collect()
代码解释:具有相同键的是:(3,1),(3,4),所以合并成了(3,5)

【2】groupByKey()
含义:对具有相同键的值进行分组。

rdd.groupByKey().collect()
代码解释:具有相同键的是:(3,1),(3,4),进行了分组。CompactBuffer 不是 Scala 里定义的数据结构,而是 Spark 里的数据结构,它继承自一个迭代器和序列,其它的返回值是一个很容易进行循环遍历的集合。

【3】mapValues(func)
含义:对键值对 RDD 的每个值应用一个函数而不改变对应的键。

rdd.mapValues(x => x*3).collect()
代码解释:键不变,但是值都进行了 x => x*3 操作,相当于值为 x

【4】flatMapValues(func)
含义:对键值对 RDD 中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录,通常用于符号化。

rdd.flatMapValues(x => (x to 4)).collect() 代码解释:键不变,对值都进行了 x => (x to 4) 操作,相当于值为 x

【5】keys()
含义:返回一个仅包含所有键的 RDD。

rdd.keys.collect()
注意:是 keys ,不是keys()

【6】values()
含义:返回一个仅包含所有值的 RDD。

rdd.values.collect()
注意:是 values ,不是values()

【7】sortByKey()
含义:返回一个根据键排序的 RDD。

rdd.sortByKey().collect()
代码解释:对于相同的键,排序顺序是不确定的。

【8】combineByKey(createCombiner, mergeValue, mergeCombiners)
含义:combineByKey() 是键值对 RDD 中较为核心的高级函数,很多其它聚合函数都是在这个之上实现的,比如:groupByKey,reduceByKey 等等。

combineByKey()在遍历分区的所有元素时,主要有两种情况:
1、该元素对应的键没有遇到过;
2、该元素对应的键和之前的某一个元素的键是相同的。

如果是新的元素,combineByKey() 会使用 第一个 参数createCombiner()函数来创建该键对应累加器的初始值。

注意:是在每一个分区中第一次出现新键的时候创建,而不是在整个 RDD 中。

在当前分区中,如果遇到该键是已经存在的键,那么就调用 第二个 参数 mergeValue()方法将该键对应累加器的当前值与这个新的值合并。

因为有多个分区,而且每个分区都是独立处理的,所以最后需要调用 第三个mergeCombiners()方法将各个分区的结果进行合并。

combineByKey的源码,看不懂没关系:

def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null) 请看下面的例子(根据相同键,计算其所有值的平均值):

val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5))) val result = cbRDD.combineByKey( // 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对 (v) => (v, 1), // 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // 多个分区遇到同一个键的累加器,更新主累加器 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 求平均值 ).map{ case (key, value) => (key, value._1 / value._2.toFloat) } // 输出结果 result.collectAsMap().foreach(println(_)) 遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)

遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)

遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)

以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。

最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。

最后打印结果,如图:

【9】subtractByKey()
含义:删掉 rdd1中的与 rdd2的相同的元素。

val rdd1= sc.parallelize(List((1,2),(2,3),(2,4))) val rdd2= sc.parallelize(List((2,4),(3,5))) rdd1.subtractByKey(rdd2).collect()
代码解释:rdd1中有键为2的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4),最后剩下一个元素(1,2)

【10】cogroup()
含义:将两个 RDD 中拥有相同的数据分组到一起。

rdd1.cogroup(rdd2).collect().foreach(println(_))
代码解释:value 中第一个 CompactBuffer 为 rdd1 的值,第二个 CompactBuffer 为 rdd2 的值。

3. 行动算子
【1】countByKey()
含义:对每个键对应的元素分别计数。

val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5))) rdd.countByKey().foreach(println(_))
代码解释:键为1出现了1次,2出现了2次,3出现了1次。

【2】lookup()
含义:返回给定键对应的所有值。

rdd.lookup(2)

【3】collectAsMap()
含义:将结果以映射表的形式返回,key 如果重复,后边的元素会覆盖前面的元素。与 collect 类似,但适用于键值 RDD 并且会保留其键值结构。

rdd.collectAsMap()

0x02 RDD的缓存与持久化
1. 缓存与持久化的意义
在大数据处理场景中,我们的数据量会达到TB、甚至PB级别,并且会重复调用同一组数据,如果每一次调用都要重新计算,将会非常消耗资源,所以我们可以对处理过程中的中间数据进行数据缓存,或者持久化到内存或者磁盘中。

2. 缓存
我们可以对 RDD 使用 cache()方法进行缓存,即在集群相关节点的内存中进行缓存。

首先,我们需要引入相关的模块:

import org.apache.spark.storage._ 注意:务必先启动HDFS再执行下面的代码!

put.txt 文件为:

shao nai yi nai nai yi yi shao nai nai val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt") val wordRDD = textFileRDD.flatMap(line => line.split(" ")) val pairWordRDD = wordRDD.map(word => (word, 1)) val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b) wordCountRDD.cache() // 这里还没有执行缓存 wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存

3. 持久化
持久化,也就是将 RDD 的数据缓存到内存中/磁盘中,以后无论对这个RDD做多少次计算,都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。可以使用 persist()函数来进行持久化,一般默认的存储空间是在内存中,如果内存不够就会写入磁盘中。persist 持久化分为不同的等级,还可以在存储等级的末尾加上_2用于把持久化的数据存为 2 份,避免数据丢失。

下面的列表列出了不同的持久化等级:

级别 使用的空间 是否在内存中 是否在磁盘上
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 部分 部分
MEMORY_AND_DISK_SER 部分 部分
DISK_ONLY
还可以执行rdd.unpersist()清除缓存

import org.apache.spark.storage._ val rdd = sc.makeRDD(1 to 100000) rdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2) rdd.take(15) rdd.unpersist()

提示:其实缓存 cache() 底层就是调用的persist()的无参版本,执行的是:persist(MEMORY_ONLY)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别,具体介绍如下表:

持久化级别 含义
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
持久化对于性能调优的原则

  1. 尽可能地去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,反复使用,比如拥有一个key-value型RDD,后面又需要用到拥有同样value的RDD,则可以复用key-value型的RDD即可。
  2. 对于要多次计算和使用的公共RDD,一定要进行持久化。
  3. 持久化,是可以进行序列化的。如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,进而可能会导致OOM内存溢出。此时就可以选择序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,可以大大减少内存的空间占用。 序列化的方式,唯一的缺点就是:在获取数据的时候,机器内部需要反序列化。 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化或者序列化)。
  4. 如果要求数据的高可靠性,可以使用双副本机制进行持久化。一个副本丢了,不用重新计算,还可以使用另外一份副本
4. persist()的两个坑
请参考此教程:关于 Spark persist() 的两个坑

0xFF 总结
  1. 能够转化算子的时候尽量使用转化算子,少用行动算子,这是性能调优的一个小技巧。
  2. 其实能不使用 DISK 相关的持久化策略,就不要使用,有时从磁盘里读取数据,还不如重新计算一次。
  3. 请继续学习本博客其他教程!


 

有关Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)的更多相关文章

  1. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  2. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  3. ruby - 将数组的内容转换为 int - 2

    我需要读入一个包含数字列表的文件。此代码读取文件并将其放入二维数组中。现在我需要获取数组中所有数字的平均值,但我需要将数组的内容更改为int。有什么想法可以将to_i方法放在哪里吗?ClassTerraindefinitializefile_name@input=IO.readlines(file_name)#readinfile@size=@input[0].to_i@land=[@size]x=1whilex 最佳答案 只需将数组映射为整数:@land边注如果你想得到一条线的平均值,你可以这样做:values=@input[x]

  4. ruby - 将散列转换为嵌套散列 - 2

    这道题是thisquestion的逆题.给定一个散列,每个键都有一个数组,例如{[:a,:b,:c]=>1,[:a,:b,:d]=>2,[:a,:e]=>3,[:f]=>4,}将其转换为嵌套哈希的最佳方法是什么{:a=>{:b=>{:c=>1,:d=>2},:e=>3,},:f=>4,} 最佳答案 这是一个迭代的解决方案,递归的解决方案留给读者作为练习:defconvert(h={})ret={}h.eachdo|k,v|node=retk[0..-2].each{|x|node[x]||={};node=node[x]}node[

  5. ruby-on-rails - Rails 模型——非持久类成员或属性? - 2

    对于Rails模型,是否可以/建议让一个类的成员不持久保存到数据库中?我想将用户最后选择的类型存储在session变量中。由于我无法从我的模型中设置session变量,我想将值存储在一个“虚拟”类成员中,该成员只是将值传递回Controller。你能有这样的类(class)成员吗? 最佳答案 将非持久属性添加到Rails模型就像任何其他Ruby类一样:classUser扩展解释:在Ruby中,所有实例变量都是私有(private)的,不需要在赋值前定义。attr_accessor创建一个setter和getter方法:classUs

  6. ruby - 如何在 Ubuntu 中清除 Ruby Phusion Passenger 的缓存? - 2

    我试过重新启动apache,缓存的页面仍然出现,所以一定有一个文件夹在某个地方。我没有“公共(public)/缓存”,那么我还应该查看哪些其他地方?是否有一个URL标志也可以触发此效果? 最佳答案 您需要触摸一个文件才能清除phusion,例如:touch/webapps/mycook/tmp/restart.txt参见docs 关于ruby-如何在Ubuntu中清除RubyPhusionPassenger的缓存?,我们在StackOverflow上找到一个类似的问题:

  7. ruby-on-rails - Ruby url 到 html 链接转换 - 2

    我正在使用Rails构建一个简单的聊天应用程序。当用户输入url时,我希望将其输出为html链接(即“url”)。我想知道在Ruby中是否有任何库或众所周知的方法可以做到这一点。如果没有,我有一些不错的正则表达式示例代码可以使用... 最佳答案 查看auto_linkRails提供的辅助方法。这会将所有URL和电子邮件地址变成可点击的链接(htmlanchor标记)。这是文档中的代码示例。auto_link("Gotohttp://www.rubyonrails.organdsayhellotodavid@loudthinking.

  8. ruby-on-rails - Ruby on Rails 计数器缓存错误 - 2

    尝试在我的RoR应用程序中实现计数器缓存列时出现错误Unknownkey(s):counter_cache。我在这个问题中实现了模型关联:Modelassociationquestion这是我的迁移:classAddVideoVotesCountToVideos0Video.reset_column_informationVideo.find(:all).eachdo|p|p.update_attributes:videos_votes_count,p.video_votes.lengthendenddefself.downremove_column:videos,:video_vot

  9. ruby-on-rails - 使用 ruby​​ 将多个实例变量转换为散列的更好方法? - 2

    我收到格式为的回复#我需要将其转换为哈希值(针对活跃商家)。目前我正在遍历变量并执行此操作:response.instance_variables.eachdo|r|my_hash.merge!(r.to_s.delete("@").intern=>response.instance_eval(r.to_s.delete("@")))end这有效,它将生成{:first="charlie",:last=>"kelly"},但它似乎有点hacky和不稳定。有更好的方法吗?编辑:我刚刚意识到我可以使用instance_variable_get作为该等式的第二部分,但这仍然是主要问题。

  10. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

随机推荐