I created a BigQuery table with the following DDL:
CREATE TABLE mytable
(
  id STRING,
  source STRING,
  PRIMARY KEY (id) NOT ENFORCED
);
As you can see, id is set as the table's primary key. My Beam pipeline is defined as follows:
def process_message(message):
    import json
    import struct
    data = json.loads(message.decode("utf-8"))
    if data == {}:
        print(f"Running DELETE operation on row {data['Key']}")
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        print(f"Running UPSERT operation on row {data['Key']}")
        data['_CHANGE_TYPE'] = 'UPSERT'
        data['_CHANGE_SEQUENCE_NUMBER'] = struct.pack('d', int(round(float(data['Value']['updated'])))).hex()
    return [data]
with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
    data = pipeline | 'ReadFromPubSub' >> ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')
    data | 'ProcessMessages' >> beam.ParDo(process_message) | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{bq_dataset}.{bq_table_name}',
        schema=schema,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        triggering_frequency=5
    )
However, when I query the table in BigQuery after running a handful of records through the pipeline, I find hundreds of duplicates in the id field.
How do I make my pipeline respect the primary key and perform UPSERT operations, as it is supposed to?
It looks like you are trying to implement CDC writes. Note that a NOT ENFORCED primary key does not deduplicate anything on its own, and by default the Storage Write API simply appends rows, which is why you see duplicates. To get UPSERT/DELETE semantics, you need to set use_cdc_writes to true and provide row_mutation_info on a per-record basis.
See https://beam.apache.org/releases/pydoc/2.61.0/apache_beam.io.gcp.bigquery.html for more information.
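A minimal sketch of what that could look like. Assumptions to verify against the Beam version you run: the element shape for CDC writes (a dict with 'row_mutation_info' and 'record' keys, where 'row_mutation_info' carries 'mutation_type' and 'change_sequence_number') and the use_cdc_writes / primary_key / use_at_least_once sink parameters are taken from my reading of the linked docs; the to_cdc_element helper and the sample payload are made up for illustration.

```python
import json
import struct

def to_cdc_element(message: bytes, key: str, updated: float) -> dict:
    """Wrap a decoded Pub/Sub payload into the element shape a
    CDC-enabled Storage Write API sink expects: the mutation metadata
    lives next to the record instead of inside it."""
    data = json.loads(message.decode("utf-8"))
    mutation_type = "DELETE" if data == {} else "UPSERT"
    # Big-endian IEEE-754 bytes of a positive timestamp sort
    # lexicographically in numeric order, so later updates win.
    seq = struct.pack(">d", float(updated)).hex()
    return {
        "row_mutation_info": {
            "mutation_type": mutation_type,
            "change_sequence_number": seq,
        },
        "record": {"id": key, **data},
    }

element = to_cdc_element(b'{"source": "orders"}', key="42", updated=1700000000.0)
# element["row_mutation_info"]["mutation_type"] == "UPSERT"

# The sink would then be configured roughly as (untested sketch):
# WriteToBigQuery(
#     f'{project_id}:{bq_dataset}.{bq_table_name}',
#     schema=schema,
#     method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
#     use_at_least_once=True,   # CDC writes require at-least-once mode
#     use_cdc_writes=True,
#     primary_key=["id"],
#     triggering_frequency=5)
```

With this shape, the _CHANGE_TYPE / _CHANGE_SEQUENCE_NUMBER keys no longer need to be injected into the record dict itself.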