我有一个以 BigQuery 表作为接收器的管道。我需要在数据写入 BigQuery 后执行一些步骤。这些步骤包括对该表执行查询、从中读取数据并写入不同的表。
如何实现以上目标?我是否应该为后者创建一个不同的管道,但在第一个管道之后调用它,这将是我认为的另一个问题。
如果上述方法都不起作用,是否可以从正在运行的管道调用另一个数据流作业(模板)。
确实需要一些帮助。
谢谢。
BigQueryIO 目前尚未明确支持此功能。唯一的解决方法是使用单独的管道:启动第一个管道,等待它完成(例如,使用
pipeline.run().waitUntilFinish()
),启动第二个管道(确保为其使用单独的 Pipeline 对象 - 多次重复使用同一对象不支持)。
我一直在使用 templates 的解决方法是将 IO 操作的结果写入元数据文件到特定的存储桶中,云函数(即我的 orchestrator)被触发,进而触发以下管道。但是,我仅使用 TextIO 操作对其进行了测试。 所以,就你而言:
非常确定可以使用 PubSub 轻松复制类似的方法,而不是写入存储桶(例如,请参阅此处了解我列表中的第二步)
你好@jkff 和@Graham: 我有数据流流管道,它在从原始源读取后使用 WriteToBigQuery。我有类似的要求,我想实现两件事:1.处理WriteToBigQuery错误记录和2.执行大查询SQL,该SQL只能在WriteToBigQuery之后执行。 对于第二个要求:@jkff 提到的方法对我不起作用,因为它将提交单独的数据流流作业,该作业将与原始作业异步。 我对@Graham提到的方法有疑问,如果我这样做,每次有来自原始来源的流数据时都会执行吗?