Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
如何在apache beam Dataflow python批量作业中设置处理超时?
我目前使用stopit库https:/github.comglenfantstopit来设置批处理作业中每个元素的处理超时。这些作业在直接运行器上工作,我能够超时功能 ...
我的parDo在我的beam作业中的一个例子(用Dataflow runner运行): class StreamEventToJsonConverter : DoFn。 () { @ProcessElement fun processElement(@Element element: ...
我有这样一段简单的代码 def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam): print(window) print(timestamp) print(element) print('-----------------------...
我们能不能把avro文件写到动态创建的GCS桶里(基于tenantID)?
基本上,我试图做的是,创建GCS bucket的基础上tenantID(作为事件的一部分),并写这些事件使用FileIO.writeDynamic使用动态文件命名在谷歌数据流作业。在...
目前,我正在尝试使用apache beam Metrics为GCP数据流作业创建自定义指标,并希望检查我们是否可以根据租户跟踪分组计数器。例如,我们有事件产生...
我试图在Bigquery中使用write_truncate来截断表,但它没有发生,而是像write_append一样工作。它在追加数据,但没有截断表。谁能告诉我,在Bigquery中使用write_truncate,但是没有发生,而是像write_append一样工作。
Apache Beam - 如何从所有的窗口中通过键对PCollection<KV<String, Int>>进行求和。
给定一个PCollection >有了固定时间的窗口,如何将所有窗口的Int按字符串键相加呢? 比如PCollection。 > pc = ...; ...
我试图在dataflow中创建我的第一个pipleine,我有相同的代码运行时,我执行使用交互式beam runner,但在dataflow上,我得到的所有类型的错误,这是不多......
在数据流上触发Apache beam windowing,使用 elementCountAtLeast时没有像预期的那样发射。
我想让apache beam在一定数量的元素到达后发射。Apche beam的版本是2.18.0,我从pubsub消耗,但数据到达的模式是事先知道的......。
我的流水线的简单伪代码:流水线 .apply(KVPair) .apply(sessionWindow) .apply(groupByKey) 我的问题是:要想实现会话窗口化,beam是否已经将所有数据与 ...
如何使用Dataflow更新IoT设备配置(在Cloud IoT Core中)?
我正在使用谷歌云平台来收集物联网数据。然后会进行分析,可能是在AI Platform中进行分析,我想把一些检索到的数据作为配置设置发送给IoT设备。我见过...
我正在研究Dataflow,我已经通过Python SDK建立了我的自定义管道。我想在Dataflow用户界面上添加参数到我的自定义管道中,使用附加参数。...
如何在Apache beam中读取带有起始日期的pubsub信息?
我有一个简单的工作是从pubsub读取历史数据,以日期为界,例如我想从日期2020-04-10开始读取消息,2020-04-20,然后每天将消息保存在google bucket中......。
我正在使用Apache-Beam编写一个数据流工作,它需要使用FTPS [from ftplib import FTP_TLS] 服务器在BigQuery中导入数据。但是当我尝试导入FTPS类[ftps = FTP_TLS('...
在从模板(云数据流)中创建作业时,在分配临时位置时出错,在谷歌云平台上。
我试图在GCP中使用数据流模板创建一个数据流管道。在数据流模板中创建作业时,在分配临时位置时出现了错误。错误行被提到...
我试图编写一个脚本来自动部署Java Dataflow作业。该脚本创建了一个模板,然后使用命令 gcloud dataflow jobs run my-job --gcs-location=gs:/my_bucket ...
合并BigQuery和Pub / Sub Apache Beam
我正在尝试使用DataFlowRunner执行以下操作:从已分区的BigQuery表中读取数据(大量数据,但仅获得了最后两天)从Pub / Sub订阅中读取JSON加入这两者...]]
在Apache Beam Java中将List与自定义POJO Java类一起使用时收到很多警告
我是Apache Beam的新手,我使用Apache Beam和GCP中的Dataflow作为运行程序。在执行管道时遇到以下错误。类org.apache.beam.sdk.coders.ListCoder类型的编码器具有...
使用Google Dataflow优化使用BigQuery资源从GCS加载200万个JSON文件
我有一个庞大的数据库,其中包含约240万个JSON文件,它们本身包含多条记录。我创建了一个简单的apache-beam数据管道(如下所示),该管道遵循以下步骤:读取...
我正在使用数据流和Apache Beam处理数据集并将结果存储在具有两列的无标题CSV文件中,如下所示:A1,a A2,a A3,b A4,a A5,c ...我想要过滤掉...