我正在编写一个 Beam 管道,它读取具有名为“uid”的属性的 pubsub 消息,该属性是当前消息的唯一 ID。然后我想使用这个'uid'来查询bigquery以获得额外的信息来丰富消息。
由于 bigquery 表非常大,使用整个表作为侧输入可能不合适,理想的方法应该是这样的:
mainInput=(p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))
sideInput=(p|beam.io.ReadFromBigquery(query= 'select * from tablex where uid="id_from_message"'))
def fn(left,right):
for x in right:
yield (left,x)
process = (mainInput|map(fn,right=beam.pvalue.asDict(sideInput))
但是,我不知道如何获取“id_from_message”,因为它位于管道 mainInput 中,请问您能否就如何在 python 中执行此操作提供建议?
我目前的代码如下
class enrichByBQClient(beam.DoFn):
def process(self, element, *args, **kwargs):
try:
print("Enrich from BQ Start")
attribute = element[1]
query = 'select uid,status from `XXXXXX` where uid="{}" limit 1' \
.format(attribute.get('uid'))
client=bigquery.Client()
query_job = client.query(query)
result=query_job.result()
len_result = 0
for row in result:
status=row.status
len_result+=1
if len_result == 0:
status=OUTPUT_TAG_NEW
elif status != 'complete':
status = OUTPUT_TAG_INCOMPLETE
else:
status = OUTPUT_TAG_COMPLETE
yield (element[0],element[1],status)
message=(
p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))
......
message=(p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))
enrichData,failure_enrich=(message
|'enrich by bigquery client' >> beam.ParDo(enrichByBQClient())
)
我在 Pardo 函数中添加查询 bigquery 逻辑,这可能不是最佳实践,对吗?
你是对的,在
BigQuery
中执行对 ParDo
的查询不是一个好习惯。它将执行许多查询,并且它在 Beam
上表现不佳,并且在 BigQuery
方面不具有成本效益。
您可以考虑另一种方法和设计来解决您的问题。
你有一个流媒体管道,需要有一个状态。 如果可能的话,在
In Memory
桌子旁边使用 BigQuery
储物柜可能会很有趣。
在
Google Cloud
中,有一个MemoryStore缓存与Redis
.
通过这种方法,您可以在 ParDo
中使用
Python Redis 客户端并从缓存中找到元素。
此解决方案具有出色的响应时间和成本效益
MemoryStore
.
但是,此解决方案有一个缺点,您必须管理缓存并将其与
BigQuery
表同步。
如果你因为任何原因丢失了它,你还必须计划一种重建缓存的方法。
我希望这个答案能有所帮助。