jjzjj

PCollection

全部标签

java - Apache Beam - 与无限 PCollection 的集成测试

我们正在为ApacheBeam管道构建集成测试,但遇到了一些问题。有关上下文,请参见下文...关于我们管道的详细信息:我们使用PubsubIO作为我们的数据源(无界PCollection)中间转换包括自定义CombineFn和非常简单的窗口/触发策略我们最后的转换是JdbcIO,使用org.neo4j.jdbc.Driver写入Neo4j目前的测试方法:在运行测试的机器上运行GoogleCloud的Pub/Sub模拟器构建内存中的Neo4j数据库并将其URI传递到我们的管道选项中通过调用OurPipeline.main(TestPipeline.convertToArgs(option

redis - 有没有办法使用内置的 Apache beam Redis I/O 转换执行 Redis GET 命令?

我对GoogleCloudDataflow的用例是在管道期间使用Redis作为缓存,因为要发生的转换取决于一些缓存数据。这意味着执行RedisGET命令。官方内置RedisI/O转换的文档提到支持几种方法:read-“提供一个源,它返回一个包含键/值对作为KV的有界PCollection”readAll-“可用于使用输入PCollection元素作为键模式(作为字符串)来请求Redis服务器”虽然readAll似乎不对应于GET命令,因为输入PCollection将用于过滤扫描整个Redis源的结果,所以这不是我要找的。我想知道在查看支持我的用例的内置I/O转换时是否遗漏了什么,或者是

redis - 有没有办法使用内置的 Apache beam Redis I/O 转换执行 Redis GET 命令?

我对GoogleCloudDataflow的用例是在管道期间使用Redis作为缓存,因为要发生的转换取决于一些缓存数据。这意味着执行RedisGET命令。官方内置RedisI/O转换的文档提到支持几种方法:read-“提供一个源,它返回一个包含键/值对作为KV的有界PCollection”readAll-“可用于使用输入PCollection元素作为键模式(作为字符串)来请求Redis服务器”虽然readAll似乎不对应于GET命令,因为输入PCollection将用于过滤扫描整个Redis源的结果,所以这不是我要找的。我想知道在查看支持我的用例的内置I/O转换时是否遗漏了什么,或者是

java - 在 Apache Crunch 中,如何确定 PCollection 或 PTable 中是否包含任何元素?如果有,有多少?

我试图设置一个断点并在监window口中执行以下操作:检查.getSize()应该以字节为单位返回大小。和.materialize()看看我是否可以查看java对象。.getSize()确实显示了一个>0的数字,但我怀疑这是否应该是PTable具有元素的指示器。.materialize()没有显示任何元素的存在。提前致谢。 最佳答案 与其依赖PCollection.size()方法来检查您的集合是否为空,您应该使用PCollection.length(),它的作用正是你需要。 关于jav

关于谷歌云数据流:如何使用 Apache Beam 在 Python 中将有界 pcollection 转换为无界?

HowtotransformboundedpcollectiontounboundedinPythonwithApacheBeam?我正在尝试在不使用太多内存的情况下转换存储在GCS中的几TB邮件日志。按照指南中的建议,我为每个元素添加时间戳,将其拆分为滑动窗口,并在将其发送到GroupByKey和之后的ParDo解析器之前指定了一个(聚合)触发器。这应该可以,但仍然GroupByKey等待所有数据到达。为什么?我也尝试过使用Direct和GoogleDataflowrunner。我错过了什么?这是代码的要点:123456789101112131415161718192021222324252

关于谷歌云数据流:如何使用 Apache Beam 在 Python 中将有界 pcollection 转换为无界?

HowtotransformboundedpcollectiontounboundedinPythonwithApacheBeam?我正在尝试在不使用太多内存的情况下转换存储在GCS中的几TB邮件日志。按照指南中的建议,我为每个元素添加时间戳,将其拆分为滑动窗口,并在将其发送到GroupByKey和之后的ParDo解析器之前指定了一个(聚合)触发器。这应该可以,但仍然GroupByKey等待所有数据到达。为什么?我也尝试过使用Direct和GoogleDataflowrunner。我错过了什么?这是代码的要点:123456789101112131415161718192021222324252