apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

问题:BigQueryIO每输入一行创建一个文件,对吗?

我是Apache Beam的新手,我正在开发一个管道来从JDBCIO获取行并将其发送到BigQueryIO。我正在使用withAvroFormatFunction将行转换为avro文件,但它正在创建一个新的...

回答 1 投票 0

Apache Beam中累积窗口与废弃窗口之间的区别?

我在这里有一个示例管道:def print_windows(element,window = beam.DoFn.WindowParam,pane_info = beam.DoFn.PaneInfoParam,timestamp = beam.DoFn.TimestampParam):print(window)print(pane_info)...

回答 1 投票 0

在Apache Beam中为每个键应用有状态DoFn

是否可以仅对键控PCollection中的值应用有状态转换?举个例子,假设这个PCollection键在邮政编码上。值是...

回答 1 投票 0

Apache Beam:分区转换后如何继续管道?

我正在将Apache Beam与Cloud Dataflow结合使用。想象成千上万的产品进入管道。经过一些步骤(过滤,映射等)之后,我想按某个字段对数据进行分区。我尝试过...

回答 1 投票 0

将相同的用户事件聚合到两个不同的窗口中

我正在尝试使用Apache Beam编写一个简单的管道。假设我正在接受类似以下的用户请求:(国家/地区,user_id,得分,时间戳)我只想对总得分进行总结...

回答 1 投票 0

PCollection to Array-如何将标头动态输入到WriteToText PTransform中?

我正在使用主要在Dataflow运行器上运行的Apache Beam 2.19编写数据流作业。我正在尝试将具有嵌套字段和重复字段的BigQuery输入转换为平坦的CSV。 BQ ...

回答 1 投票 0

如何在Apache Beam Transform中创建循环

我有一个使用多个参数运行的批处理作业。这些参数之一代表清单日期,例如2020-05-01,2020-05-02,2020-05-03,对于每个日期,我都会尝试从GCP存储读取数据。在...

回答 1 投票 0

Dataflow中的Apache梁出现与pardo函数相关的错误,并且梁未定义

[我正在尝试在数据流中创建我的第一个小人,当我使用交互式束流道执行时,我具有相同的代码运行,但是在数据流中,我遇到了各种各样的错误,这些错误并没有太多...

回答 1 投票 0

Apache Flink Python Table API UDF依赖关系问题

[通过提交将其提交给本地集群的涉及用户定义函数(UDF)的Python Table API作业后,它崩溃并因py4j.protocol.Py4JJavaError导致了java.util ....]]

回答 1 投票 1

数据流作业停止随机处理数据

我已经建立了一个管道,用于从pubsub读取数据并将其流式传输到BigQuery。在流传输数据之前,需要对其进行预处理以除去重复项。通过在固定窗口中打开窗口,然后...

回答 1 投票 0

从Google Cloud Dataflow内部写入Firestore

我现在遇到的核心问题是,当我运行部署到Google Cloud Dataflow的数据流管道时,出现错误:java.lang.IllegalStateException:名称为[DEFAULT]的FirebaseApp没有...

回答 1 投票 0

消费者组中的Apache Beam KafkaIO消费者正在读取相同的消息

我正在数据流中使用KafkaIO来读取一个主题的消息。我使用以下代码。 KafkaIO。 read().withReadCommitted()....

回答 1 投票 0

如何在处理输入文件模式时在Apache Beam中获取DoFn内部的文件名

我正在处理一个目录内的大量文件。我想在已处理数据输出的元数据中添加fileName。因此,如果在处理过程中出现问题,我们可以检查一下...

回答 1 投票 0

无法在数据流笔记本上的Jupyter Notebook中导入JsonPickle

我正在python的Apache Beam上构建管道,并在Dataflow上使用笔记本进行原型制作。在尝试加载JSON时,我意识到我使用的JSON编码器(基本上是JSON.loads())...

回答 2 投票 1

是否可以使用KafkaIO.read为单个管道为两个不同的集群指定Kafka引导服务器?

我目前正在使用Google Cloud Dataflow和Apache Beam来消耗来自Kafka主题的消息,该主题存在于两个不同的Kafka集群中,两个集群都包含相同的主题名称,但...

回答 2 投票 0

数据流管道中dynamicWrite FileIO中的动态文件夹名称

我有一个PCollection >。我想将数据按K分组,并将密钥K的所有值写入到名为K的文件夹内的Google存储中的文件中。假设我有2个条目...

回答 1 投票 2

使用GCP Dataflow和Apache Beam Python SDK从GCS读取速度极慢

首先让我开始说一切,首先我要了解使用Beam的Python SDK和GCP Dataflow的方法!问题:我的管道对于几乎所有的管道都运转良好...

回答 1 投票 0

在GCP数据流上运行的Apache Beam如何处理大批SQL表?

我有一个大约1TB数据的SQL表,我想将此表ETL到GCS。我不明白的是Apache Beam如何读取表,是否以块为单位,如果是,则块的大小是多少,...

回答 1 投票 1

Cloud Dataflow作业从一个Bigquery项目读取并写入另一个BigQuery项目

我正在GCP上实施Cloud Dataflow作业,该作业需要处理2个GCP项目。输入和输出都是Bigquery分区表。我现在要解决的问题是我必须读取数据...

回答 1 投票 0

为什么在尝试安装Apache-Beam时出现错误?

[运行pip install apache-beam [gcp]时出现以下错误:错误:命令错误,退出状态1:命令:/Library/Frameworks/Python.framework/Versions/3.8/bin/python3。 8 -u -...

回答 2 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.