Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
我正在尝试部署一个Google Cloud Dataflow管道,该管道从Kafka集群读取、处理其记录,然后将结果写入BigQuery。然而,我一直遇到以下问题......
Apache Beam有状态的DoFn定期输出所有的KV对。
我试图在Apache Beam中使用有状态的DoFn(使用@ProcessElement和@StateId ValueState元素)来聚合(按键)一个流数据源(通过Scio)。我认为这将是最 ...
Apache Beam Combine与GroupByKey的对比
所以,我面临着这个似乎很经典的问题,使用Apache Beam(Flink为引擎),为非绑定的流提取时间框架的topop。假设输入的是 sites+hits tuples: {"aaa.com", 1001}, {"bbb......"。
CombineFn中的任务没有正确地最终完成Apache光束
我使用Python 3.7 SDK和apache beam 2.17.0做数据流。代码在本地运行,但我从pubsub收集数据。我尝试按键组合,一切都很顺利,直到管道调用"..."。
从Spring控制器执行Google Cloud Dataflow管道
我将如何使用Spring执行Apache Beam管道到Google Cloud Dataflow?这个问题类似于在Spring Boot项目中运行Apache Beam pipeline到Google Data Flow,但是...
Apache beam在Dataflow中得到与生成器对象不可下标相关的错误。
我试图在数据流中创建我的第一条流水线,当我使用交互式光束运行器执行时,我有相同的代码运行,但在数据流中,我得到了所有类型的错误,这并没有使很多......
我试图按照KafkaToBigQuery的DataflowTemplate来处理BigQuery在我代码中的错误。PCollection convertedTableRows = pipline .apply("ReadFromKafka", ...
问题陈述:我试图在beam中使用direct runner读取和打印xml文件的内容,以下是代码片段: public class BookStore{ public static void ....
在Python的Apache Beam中使用OrFinally定义自定义触发器的正确语法?
我试图为一个滑动窗口定义一个自定义的触发器,这个触发器可以重复触发每一个元素,但最后也可以在水印结束时触发。我看了周围的文档,对于 ...
使用Apache Beam Python SDK 2.9.0版本,是否可以得到一个类似Google的数据流的可渲染的管道图表示,而不是运行它?我在组装时遇到了困难...
Apache beam splittable DoFn没有将工作分散到Dataflow上的多个工作者。
我正在学习使用可拆分的DoFn,我希望我的工作能分配给500个工人,但Dataflow只用1或2个工人运行。我希望将我的工作分配给500个工人,但Dataflow只用1或2个工人运行它,我是否理解或实现可分割DoFn不正确?我对可拆分DoFn的理解或实现是否有误? ...
一个ParDo操作应该有多小,才能保证一个波束作业的良好性能和伸缩性?
在我的数据流管道中,有多个小型过滤器,我想应用在数据流上(ParDo函数)。鉴于过滤器操作不是cpu密集型的。为了性能和扩展,...
我有一个Apache Beam作业,从PubSub中输入数据,然后加载到BigQuery中,我将PubSub消息转化为带有字段id、name、count的pojo,count意味着非唯一元素的计数......。
如何在beam SQL中从输入数据中选择一组字段作为重复字段数组?
問題說明:我有一個包含以下字段的輸入pcollection。{ firstname_1, lastname_1, dob, firstname_2, lastname_2, firstname_3, lastname_3, } 然后 ....
我有一个无边界的数据源(Kafka流)作为我的beam job的输入。数据的特点。它们是由元素组成的组(组的大小在5-20个元素之间)。每个组的键...
Apache Beam将BigQuery表和模式作为params写入。
我正在使用Python SDK for Apache Beam。datatable和schema的值都在PCollection中。这是我从PubSub中读到的信息:{"DEVICE": "rms005_m1", "DATESTAMP": "2020-05-29 20:..."。
模板化Dataflow中的PubSub输入完全忽略了NestedValueProvider。
我已经为Dataflow创建了模板,它正在读取PubSub的消息,问题是NestedValueProvider不可能修改通过选项值发送的消息,我不知道为什么 - 下面......
如何在apache beam Dataflow python批量作业中设置处理超时?
我目前使用stopit库https:/github.comglenfantstopit来设置批处理作业中每个元素的处理超时。这些作业在直接运行器上工作,我能够超时功能 ...
我的parDo在我的beam作业中的一个例子(用Dataflow runner运行): class StreamEventToJsonConverter : DoFn。 () { @ProcessElement fun processElement(@Element element: ...
我想把存储导入到Apache Beam内核的jupyter笔记本中,但它说未知位置。如果我尝试导入其他google-cloud库,如bigquery或datastore,它的工作, ...