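We have a pipeline that captures every change made in Cosmos DB and stores it as .json files in a GCS bucket. From there I want to load the data into BigQuery with duplicates removed, meaning only the latest version of each record should end up in BigQuery.
What is the best way to achieve this?
Right now I am loading from GCS into BigQuery, but how can I scan the data and eliminate the duplicate records?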
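import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of(datasetName, tableName);
LoadJobConfiguration loadConfig =
    LoadJobConfiguration.newBuilder(tableId, sourceUri)
        .setFormatOptions(FormatOptions.json())
        .setSchema(schema)
        .build();
// Load data from a GCS JSON file into the table
Job job = bigquery.create(JobInfo.of(loadConfig));
// Blocks until this load job completes, either failing or succeeding
job = job.waitFor();
// waitFor() returns null if the job no longer exists, and a finished job can
// still have failed, so check the error status instead of isDone()
if (job != null && job.getStatus().getError() == null) {
    System.out.println("Json from GCS successfully loaded in a table");
} else {
    System.out.println(
        "BigQuery was unable to load into the table due to an error: "
            + (job == null ? "job no longer exists" : job.getStatus().getError()));
}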
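To pin down what "only the latest record" means: for every document id, keep the change with the greatest _ts (the Cosmos DB last-modified timestamp). The following is a minimal in-memory sketch of that rule only, assuming a hypothetical ChangeEvent record and a hypothetical latestPerId helper; it is not a replacement for deduplicating inside BigQuery, but it can help when checking a small batch by hand.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerId {

    // Hypothetical shape of one Cosmos DB change: document id, _ts (epoch seconds), raw JSON
    record ChangeEvent(String id, long ts, String payload) {}

    // Keeps only the event with the greatest ts for each id;
    // on equal timestamps the event that appears later in the input wins
    static List<ChangeEvent> latestPerId(Collection<ChangeEvent> events) {
        Map<String, ChangeEvent> latest = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            // merge() inserts e if the id is new, otherwise keeps the newer of the two
            latest.merge(e.id(), e, (old, cur) -> cur.ts() >= old.ts() ? cur : old);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = List.of(
            new ChangeEvent("a", 100, "v1"),
            new ChangeEvent("b", 105, "v1"),
            new ChangeEvent("a", 110, "v2"),
            new ChangeEvent("a", 90, "stale"));
        for (ChangeEvent e : latestPerId(events)) {
            System.out.println(e.id() + " " + e.ts() + " " + e.payload());
        }
    }
}
```

Note that arrival order does not matter: the out-of-order "stale" event for id a is discarded because its _ts is older, which is the same comparison the MERGE condition makes on the warehouse side.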
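Use a MERGE statement: load the files into a staging table first, then merge the staging table into the target table. This inserts records whose id is not in the target yet and updates existing ones only when the incoming _ts value is newer. Because one batch of changes can contain several versions of the same document, reduce the staging rows to the latest _ts per id before merging; otherwise BigQuery rejects the MERGE when a target row matches more than one source row.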
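MERGE `your_dataset.your_target_table` AS target
USING (
  -- Keep only the newest version of each id from this batch
  SELECT * FROM `your_dataset.your_staging_table`
  WHERE TRUE
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _ts DESC) = 1
) AS source
ON target.id = source.id -- Match on your primary key
WHEN MATCHED AND target._ts < source._ts THEN
  -- Update only if the source timestamp is newer; BigQuery needs an explicit
  -- column list here (col1, col2 are placeholders for your own columns)
  UPDATE SET col1 = source.col1, col2 = source.col2, _ts = source._ts
WHEN NOT MATCHED THEN
  INSERT ROW; -- Insert if the record doesn't exist (staging and target schemas must match)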
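Since BigQuery's UPDATE SET clause does not accept a wildcard, the column list has to be spelled out. A minimal sketch of generating the statement from Java follows, assuming a hypothetical buildMergeSql helper and that the caller passes the full column list (including id and _ts); table and column names are concatenated unescaped, so they should come from your own schema, never from user input.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MergeSqlBuilder {

    // Builds a MERGE that reduces the staging table to the latest _ts per id,
    // then updates newer rows and inserts missing ones in the target.
    // columns must list every column of the table, including id and _ts.
    static String buildMergeSql(String target, String staging, List<String> columns) {
        // id is the match key, so it is excluded from the SET list; _ts stays in
        // so later merges compare against the stored timestamp
        String setList = columns.stream()
            .filter(c -> !c.equals("id"))
            .map(c -> c + " = source." + c)
            .collect(Collectors.joining(", "));
        return "MERGE `" + target + "` AS target\n"
            + "USING (\n"
            + "  SELECT * FROM `" + staging + "`\n"
            + "  WHERE TRUE\n"
            + "  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _ts DESC) = 1\n"
            + ") AS source\n"
            + "ON target.id = source.id\n"
            + "WHEN MATCHED AND target._ts < source._ts THEN\n"
            + "  UPDATE SET " + setList + "\n"
            + "WHEN NOT MATCHED THEN\n"
            + "  INSERT ROW";
    }

    public static void main(String[] args) {
        // Hypothetical column names, for illustration only
        System.out.println(buildMergeSql(
            "your_dataset.your_target_table",
            "your_dataset.your_staging_table",
            List.of("id", "name", "status", "_ts")));
    }
}
```

With the client from the question, the resulting string could then be run as a query job, for example via bigquery.query(QueryJobConfiguration.of(sql)), and the column names could be taken from the same schema object used for the load (schema.getFields()). Treat this as a starting point rather than a tested pipeline step.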