Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
这里的医生说 Dataflow runner 的 PubsubIO 实现在消息被第一个融合阶段成功处理后自动确认消息以及该阶段的副作用
Apache Beam - 无界 PCollection 中的计数消息(每个窗口)
我需要一个简单的任务来计算来自无界数据源的固定窗口中的消息数。 步骤是: 从发布/订阅中读取数据 定义窗口固定时间 创建一个(键,值),其中一个键...
Apache Beam 管道使用 JdbcIO 的 Wait.on 转换无限期卡住
我有一个 Apache Beam 管道,当与 JdbcIO 一起使用 Wait.on 转换时,它会无限期地卡住。这是我的代码的简化版本,着重于相关部分: 个人收藏<
apache Beam 如何在没有检查点或容错的情况下给出精确一次保证并进行状态计算?
像 groupby 或 combine 这样的东西需要 exactly once 保证像 sum 这样的琐碎计算 但是 apache beam 似乎没有内置检查点到库中,它是否依赖于 flink 或 spark 来...
Apache Beam 上的 Flink:找不到方案“hdfs”的文件系统实现
最近尝试将 s3 读取交换到 hdfs,在我的项目中具有以下依赖项: org.apache.beam ...
想要使用 Apache Beam 通过过滤管道中的事件来动态命名和创建表?
我有一个用例,我在一个事件驱动的架构中从 pub/sub 监听,并想动态地存储和插入数据到表中。如果新的 eventName 不是,我想创建新表...
使用 Java Google 云数据存储 API 将 com.google.cloud.datastore.Key 对象转换为 com.google.datastore.v1.Key 对象
是否有人尝试使用 Java API 将 com.google.cloud.datastore.Key 对象转换为 com.google.datastore.v1.Key 对象。问题是我用 com.google.cloud.datast 执行了一个查询...
ApacheBeam 数据流作业的类型错误:“无法确定地编码<TableReference>,提供类型提示”
我可以使用 Direct Runner 在本地毫无问题地运行我的管道,但是当我部署到 Dataflow 时,出现以下错误: “来自工作人员的错误消息:通用::未知:追溯(大多数......
安装 Apache Beam 时出现错误:“zsh:未找到匹配项:apache-beam [gcp]”
我正在开发一个项目并尝试使用以下命令从终端安装 Apache Beam:pip3 install apache-beam[gcp] 但是我收到此错误:zsh:找不到匹配项:apache-beam[gcp] 我...
在python中bigquery sink后是否可以进行其他处理?
我正在编写一个具有以下过程的管道: 1.读取带有属性“uid”的pubsub消息,这是此消息的唯一ID 2.将消息存储在Bigquery中,数据格式为 uid |
我正在编写一个光束管道,它读取具有名为“uid”的属性的 pubsub 消息,这是当前消息的唯一 id。然后我想使用这个'uid'来查询bigquery以获得额外的
无法安装 Apache-beam 错误:命令“cmake”失败:无 [输出结束] 错误:pyarrow 的构建轮失败 无法构建 pyarrow 错误:无法为 pyarrow 构建轮子,这...
Apache Beam Python > 2.38.0 DirectRunner ~ AssertionError: 共有 N 个 watermark-pending bundle 没有执行
使用 Python 3.9 和 Apache Beam 2.38.0,下面的最小工作示例运行良好。 但是,当我使用 Apache Beam 2.39.0(或 2.44.0)时,示例失败并显示错误 AssertionError: A total o...
Apache Beam python SDK 中的 ReadFromKafka 不起作用:java.io.IOException:error=2,没有这样的文件或目录
我正在尝试在 python 中运行一个简单的 beam 程序,该程序从 Kafka Topic 读取消息并将其打印到控制台,但我收到此错误并且不知道是什么问题。 警告:root:等等...
我们正在处理 Avro 文件,我们使用 GenericRecord 类型(在 Beam/Dataflow 中)。我想基于选定的列子集创建一个新的 GenericRecord。没有
我正在尝试创建一个从 BigQuery 返回到 BigQuery 的数据流脚本。我们的主表很大,破坏了提取功能。我想创建一个简单的表(由于
在 python 中的 Apache Beam 中使用不同的数据库动态提供副/主要输入
我有一个非常独特的管道用例,它是动态地为我的主要输入提供辅助输入。我有一个 pcollection,必须使用必须在不同来源中找到的数据进行连接。我的...
Google Dataflow Key Distribution On Reshuffle After Autoscaling Event
当我的数据流作业从 45 个工作节点扩展到 100 个节点时,我遇到了一些带有键控状态的奇怪行为。 我的代码是键入输入数据,然后使用 Reshuffle 函数重新分配...
我们如何在 python 中读取 apache beam 中的字典列表?
我有一个包含字典列表的文本文件。 现在,我想使用 apache beam 阅读它,并从列表中返回单个词典。 我该怎么做。 我的文本文件是这样的。 [{"id&q...
我正在尝试找到一种方法来为单个光束操作计时。 Apache Beam 提供的指标模块有助于定义计数器、分布和仪表,这些可以从自定义 D 内部进行跟踪...