Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
Apache Beam python fileio.WriteToFiles 过度分片
我在流式Python管道中使用fileio.WriteToFiles。我明确指定了预期的分片计数,如下所示 fileio.WriteToFiles( 路径=..., file_naming=fileio.default_file_na...
如何在使用 apache beam 编写的流式管道中读取 bigquery
我想运行一个从 Google bigquery 表中连续读取的流管道。现在,我的流管道在从 bigquery 表读取一次后停止。 apache beam 文档...
是否可以将模拟与 doFn 一起使用?我在 doFn 中有一个从 gcs 存储桶读取的处理方法,我想测试一下。 我尝试使用 LocalStorageHelper 但它不支持存储桶获取
如何在传递给 pardo 时修改数据流运行时值提供程序参数?
我在尝试修改传递给 Apache Beam Dataflow 管道中 RuntimeValueProvider 的参数时遇到问题。这是我的代码的简化版本: 导入 apache_beam...
python 数据流:GroupByKey 无法应用于具有全局窗口和默认触发器的无界 PCollection
我有一个简单的 python 数据流代码,它使用无界 pcollection 。它只是 从 pubsub 读取 解析为带有输出标签 SUCCESS 和 FAILURE 的 json 使用输出标签 SUCCESS 和 F 验证 json...
Apache Beam 优化 Firestore 读取 python
我有传感器数据到达 pub/sub (protobuf),它作为 python 字典插入到“pipeline_fstore”中。数据一次到达。 在管道中,在“添加元数据...
我有一个写入 BigQuery 表的数据流作业。每个数据流作业都会创建一个新表。 我意识到对 BigQuery 表的写入操作是异步的,即对
在 apache beam ReadFromKafka 中的一个主题中可以确定分区位置之前的超时
我正在从事 Google 数据流工作,我正在使用 apache beam ReadFromKafka 来消费主题消息。我正在消耗 4 个主题。在我们向我们的 kafka clu 添加新的代理之后,管道曾经工作正常......
所以我正在使用带有SpringBoot的Apache Beam,并在.query中使用JDBCIO,我正在从表“customer”中检索记录(从records.customer中选择*,其中customer_code =“abc&quo...
阿帕奇光束|无法安装 apache beam 并显示子进程错误
我已经安装了最新的 pip、python 3.12,但什么都没有..仍然显示此错误: “为收集的包构建轮子:grpcio-tools grpcio-tools 的构建轮子(setup.py):已启动
如何使用 DataFlow 将数据从 Pub/Sub 流式传输到 Google BigTable?
我想问是否有人可以告诉我,甚至给我展示一个数据流作业模板的示例,最好是用 Python 编写,我可以在其中: 持续从 Pub/Sub 主题读取 JSON 数据 处理这个数据...
我们目前是 Dataflow 批处理作业的大用户,并且希望开始使用 Dataflow 流(如果可以可靠地完成)。 这是一个常见的场景:我们需要一个非常大的 Kafka 主题......
在 GCP Dataflow 上以编程方式部署和运行 Beam 管道
我正在尝试使用 google-cloud-dataflow 以编程方式在 GCP 数据流上部署一些光束管道,但不确定如何做到这一点。 这些管道已经打包为 jar,我的目标是......
如何预构建worker容器Dataflow? [洞察“SDK Worker容器镜像预构建:可以启用”]
我想知道如何预构建工作容器并同时使用 setup.py 文件来实现多个文件依赖项。 即使当我使用这个官方模板时,我仍然有见解:“SDK
我正在使用数据流流管道,并且遇到了一个问题,即在全局窗口中使用侧面输入会锁定使用 30 秒固定窗口的主分支。所有的...
名称错误:运行“创建梁行-ptransform”时未定义名称“梁”
我正在研究一个用例,我正在从 PubSub 读取内容,并且我想编写聚合的内容 值到 bigquery。 这是我正在写给该主题的 PubSub 输入: b"('B', 'Stream1', 77)&q...
我的管道中有一个 Dofn 函数,它在 GCP 数据流中运行,并且应该为每个产品并行执行一些处理。 类Step1(DoFn): def 过程(自身,元素): # 得到一个...
我的数据流管道如下 pipeline_options = PipelineOptions( pipeline_args、streaming=True、save_main_session=True、sdk_location=“容器” ) 与管道(
AttributeError:“RuntimeValueProvider”对象没有属性“projectId”
我正在尝试在Dataflow runner中运行apache beam管道;该作业从 bigquery 表中读取数据并将数据写入数据库。 我正在数据流中使用经典模板选项运行作业 - ...
在 Windows 中使用窗口流数据使用“:”时无法写入文件
我在运行 Java ApacheBeam 代码时遇到问题。当尝试在我的 Windows 系统上创建名称中带有冒号的文件时,程序会抛出错误,指出文件名是