我正在编写具有以下过程的管道:
1. Read pubsub messages with attribute 'uid' which is the unique id for this message
2. Store the message in Bigquery, the data format is
uid | message data | status
------------------------------
1 | {XXXXX} | new
3. process the message data
4. update the message to set the status to 'complete'
uid | message data | status
------------------------------
1 | {XXXXX} | complete
我有三个问题:
Q1.bigquery sink后如何继续进一步处理
如果我使用 bigqueryIO 来实现像这样的 step2
p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
message=(
p
|beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
|'format complete data output'>> beam.ParDo(format_result_for_bq())
|'write complete data to bq' >> beam.io.WriteToBigQuery(
table='XXXXX',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
)
|'further process' >> beam.ParDo(further_processing_fn())
)
step 'further process' 只能得到 beam.io.WriteToBigQuery 的输出而不是原始消息。
我也考虑使用侧输出,但这很难确保在数据成功写入 bigquery 之前不会开始“进一步处理”。
你能帮忙建议一下如何实现目标吗?
Q2。在ParDo里面做bigquery操作合适吗
鉴于上面的流程逻辑,这是我的代码
class saveData(beam.DoFn):
def process(self, element, *args, **kwargs):
client=bigquery.Client()
query="insert into `XXXXX` values ('{}','{}','{}')"\
.format(element[1].get('uid'),element[0],'ongoing')
query_job = client.query(query)
result = query_job.result()
res=(element[0],element[1],'ongoing')
yield res
class businessProcess(beam.DoFn):
def process(self, element,*args, **kwargs):
print("business process logic")
res=(element[0],element[1],'complete')
yield res
class updateRow(beam.DoFn):
def process(self, element,*args, **kwargs):
attribute = element[1]
query = 'update `XXXXXX` set status = "complete" where uid="{}" ' \
.format(attribute.get('uid'))
client=bigquery.Client()
query_job = client.query(query)
result=query_job.result()
......
p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
message=(
p
|beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
|'insert to bq with status ongoing'>> beam.ParDo(saveData())
|'business process'>> beam.ParDo(businessProcess())
|'update bq' >> beam.ParDo(udpateRow())
)
我不认为这是实现要求的典型光束方法,请您帮忙建议处理此类要求的最佳实践是什么?
Q3。如何将多个输入传递给 ParDo?
如果我需要将 4 个输入项传递给 ParDo 函数,如何在 python 中执行? side input 在我的理解中只支持 2 input,对吗?
为了解决您的问题,我建议您采用以下模式:
Topic 1 -> Job Dataflow 1 -> Multi Sink -> Write Result to BigQuery
-> Write Result to Pub Sub Topic 2
Topic 2 -> Job Dataflow 2 -> Apply Business Transformations -> Write Result to BigQuery
Dataflow
作业从主题 1Dataflow
作业从主题2读取数据ParDo
或 Map
BigQuery
使用此模式,您可以实时应用您的用例。 你需要在 2
Dataflow
工作中分离你的逻辑。
我在你的第二个例子中看到一个
update
,通常BigQuery
在流媒体模式下使用append
而不是update
并且BigQueryIO
不支持更新。
如果您必须在
BigQuery
中处理重复项,则必须考虑最适合您的方法,并且可以在 Dataflow
流作业(BigQuery
视图 + 批处理作业)之外处理此需求。