Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
错误 DockerEnvironmentFactory:Docker 容器 xxxxx 日志,当尝试使用 Spark 运行器运行用 Go 编写的 Apache Beam 管道时
我有一个用 Go 编写的管道,我想用 Spark 运行器执行,Spark Standalone 安装在我的本地计算机上。 阿帕奇光束 2.56.0 阿帕奇火花3.2.2 我启动了 Spark master 并...
我有一个数据流管道,可以从 kafka 读取消息,处理它们,然后将它们插入到 bigquery 中。 我希望处理/bigquery 插入将按时间批量进行,这样......
我有一个非常小的python数据流包,包的结构如下所示 。 ├── __pycache__ ├── pubsubtobigq.py ├── 需求.txt └── 维尼夫 requirements.txt的内容是 原型...
如果我们更新现有管道,Google Dataflow Apache Beam 版本升级将失败
我有一个在 Apache beam Java 2.50.0 上运行的 Google 数据流流管道。我希望通过更新管道选项升级到 2.56.0(当前最新版本)。然而,更新给出了错误......
如何在java中配置GCP Spanner ChangeStream读取持续时间
我们在 apache beam java 数据流作业中使用 GCP Spanner Changestream。我们使用 SpannerIO 连接器对其进行配置。代码如下, 静态类 Read 扩展 PTransform 我们在 Apache Beam Java 数据流作业中使用 GCP Spanner ChangeStream。我们使用 SpannerIO 连接器对其进行配置。代码如下, static class Read extends PTransform<PBegin, PCollection<DataChangeRecord>> { @Override public PCollection<DataChangeRecord> expand(PBegin input) { Pipeline pipeline = input.getPipeline(); Options options = (Options) pipeline.getOptions(); // Retrieve and parse the startTimestamp and endTimestamp. Timestamp startTimestamp = options.getStartTimestamp().isEmpty() ? Timestamp.now() : Timestamp.parseTimestamp(options.getStartTimestamp()); Timestamp endTimestamp = options.getEndTimestamp().isEmpty() ? Timestamp.now() : Timestamp.parseTimestamp(options.getEndTimestamp()); SpannerConfig spannerConfig = SpannerConfig.create() .withProjectId(getSpannerProjectId(options)) .withInstanceId(getRequiredData(options.getSpannerInstanceId())) .withDatabaseId(getRequiredData(options.getSpannerDatabase())) .withRpcPriority(options.getSpannerRpcPriority()); SpannerIO.ReadChangeStream stream = SpannerIO.readChangeStream() .withSpannerConfig(spannerConfig) .withMetadataInstance(getRequiredData(options.getSpannerMetadataInstanceId())) .withMetadataDatabase(getRequiredData(options.getSpannerMetadataDatabase())) .withChangeStreamName(options.getSpannerChangeStreamName()) .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(options.getSpannerRpcPriority()); String spannerMetadataTableName = options.getSpannerMetadataTableName(); if (spannerMetadataTableName != null) { readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName); } return pipeline.apply("Read from Spanner", stream ); } 工作正常。但每一秒它都在敲击扳手并寻找变化。我们必须配置扳手点击频率。在日志中我们可以看到启动应用程序后每秒的日志 INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z) Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z) Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z) Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z) 谁知道如何配置这个。我们不想使用 Spanner 数据库客户端。 spannerIO有什么办法吗? 提前致谢:) 您看到的日志来自代码中的这一行:https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/ org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java#L132 此代码与执行变更流查询的Spanner数据库客户端无关。此代码来自 DetectNewPartitionsDoFn,它会轮询元数据表以查找新的更改流分区,以每 100 毫秒执行一次。 您能在这里详细说明您的问题吗?
如何处理 Apache Beam (python) 中的异常,以从 JDBC 读取数据并写入 BigQuery
我能够成功从 JDBC 源读取数据,并将输出写回 BigQuery。 然而,我仍然坚持寻找处理坏行的 BigQuery 插入异常的最佳方法。 例如...
创建经典数据流模板时由于某种原因,模板未写入 template_location
由于某种原因,模板未写入 template_location。当我运行第一个命令时,管道被执行(奇怪),但模板从未生成。可能是什么原因? 我是
我正在使用beam yaml 创建一个beam 管道。 我有一个包含时间戳值的 csv 表,并且想将其用于窗口聚合。 不确定使用这个的正确语法是什么
是否可以强制数据流作业使用特定版本的Python运行? 我有一些依赖项仅受 python 3.11 支持
Java Apache Beam,使用构造函数变量在 DoFn 的 @Setup 方法中初始化模拟外部客户端
Apache Beam 建议使用 Fakes 而不是 Mocks,因为 Mocks 无法通过管道进行序列化。 我正在为旧代码编写单元测试,其中该类使用 Apache Beam 调用其他扩展...
在 Spark 上运行 python Apache Beam 管道
我在这里尝试 apache beam(带有 python sdk),所以我创建了一个简单的管道,并尝试将其部署在 Spark 集群上。 从 apache_beam.options.pipeline_options 导入 PipelineOptions 小鬼...
如何调试在 Google Cloud Dataflow 上未调用 finish_bundle?
这是我在数据流中使用的主要和转换的代码: # main.py 使用 Pipeline(options=options) 作为管道: _ = ( 管道 | “读取输入文件...
缓慢更新侧输入和会话窗口 - 变换节点 AppliedPTransform 未按预期替换
在我的 apache 束流管道中,我有一个与会话窗口一起使用的无限发布/订阅源。 我需要将一些有界配置数据传递到
使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?
我们正在尝试在 Apache Beam 管道上使用固定窗口(使用 DirectRunner)。我们的流程如下: 从发布/订阅中提取数据 将 JSON 反序列化为 Java 对象 带固定风的窗口事件...
Python 中 apache Beam 上的 csv 配对出现 UnicodeEncodeError
我正在努力在 apache beam python 中解析 CSV 文件。但是,当 CSV 文件中存在一些 unicode 字符(例如“ş”)时,它无法解析并出现错误 运行时错误:UnicodeEncodeError:'asc...
如何创建空的PCollection<KV<String, Object>>
我正在尝试创建一个名为 Incident 的自定义对象的空 PCollection 公共类事件实现可序列化{ 私有整数事件ID; 私有字符串appId; 私人长
我正在datalab中“玩”apache beam/dataflow。 我正在尝试从 gcs 读取 csv 文件。 当我使用以下命令创建 pcollection 时: 线= p | 'ReadMyFile' >> beam.io.ReadFromText('gs://' +
Java Apache Beam ProcessElement 方法必须为 void?
在Java Apache Beam中,@ProcessElement方法是否需要为void?或者它可以返回一个 int、string 或 class? 我们正在进行单元测试,并希望验证方法的输出。我知道那里...
如何将两个 PCollection 中的计数合并到一个对象中
我有一个 apache beam 管道,可以读取 CSV 文件并对 bigquery 表进行查询,我需要计算每个 PCollection 中有多少个寄存器,并基于此创建 FinalResult ob...
在 Apache Beam 和 Dataflow 中使用 ReadFromKafka 时出错
我正在尝试使用 Apache Beam 的 Python SDK 连接到 Kafka 主题,并将管道作为数据流作业提交。 这是我的代码片段 导入系统 导入 apache_beam 作为光束 来自 apache_beam。