来自 Python 中 Bigquery 的动态侧输入

问题描述 投票:0回答:1

我正在编写一个 Beam 管道,它读取具有名为“uid”的属性的 pubsub 消息,该属性是当前消息的唯一 ID。然后我想使用这个'uid'来查询bigquery以获得额外的信息来丰富消息。

由于 bigquery 表非常大,使用整个表作为侧输入可能不合适,理想的方法应该是这样的:

mainInput=(p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))
sideInput=(p|beam.io.ReadFromBigquery(query= 'select * from tablex where uid="id_from_message"'))
def fn(left,right):
   for x in right:
      yield (left,x)
process = (mainInput|map(fn,right=beam.pvalue.asDict(sideInput))

但是,我不知道如何获取“id_from_message”,因为它位于管道 mainInput 中,请问您能否就如何在 python 中执行此操作提供建议?

我目前的代码如下

class enrichByBQClient(beam.DoFn):

    def process(self, element, *args, **kwargs):
        try:
            print("Enrich from BQ Start")
            attribute = element[1]
            query = 'select uid,status from `XXXXXX` where uid="{}" limit 1' \
               .format(attribute.get('uid'))
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()

            len_result = 0
            for row in result:
                status=row.status
                len_result+=1

            if len_result == 0:
                status=OUTPUT_TAG_NEW
            elif status != 'complete':
                status = OUTPUT_TAG_INCOMPLETE
            else:
                status = OUTPUT_TAG_COMPLETE

            yield (element[0],element[1],status)

      message=(
           p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))

......
message=(p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))

enrichData,failure_enrich=(message 
|'enrich by bigquery client' >> beam.ParDo(enrichByBQClient())
)

我在 Pardo 函数中添加查询 bigquery 逻辑,这可能不是最佳实践,对吗?

google-cloud-dataflow apache-beam
1个回答
0
投票

你是对的,在

BigQuery
中执行对
ParDo
的查询不是一个好习惯。它将执行许多查询,并且它在
Beam
上表现不佳,并且在
BigQuery
方面不具有成本效益。

您可以考虑另一种方法和设计来解决您的问题。

你有一个流媒体管道,需要有一个状态。 如果可能的话,在

In Memory
桌子旁边使用
BigQuery
储物柜可能会很有趣。

Google Cloud
中,有一个MemoryStore缓存与
Redis
.

通过这种方法,您可以在 ParDo 中使用

Python Redis 客户端
并从缓存中找到元素。

此解决方案具有出色的响应时间和成本效益

MemoryStore
.

但是,此解决方案有一个缺点,您必须管理缓存并将其与

BigQuery
表同步。

如果你因为任何原因丢失了它,你还必须计划一种重建缓存的方法。

我希望这个答案能有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.