在 BigQueryIO.write() 操作之后执行进程

问题描述 投票:0回答:3

我有一个以 BigQuery 表作为接收器的管道。我需要在数据写入 BigQuery 后执行一些步骤。这些步骤包括对该表执行查询、从中读取数据并写入不同的表。

如何实现以上目标?我是否应该为后者创建一个不同的管道,但在第一个管道之后调用它,这将是我认为的另一个问题。

如果上述方法都不起作用,是否可以从正在运行的管道调用另一个数据流作业(模板)。

确实需要一些帮助。

谢谢。

google-cloud-dataflow apache-beam
3个回答
1
投票

BigQueryIO 目前尚未明确支持此功能。唯一的解决方法是使用单独的管道:启动第一个管道,等待它完成(例如,使用

pipeline.run().waitUntilFinish()
),启动第二个管道(确保为其使用单独的 Pipeline 对象 - 多次重复使用同一对象不支持)。


0
投票

我一直在使用 templates 的解决方法是将 IO 操作的结果写入元数据文件到特定的存储桶中,云函数(即我的 orchestrator)被触发,进而触发以下管道。但是,我仅使用 TextIO 操作对其进行了测试。 所以,就你而言:

  • 执行 BigQueryIO.write() 操作
  • 将其结果写入文件 (xxx-meta-file) 到 Cloud Storage 存储桶 (xxx-meta-bucket),其中仅保留数据流结果 - 这是管道的最后一步
  • 编写一个 Orchestrator Cloud Function 来侦听 xxx-meta-bucket 中创建/修改的对象(请参阅此处
  • 在协调器中,您可能需要一些条件来检查实际创建/修改的文件
  • 相应地触发下一个管道(直接在协调器中或通过触发负责启动该特定管道的另一个云函数来解耦)

非常确定可以使用 PubSub 轻松复制类似的方法,而不是写入存储桶(例如,请参阅此处了解我列表中的第二步)


0
投票

你好@jkff 和@Graham: 我有数据流流管道,它在从原始源读取后使用 WriteToBigQuery。我有类似的要求,我想实现两件事:1.处理WriteToBigQuery错误记录和2.执行大查询SQL,该SQL只能在WriteToBigQuery之后执行。 对于第二个要求:@jkff 提到的方法对我不起作用,因为它将提交单独的数据流流作业,该作业将与原始作业异步。 我对@Graham提到的方法有疑问,如果我这样做,每次有来自原始来源的流数据时都会执行吗?

© www.soinside.com 2019 - 2024. All rights reserved.