Apache Beam -> BigQuery: Storage Write API does not respect primary key


I created a BigQuery table with the following DDL:

CREATE TABLE mytable
(
  id STRING,
  source STRING,
  PRIMARY KEY (id) NOT ENFORCED
);

As you can see, id is set as the table's primary key. My Beam pipeline is defined as follows:

def process_message(message):
    # Local imports keep the function self-contained for Dataflow workers.
    import json
    import struct
    data = json.loads(message.decode("utf-8"))
    # Note: the original snippet referenced an undefined `this_message`;
    # the parsed payload `data` is used here instead.
    if data == {}:
        print(f"Running DELETE operation on row {data.get('Key')}")
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        print(f"Running UPSERT operation on row {data.get('Key')}")
        data['_CHANGE_TYPE'] = 'UPSERT'
    # Pack the update timestamp into a hex string so later changes sort higher.
    # (A default of 0 is assumed for empty payloads, which carry no timestamp.)
    updated = float(data.get('Value', {}).get('updated', 0))
    data['_CHANGE_SEQUENCE_NUMBER'] = struct.pack('d', int(round(updated))).hex()
    return [data]
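
For illustration, here is how the function behaves on a hypothetical payload. The question only implies the Key and Value['updated'] fields, so this shape is an assumption:

import json

# Hypothetical payload shape inferred from the snippet above.
sample = json.dumps({
    "Key": "42",
    "Value": {"updated": "1718000000.0"},
    "id": "42",
    "source": "orders",
}).encode("utf-8")

rows = process_message(sample)
print(rows[0]['_CHANGE_TYPE'])             # UPSERT
print(rows[0]['_CHANGE_SEQUENCE_NUMBER'])  # hex-packed double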


import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
    data = pipeline | 'ReadFromPubSub' >> ReadFromPubSub(
        subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')

    data | 'ProcessMessages' >> beam.ParDo(process_message) | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{bq_dataset}.{bq_table_name}',
        schema=schema,
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        triggering_frequency=5  # seconds between Storage Write API flushes
    )
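
The schema variable is not shown in the question; a definition consistent with the DDL above could look like this (the comma-separated string is one of several formats WriteToBigQuery accepts):

# Hypothetical: mirrors the two columns created by the DDL above.
schema = 'id:STRING,source:STRING'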

However, when I query the table in BigQuery after it has consumed a small number of records, I can see that the id field contains hundreds of duplicates.

How can I get my pipeline to respect the primary key and perform UPSERT operations as it should?

google-bigquery google-cloud-dataflow apache-beam
1 Answer

It looks like you are trying to implement CDC writes. For that you need to set use_cdc_writes to True, and you need to supply row_mutation_info for each record.

See the WriteToBigQuery documentation for more information: https://beam.apache.org/releases/pydoc/2.61.0/apache_beam.io.gcp.bigquery.html
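
A minimal sketch of what that might look like, based on the linked documentation: each element is wrapped in a Beam Row carrying a row_mutation_info field (with mutation_type and change_sequence_number) alongside the record itself. The to_cdc_row helper is hypothetical, and the exact argument set (use_at_least_once, primary_key) should be checked against your Beam version:

import apache_beam as beam
from apache_beam.io import WriteToBigQuery

def to_cdc_row(message):
    # Hypothetical helper: wraps each payload in the Row shape that
    # CDC writes expect (row_mutation_info + record).
    import json
    data = json.loads(message.decode("utf-8"))
    mutation_type = 'DELETE' if data == {} else 'UPSERT'
    # BigQuery expects change sequence numbers as hex segments; encode
    # the update timestamp accordingly (0 is assumed for empty payloads).
    seq = format(int(float(data.get('Value', {}).get('updated', 0))), 'x')
    return beam.Row(
        row_mutation_info=beam.Row(
            mutation_type=mutation_type,
            change_sequence_number=seq),
        record=beam.Row(id=data.get('id'), source=data.get('source')))

(data
 | 'ToCdcRows' >> beam.Map(to_cdc_row)
 | 'WriteToBigQuery' >> WriteToBigQuery(
     f'{project_id}:{bq_dataset}.{bq_table_name}',
     method=WriteToBigQuery.Method.STORAGE_WRITE_API,
     use_at_least_once=True,  # CDC writes run in at-least-once mode
     use_cdc_writes=True,
     primary_key=['id']))  # only needed if Beam creates the table

With this in place, rows sharing the same id are collapsed by BigQuery according to the highest change_sequence_number, rather than being appended as duplicates.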
