val rdd = sc.parallelize(List((1,1),(2,1),(3,1),(3,4)))
rdd.reduceByKey((x,y) => x+y).collect()

(3,1),(3,4),所以合并成了(3,5)
rdd.groupByKey().collect()

(3,1),(3,4),进行了分组。CompactBuffer 不是 Scala 里定义的数据结构,而是 Spark 里的数据结构,它继承自一个迭代器和序列,其它的返回值是一个很容易进行循环遍历的集合。
rdd.mapValues(x => x*3).collect()

x => x*3 操作,相当于值为 x 。
rdd.flatMapValues(x => (x to 4)).collect()
代码解释:键不变,对值都进行了 x => (x to 4) 操作,相当于值为 x 。
rdd.keys.collect()

keys ,不是keys() 。
rdd.values.collect()

values ,不是values() 。
rdd.sortByKey().collect()

groupByKey,reduceByKey 等等。
combineByKey()在遍历分区的所有元素时,主要有两种情况:createCombiner()函数来创建该键对应累加器的初始值。
注意:是在每一个分区中第一次出现新键的时候创建,而不是在整个 RDD 中。
在当前分区中,如果遇到该键是已经存在的键,那么就调用 第二个 参数 mergeValue()方法将该键对应累加器的当前值与这个新的值合并。
因为有多个分区,而且每个分区都是独立处理的,所以最后需要调用 第三个mergeCombiners()方法将各个分区的结果进行合并。
combineByKey的源码,看不懂没关系:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
请看下面的例子(根据相同键,计算其所有值的平均值):
val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5)))
val result = cbRDD.combineByKey(
// 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对
(v) => (v, 1),
// 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
// 多个分区遇到同一个键的累加器,更新主累加器
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
// 求平均值
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
// 输出结果
result.collectAsMap().foreach(println(_))
遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)。
遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)。
遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)。
以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。
最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。
最后打印结果,如图:
键与 rdd2的键相同的元素。
val rdd1= sc.parallelize(List((1,2),(2,3),(2,4)))
val rdd2= sc.parallelize(List((2,4),(3,5)))
rdd1.subtractByKey(rdd2).collect()

2的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4),最后剩下一个元素(1,2)。
键的数据分组到一起。
rdd1.cogroup(rdd2).collect().foreach(println(_))

val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5)))
rdd.countByKey().foreach(println(_))

