从 gcs 加载数据到 bigquery 时进行重复数据删除

问题描述 投票:0回答:1

我们有一个管道,它将发送 cosmos db 中发生的所有更改并将其作为 .json 文件(存储桶)存储在 gcs 中,从那里我想通过删除重复项加载到 bigquery 中,这意味着只有最新记录需要推送到 bigquery 中。

实现这一点的最佳方法是什么?

现在我正在将gcs加载到bigquery,但是我如何通过扫描来消除重复记录?

  BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

  TableId tableId = TableId.of(datasetName, tableName);
  LoadJobConfiguration loadConfig =
      LoadJobConfiguration.newBuilder(tableId, sourceUri)
          .setFormatOptions(FormatOptions.json())
          .setSchema(schema)
          .build();

  // Load data from a GCS JSON file into the table
  Job job = bigquery.create(JobInfo.of(loadConfig));
  // Blocks until this load table job completes its execution, either failing or succeeding.
  job = job.waitFor();
  if (job.isDone()) {
    System.out.println("Json from GCS successfully loaded in a table");
  } else {
    System.out.println(
        "BigQuery was unable to load into the table due to an error:"
            + job.getStatus().getError());
  }
google-bigquery google-cloud-storage spring-cloud-gcp-bigquery
1个回答
0
投票

通过使用 MERGE 语句,这将允许您仅插入新记录或根据 _ts 值更新现有记录。


MERGE `your_dataset.your_target_table` AS target
USING (
    SELECT * EXCEPT(_ts) FROM `your_dataset.your_staging_table` 
) AS source
ON target.id = source.id  -- Match on your primary key
WHEN MATCHED AND target._ts < source._ts THEN
    UPDATE SET * EXCEPT(id, _ts) = source.*  -- Update if the source timestamp is newer
WHEN NOT MATCHED THEN
    INSERT ROW;  -- Insert if the record doesn't exist

© www.soinside.com 2019 - 2024. All rights reserved.