目前正在使用带有Python的GoogleDataflow进行批处理。这工作正常,但是,我有兴趣在不必处理Java的情况下提高我的数据流作业的速度。使用GoSDK,我实现了一个简单的管道,它从Google存储中读取一系列100-500mb文件(使用textio.Read),做一些聚合并用结果更新CloudSQL。正在读取的文件数量可以从几十个到数百个不等。当我运行管道时,我可以从日志中看到文件是串行读取的,而不是并行读取的,因此作业需要更长的时间。使用PythonSDK执行的相同过程会触发自动缩放并在几分钟内运行多次读取。我已经尝试使用--num_workers=指定工作人员的数量,但
目前正在使用带有Python的GoogleDataflow进行批处理。这工作正常,但是,我有兴趣在不必处理Java的情况下提高我的数据流作业的速度。使用GoSDK,我实现了一个简单的管道,它从Google存储中读取一系列100-500mb文件(使用textio.Read),做一些聚合并用结果更新CloudSQL。正在读取的文件数量可以从几十个到数百个不等。当我运行管道时,我可以从日志中看到文件是串行读取的,而不是并行读取的,因此作业需要更长的时间。使用PythonSDK执行的相同过程会触发自动缩放并在几分钟内运行多次读取。我已经尝试使用--num_workers=指定工作人员的数量,但
我的应用程序配置为从配置的Kafka读取主题,然后将转换后的结果写入HadoopHDFS。为此,它需要在Yarn集群节点上启动。为此,我们想使用SpringDataFlow。但是由于这个应用程序不需要来自另一个流的任何输入(它已经知道从哪里提取它的源),并且什么都不输出,我如何从它创建一个有效的DataFlow流?换句话说,这将是一个仅由一个应用程序组成的流,它应该在Yarn节点上无限期运行。 最佳答案 在这种情况下,您需要一个连接到Kafka中指定目的地并写入HDFS的流定义。例如,流看起来像这样:streamcreatea1--
我们正在使用GoogleDataflow进行批量数据处理,并寻找一些工作流编排工具选项,类似于Azkaban为Hadoop所做的事情。我们正在寻找的关键事物是,配置工作流安排工作流程监控和警告失败的工作流能够重新运行失败的作业我们已经评估了Pentaho,但这些功能在其昂贵的企业版中可用。我们目前正在评估Azkaban,因为它支持javaprocess作业类型。但Azkaban主要是为Hadoop作业创建的,因此它与Hadoop基础设施的集成比普通的java进程更深入。感谢对开源或极低成本解决方案的一些建议。 最佳答案 听起来Apa
我正在通过PythonAPI在Dataflow上使用ApacheBeam从Bigquery读取数据,对其进行处理,然后将其转储到Datastore接收器中。不幸的是,作业经常会无限期地挂起,我必须手动停止它。当数据写入Datastore和Redis时,从Dataflow图中我注意到只有几个条目卡住并导致作业挂起。因此,当有15台16核机器的作业运行9小时(正常情况下,作业运行30分钟)时,会导致巨大的成本。也许有一种方法可以设置一个计时器,如果超过时间限制,该计时器会停止Dataflow作业? 最佳答案 如果你能创建一个custom
有2个不同的官方TPL数据流nuget包。我很困惑选择我应该使用哪个。据我了解,System.Threading.Tasks.Dataflow版本比其他版本稍新,而且System.Threading.Tasks.Dataflow似乎是针对最新版本的.net。谁能解释一下它们之间的区别? 最佳答案 Microsoft.Tpl.Dataflow最初作为.net4.5的一部分作为独立于BCL的组件发布-这里是blogpostannouncingthereleaseSystem.Threading.Tasks.Dataflow作为一个单独的
我正在寻找一种TPL数据流block解决方案,它可以容纳多个项目,可以链接到多个目标block,但能够将项目仅转发到通过过滤器的特定目标block/谓词。任何时候都不应将一个项目同时传递给多个目标block,始终只传递给与过滤器匹配的目标block,否则可以丢弃该项目。我不喜欢BroadCastBlock,因为如果我理解正确的话,它不保证交付(或者确实如此?)并且过滤是在目标block端完成的,这意味着BroadCastBlock本质上将每个项目的副本发送到所有linkedTo目标block。如果我理解正确的话,它也不会在任何时候容纳超过一件元素。我不想使用Post/Async,而是维
我在GoSDK上实现了ApacheBeam代码,如下所述。管道有3个步骤。一个是textio.Read,另一个是CountLines,最后一步是ProcessLines。ProcessLines步骤需要大约10秒的时间。为了简洁起见,我只是添加了一个Sleep函数。我正在调用有20个工作人员的管道。当我运行管道时,我的预期是20个工作人员并行运行,textio.Read从文件中读取20行,ProcessLines将在10秒内执行20次并行执行。然而,管道并不是那样工作的。它目前的工作方式是textio.Read从文件中读取一行,将数据推送到下一步并等待ProcessLines步骤完成其
我的目标是创建一种机制,当新文件上传到云存储时,它会触发云函数。最终,此Cloud函数将触发CloudDataflow作业。我有一个限制,即CloudDataflow作业应使用Go编写,而CloudFunction应使用Python编写。我现在面临的问题是,我无法从CloudFunction调用CloudDataflow作业。用Go编写的CloudDataflow中的问题是ApacheBeamGoSDK中没有定义template-location变量。这就是我无法创建数据流模板的原因。而且,由于没有数据流模板,我可以从云函数调用云数据流作业的唯一方法是编写一个Python作业,该作业调
简介在文章《ApacheBeam入门及JavaSDK开发初体验》中大概讲了ApapcheBeam的简单概念和本地运行,本文将讲解如何把代码运行在GCPCloudDataflow上。本地运行通过maven命令来创建项目:mvnarchetype:generate\-DarchetypeGroupId=org.apache.beam\-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples\-DarchetypeVersion=2.37.0\-DgroupId=org.example\-DartifactId=word-count