我们正在为ApacheBeam管道构建集成测试,但遇到了一些问题。有关上下文,请参见下文...关于我们管道的详细信息:我们使用PubsubIO作为我们的数据源(无界PCollection)中间转换包括自定义CombineFn和非常简单的窗口/触发策略我们最后的转换是JdbcIO,使用org.neo4j.jdbc.Driver写入Neo4j目前的测试方法:在运行测试的机器上运行GoogleCloud的Pub/Sub模拟器构建内存中的Neo4j数据库并将其URI传递到我们的管道选项中通过调用OurPipeline.main(TestPipeline.convertToArgs(option
我对GoogleCloudDataflow的用例是在管道期间使用Redis作为缓存,因为要发生的转换取决于一些缓存数据。这意味着执行RedisGET命令。官方内置RedisI/O转换的文档提到支持几种方法:read-“提供一个源,它返回一个包含键/值对作为KV的有界PCollection”readAll-“可用于使用输入PCollection元素作为键模式(作为字符串)来请求Redis服务器”虽然readAll似乎不对应于GET命令,因为输入PCollection将用于过滤扫描整个Redis源的结果,所以这不是我要找的。我想知道在查看支持我的用例的内置I/O转换时是否遗漏了什么,或者是
我对GoogleCloudDataflow的用例是在管道期间使用Redis作为缓存,因为要发生的转换取决于一些缓存数据。这意味着执行RedisGET命令。官方内置RedisI/O转换的文档提到支持几种方法:read-“提供一个源,它返回一个包含键/值对作为KV的有界PCollection”readAll-“可用于使用输入PCollection元素作为键模式(作为字符串)来请求Redis服务器”虽然readAll似乎不对应于GET命令,因为输入PCollection将用于过滤扫描整个Redis源的结果,所以这不是我要找的。我想知道在查看支持我的用例的内置I/O转换时是否遗漏了什么,或者是
我试图设置一个断点并在监window口中执行以下操作:检查.getSize()应该以字节为单位返回大小。和.materialize()看看我是否可以查看java对象。.getSize()确实显示了一个>0的数字,但我怀疑这是否应该是PTable具有元素的指示器。.materialize()没有显示任何元素的存在。提前致谢。 最佳答案 与其依赖PCollection.size()方法来检查您的集合是否为空,您应该使用PCollection.length(),它的作用正是你需要。 关于jav
HowtotransformboundedpcollectiontounboundedinPythonwithApacheBeam?我正在尝试在不使用太多内存的情况下转换存储在GCS中的几TB邮件日志。按照指南中的建议,我为每个元素添加时间戳,将其拆分为滑动窗口,并在将其发送到GroupByKey和之后的ParDo解析器之前指定了一个(聚合)触发器。这应该可以,但仍然GroupByKey等待所有数据到达。为什么?我也尝试过使用Direct和GoogleDataflowrunner。我错过了什么?这是代码的要点:123456789101112131415161718192021222324252
HowtotransformboundedpcollectiontounboundedinPythonwithApacheBeam?我正在尝试在不使用太多内存的情况下转换存储在GCS中的几TB邮件日志。按照指南中的建议,我为每个元素添加时间戳,将其拆分为滑动窗口,并在将其发送到GroupByKey和之后的ParDo解析器之前指定了一个(聚合)触发器。这应该可以,但仍然GroupByKey等待所有数据到达。为什么?我也尝试过使用Direct和GoogleDataflowrunner。我错过了什么?这是代码的要点:123456789101112131415161718192021222324252