1出现了1次,2出现了2次,3出现了1次。
rdd.lookup(2)
rdd.collectAsMap()
0x02 RDD的缓存与持久化
缓存,或者持久化到内存或者磁盘中。
cache()方法进行缓存,即在集群相关节点的内存中进行缓存。
首先,我们需要引入相关的模块:
import org.apache.spark.storage._
注意:务必先启动HDFS再执行下面的代码!
put.txt 文件为:
shao nai yi
nai nai yi yi
shao nai nai
val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt")
val wordRDD = textFileRDD.flatMap(line => line.split(" "))
val pairWordRDD = wordRDD.map(word => (word, 1))
val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b)
wordCountRDD.cache() // 这里还没有执行缓存
wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存
persist()函数来进行持久化,一般默认的存储空间是在内存中,如果内存不够就会写入磁盘中。persist 持久化分为不同的等级,还可以在存储等级的末尾加上_2用于把持久化的数据存为 2 份,避免数据丢失。
下面的列表列出了不同的持久化等级:
| 级别 | 使用的空间 | 是否在内存中 | 是否在磁盘上 |
|---|---|---|---|
| MEMORY_ONLY | 高 | 是 | 否 |
| MEMORY_ONLY_SER | 低 | 是 | 否 |
| MEMORY_AND_DISK | 高 | 部分 | 部分 |
| MEMORY_AND_DISK_SER | 低 | 部分 | 部分 |
| DISK_ONLY | 低 | 否 | 是 |
rdd.unpersist()清除缓存
import org.apache.spark.storage._
val rdd = sc.makeRDD(1 to 100000)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
rdd.take(15)
rdd.unpersist()
提示:其实缓存 cache() 底层就是调用的persist()的无参版本,执行的是:persist(MEMORY_ONLY)。
对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别,具体介绍如下表:
| 持久化级别 | 含义 |
|---|---|
| MEMORY_ONLY | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。 |
| MEMORY_AND_DISK | 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
| MEMORY_ONLY_SER | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
| MEMORY_AND_DISK_SER | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
| DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等 | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
共同的RDD,反复使用,比如拥有一个key-value型RDD,后面又需要用到拥有同样value的RDD,则可以复用key-value型的RDD即可。序列化的。如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,进而可能会导致OOM内存溢出。此时就可以选择序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,可以大大减少内存的空间占用。 序列化的方式,唯一的缺点就是:在获取数据的时候,机器内部需要反序列化。 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化或者序列化)。双副本机制进行持久化。一个副本丢了,不用重新计算,还可以使用另外一份副本。DISK 相关的持久化策略,就不要使用,有时从磁盘里读取数据,还不如重新计算一次。我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我需要读入一个包含数字列表的文件。此代码读取文件并将其放入二维数组中。现在我需要获取数组中所有数字的平均值,但我需要将数组的内容更改为int。有什么想法可以将to_i方法放在哪里吗?ClassTerraindefinitializefile_name@input=IO.readlines(file_name)#readinfile@size=@input[0].to_i@land=[@size]x=1whilex 最佳答案 只需将数组映射为整数:@land边注如果你想得到一条线的平均值,你可以这样做:values=@input[x]
这道题是thisquestion的逆题.给定一个散列,每个键都有一个数组,例如{[:a,:b,:c]=>1,[:a,:b,:d]=>2,[:a,:e]=>3,[:f]=>4,}将其转换为嵌套哈希的最佳方法是什么{:a=>{:b=>{:c=>1,:d=>2},:e=>3,},:f=>4,} 最佳答案 这是一个迭代的解决方案,递归的解决方案留给读者作为练习:defconvert(h={})ret={}h.eachdo|k,v|node=retk[0..-2].each{|x|node[x]||={};node=node[x]}node[
对于Rails模型,是否可以/建议让一个类的成员不持久保存到数据库中?我想将用户最后选择的类型存储在session变量中。由于我无法从我的模型中设置session变量,我想将值存储在一个“虚拟”类成员中,该成员只是将值传递回Controller。你能有这样的类(class)成员吗? 最佳答案 将非持久属性添加到Rails模型就像任何其他Ruby类一样:classUser扩展解释:在Ruby中,所有实例变量都是私有(private)的,不需要在赋值前定义。attr_accessor创建一个setter和getter方法:classUs
我试过重新启动apache,缓存的页面仍然出现,所以一定有一个文件夹在某个地方。我没有“公共(public)/缓存”,那么我还应该查看哪些其他地方?是否有一个URL标志也可以触发此效果? 最佳答案 您需要触摸一个文件才能清除phusion,例如:touch/webapps/mycook/tmp/restart.txt参见docs 关于ruby-如何在Ubuntu中清除RubyPhusionPassenger的缓存?,我们在StackOverflow上找到一个类似的问题:
我正在使用Rails构建一个简单的聊天应用程序。当用户输入url时,我希望将其输出为html链接(即“url”)。我想知道在Ruby中是否有任何库或众所周知的方法可以做到这一点。如果没有,我有一些不错的正则表达式示例代码可以使用... 最佳答案 查看auto_linkRails提供的辅助方法。这会将所有URL和电子邮件地址变成可点击的链接(htmlanchor标记)。这是文档中的代码示例。auto_link("Gotohttp://www.rubyonrails.organdsayhellotodavid@loudthinking.
尝试在我的RoR应用程序中实现计数器缓存列时出现错误Unknownkey(s):counter_cache。我在这个问题中实现了模型关联:Modelassociationquestion这是我的迁移:classAddVideoVotesCountToVideos0Video.reset_column_informationVideo.find(:all).eachdo|p|p.update_attributes:videos_votes_count,p.video_votes.lengthendenddefself.downremove_column:videos,:video_vot
我收到格式为的回复#我需要将其转换为哈希值(针对活跃商家)。目前我正在遍历变量并执行此操作:response.instance_variables.eachdo|r|my_hash.merge!(r.to_s.delete("@").intern=>response.instance_eval(r.to_s.delete("@")))end这有效,它将生成{:first="charlie",:last=>"kelly"},但它似乎有点hacky和不稳定。有更好的方法吗?编辑:我刚刚意识到我可以使用instance_variable_get作为该等式的第二部分,但这仍然是主要问题。
2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p