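I have a Beam pipeline that reads records from two Postgres CloudSQL databases, performs some data transformations, and pushes the data to Google PubSub via the WriteToPubSub transform.
I am able to run this pipeline locally with the DirectRunner, where both the connection to CloudSQL and publishing to PubSub work fine.
But when I include runner='DataflowRunner' as a pipeline option, the pipeline fails with the error:
'PDone' object has no attribute 'windowing'
This error is raised in the get_windowing function inside Beam's ptransform.py module.
I'm not sure what difference running the code on the Dataflow runner makes.
Is this caused by a permission issue that prevents Dataflow from communicating with PubSub or CloudSQL, or do I need to specify some particular windowing options even when writing to PubSub?
The following code snippet shows the main pipeline flow.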
# Invoker code
if __name__ == '__main__':
    # Set up your PostgreSQL connection parameters
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': os.getenv('DB_PORT'),
        'database': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASS')
    }
    parser = argparse.ArgumentParser()
    args, beam_args = parser.parse_known_args()
    print(args)
    publish_topic = os.getenv('PUBLISH_TOPIC')

    # Set up Apache Beam pipeline options
    pipeline_options = PipelineOptions(
        beam_args,
        runner='DataflowRunner',
        project='<gcp-project-id>',
        job_name='dispatch-demo-1',
        temp_location='<bucket path>',
        region='europe-west1')
    dispatch_args = pipeline_options.view_as(DispatchOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Create a dummy input element
        dummy_input = pipeline | beam.Create(['dummy'])
        client_operations_query = f"SELECT * FROM table2 WHERE attr1=abc"
        get_retailer_categories_query = 'SELECT * FROM table1'

        co_rows = (
            dummy_input
            | 'Get table1 rows' >> ReadDB(client_operations_query, **db_config)
            | 'CO list to map' >> beam.Map(list_to_dict, 'internal_category_id'))
        co_rows | "co_rows " >> beam.Map(print)

        retailer_cat_rows = (
            dummy_input
            | 'Get table2 rows' >> ReadDB(get_retailer_categories_query, **db_config)
            | 'Table2 list to map' >> beam.Map(list_to_dict, 'internal_category_id'))
        retailer_cat_rows | "retailer_cat_rows" >> beam.Map(print)

        denormalised_co_rows = (
            {'co_rows': co_rows, 'retailer_cat_rows': retailer_cat_rows}
            | 'group by cat_ids' >> beam.CoGroupByKey()
            | 'Join by cat_id' >> beam.ParDo(MergeTransform()))

        groupedRows = (
            denormalised_co_rows
            | beam.GroupBy(get_hash)
            | 'ExtractClientIds' >> beam.Map(
                lambda element: (
                    element[0],
                    [obj['client_id'] for obj in element[1]],
                    element[1]))
            | "Convert to string" >> beam.Map(encode_as_task)
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=publish_topic)
            | "pubsub out print" >> beam.Map(print))
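
For context, encode_as_task is not shown above. A minimal sketch of what such a helper could look like, assuming it receives the (hash, client ids, rows) tuple produced by the ExtractClientIds step and must return bytes, since WriteToPubSub expects bytes payloads by default. The field names key, client_ids and rows are hypothetical placeholders, not the real message schema:

```python
import json


def encode_as_task(element):
    """Hypothetical stand-in: serialise a (key, client_ids, rows) tuple to UTF-8 JSON bytes."""
    key, client_ids, rows = element
    payload = {
        'key': key,                      # assumed field name
        'client_ids': list(client_ids),  # assumed field name
        'rows': list(rows),              # assumed field name
    }
    # default=str stringifies values json cannot encode natively
    # (for example datetime or Decimal columns coming back from Postgres).
    return json.dumps(payload, default=str, sort_keys=True).encode('utf-8')
```

Called on one grouped element, this returns a bytes object that decodes back to a dict with the three assumed fields.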