我们正在与 spark 1.6 合作我们正在努力保持类似事件的全局身份。可以有几个具有相同 ID 的事件“组”(在示例中为数字。添加字母只是为了唯一性)。我们知道其中一些事件是相似的,因此我们能够将它们联系起来。我们想保留这样的东西:
Z -> 1, 2, 3
X -> 4
所以将来如果有 id 为 4 的事件发生,我们可以分配 X作为全局身份。
请检查示例以获得更好的说明:
假设我们有一些流数据进入 spark 作业。
1a
1b
2c
2d
2e
3f
3g
3h
4i
由于事件 1 是我们的第一次亮相,我们要分配 1 to Z .
接下来我们知道 1b 和 2c 是相似的。所以我们想保留在某个地方 2->1映射。 2e 和 3f 也是一样,所以我们需要映射 3-2 .所以现在我们有 3 对 1->Z , 2->1 , 3->2 .
我们要创建“历史”路径:Z <- 1 <- 2 <- 3
最后,我们将使用 ID = Z 处理所有事件。 .
1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X
我们尝试使用 mapwithstate但我们唯一能做的就是2->1和 3->2 .与 mapwithstate我们无法在当前事件的状态中获取“父级”的状态 - 例如。当前事件 3 与父 2 无法获得 2 -> 1也不是1 -> Z .
是否可以为此进行一些全局映射?我们已经尝试过累加器和广播,但看起来不太合适。我们无法用 Z 替换第一个映射的事件 1 和第二个映射的事件 2| .
如果有新事件5会来,它与 3h 类似,例如我们需要分配映射 5->Z再次。
最佳答案
接下来是给定问题的解决方案,使用对“状态”RDD 的可变引用,我们每次都会用新结果更新它。
我们使用transform 通过执行相似性连接,用唯一的全局 id 标记传入的事件流。这是“手动”连接,我们使用两个数据集的乘积并成对比较每个条目。
请注意,这是一个昂贵的过程。有许多部分可以更改,具体取决于预期流的具体特征。例如,我们可以将全局状态 RDD 替换为本地 map 并应用 map-side 连接以获得更快的相似性连接,但这在很大程度上取决于预期的基数唯一 ID 集。
这比我原先预期的要棘手。仅将其作为迈向更强大解决方案的起点。例如,状态 RDD 上的 union 操作需要定期检查点,以避免 DAG 超出控制。
(有很大的改进空间 - 但这超出了提供答案的合理努力。)
我在这里勾勒出解决方案的核心,完整的测试笔记本请参见 UniqueGlobalStateChains.snb
// this mutable reference points to the `states` that we keep across interations
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD
// we assume an incoming Event stream. Here we prepare it for the global id-process
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()
// this is the core of the solution.
// We transform the incoming events into tagged events.
// As a by-product, the mutable `states` reference will get updated with the latest state mapping.
// the "chain" of events can be reconstructed ordering the states by timestamp
@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>
val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}
val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}
val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events
val similarityJoinMap = newEventIds.cartesian(currentMappings)
.collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
.collectAsMap
//val similarityBC = sparkContext.broadcast(similarityJoinMap)
val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids
val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) =>
events.map(event => (event.id,event.payload, globalKey))
}
val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
currentState = newStates
states.unpersist(false)
states = newStates.union(states)
states.cache()
newTaggedEvents
}
给定这个输入序列:
"1|a,1|b,3|c", "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"
我们得到:
具有全局 ID 的标记事件:
---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819
我们可以重建从全局 id 派生的事件链:
gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6
-o-
关于java - 为不同的事件构建状态链并在 spark 中分配全局 ID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45191359/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s
在编写Ruby(客户端脚本)时,我看到了三种构建更长字符串的方法,包括行尾,所有这些对我来说“闻起来”有点难看。有没有更干净、更好的方法?变量递增。ifrender_quote?quote="NowthatthereistheTec-9,acrappyspraygunfromSouthMiami."quote+="ThisgunisadvertisedasthemostpopularguninAmericancrime.Doyoubelievethatshit?"quote+="Itactuallysaysthatinthelittlebookthatcomeswithit:themo
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/
HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).