Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
我写了这个管道,但是当我把它作为jar运行时,当我在build.gradle中指定它时,它找不到直接运行器,并且当我尝试传递参数时--runner = direct或--runner = .. 。
我按照在Beam文档中编写AVRO文件的示例。但它给了我一个错误引起:java.io.NotSerializableException:org.apache.avro.Schema $ RecordSchema at p.run()。waitUntilFinish(...
如何在Combine.GroupedValues中使用自定义CombineFn?
我写了一个输入KV的CombineFn 并输出KV >。我想使用Combine.GroupedValues(或Combine.PerKey)和源代码......
为什么自定义Python对象不能与ParDo Fn一起使用?
我目前刚开始在Python中使用Apache Beam和Dataflow runner。我有兴趣创建一个发布到Google Cloud PubSub的批处理管道,我已经修改了Beam Python API并找到了......
考虑表示用户交互(例如产品购买)的交织记录的输入流的概念。想象一下,我们收到的记录表明用户已将产品放入...
我有以下json {attribute_values = [{key = PO,values = [234234,21]},{key = POReceipt,values = [ABC]}]}我将如何定义attribute_values列?目前我定义为String并收到错误...
我正在编写一个试图根据参数提供启动批量数据流管道的应用程序。为此,我使用PipelineOptionsFactory.create()。as(...),然后设置setters来配置...
我正在使用Dataflow(Apache beam)创建一个管道来读取和写入Google BigQuery上的数据,但是我在创建DAG方面遇到了问题,就像我对Airflow一样。这是我的代码中的一个例子:#...
安排Google Cloud Dataflow作业的最简便方法
我只需要每天运行一个数据流管道,但在我看来,建议像App Engine Cron Service这样需要构建整个Web应用程序的解决方案似乎有点太多了。我曾是 ...
我在运行数据流作业时遇到错误。我试图将我现有的光束版本更新为2.11.0,但我在运行时遇到错误。 java.lang.IncompatibleClassChangeError:...
所以我已经阅读了梁的状态处理和及时处理文章,并发现了实现这些功能的问题。我试图解决的问题类似于......
使用--experiments = upload_graph获取Dataflowrunner
我有一个生成数据流图(序列化JSON表示)的管道,它超出了API的允许限制,因此无法通过apache beam的数据流运行器启动...
将此权重/比例转换为列名列表,并使用Python根据其权重/分数矩阵格式进行排序
将此权重/分数读数从输入.csv文件转换为列名列表,并使用Python Apache Beam按其降序权重/分数矩阵格式排序并写入...
这是一个组成的玩具示例,试图获得有关我的问题的更难部分的帮助。假设我有来自Kafka流的销售数据:...期间:5,SalesPersonId:78,销售:TRUE,......
在阅读了使用Apache Beam的Timely(和Stateful)处理并查看了JavaDoc for Timer之后,我成功地设置了一个被触发的计时器。但是,我错过了如何请求计时器......
如何在python梁中制作通用的Protobuf Parser DoFn?
上下文我正在使用流媒体管道,该管道在pubsub中有一个protobuf数据源。我希望将这个protobuf解析为python dict,因为数据接收器要求输入是一个集合......
为什么我的PAssert与我的PCollection中的项目不匹配?
我有一个PCollection,我肯定包含:“Bob”“John”“Fred”但是,当我测试一个断言,询问“Bob”是否在PCollection中使用:PAssert.that(myPcollection)....
我正在通过apache-beam编写一个简单的python管道来聚合用户投票。在输入中,我有这样的逗号分隔行:pollA,answerB pollA,answerC pollB,answerA pollB,answerB pollC,answerE ...
Apache Beam不支持Kotlin Iterable?
Apache光束似乎拒绝承认Kotlin的Iterable。下面是一个示例代码:@ProcessElement fun processElement(@Element input:KV >,接收者:......
我想在事件时间设置一个Timer,它根据我的DoFn中元素中看到的最小时间戳来触发。