beam/dataflow管道将写入BigQuery无法转换时间戳(有时)

问题描述 投票:0回答:1

I有一个梁/数据流管线,该管道从酒吧/sub读取,并用

WriteToBigQuery
写入Biqquery。我将所有时间戳转换为
apache_beam.utils.timestamp.Timestamp
。我确定所有时间戳都已转换,但我确实会在某些行中遇到此错误:

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.UnsupportedOperationException: Converting BigQuery type 'class java.lang.String' to 'LOGICAL_TYPE<beam:logical_type:micros_instant:v1>' is not supported

数据量太高,无法隔离失败的行,并且没有任何东西。它仅与

WriteToBigQuery.failed_rows_with_errors
模式
STREAMING_INSERTS
STORAGE_WRITE_API
有此问题。 我的管道就是这样:

( p | "ReadFromPubSub" >> beam.io.ReadFromPubSub( subscription=config.get_arg("pub_sub_subscription")) | "DecodeMessage" >> beam.ParDo(ParsePubSubMessage()) | "FilterEvents" >> beam.Filter(lambda element: element[0][1] == "event") | "ExtractParsedMessage" >> beam.Map(lambda element: element[1]) | "LogBeforeSerialize" >> beam.Map(lambda x: log_element(x, "Before serialize")) | "SerializeDataField" >> beam.ParDo(SerializeDataFieldDoFn(events_schema)) | "LogAfterSerialize" >> beam.Map(lambda x: log_element(x, "After serialize")) | "FilterValidRows" >> beam.Filter(lambda row: row is not None) | "WriteToBigQuery" >> WriteToBigQuery( table="xxx", dataset="xxx", project="xxx", schema={"fields": events_schema}, method=WriteToBigQuery.Method.STORAGE_WRITE_API, use_at_least_once=True, validate=False, ignore_unknown_columns=True, write_disposition=BigQueryDisposition.WRITE_APPEND, create_disposition=BigQueryDisposition.CREATE_NEVER, ) )

任何帮助都将不胜感激。
    

当到达Bigquery水槽时,有些人仍然很有可能会有一些。尝试首先专注于识别有问题的数据类型而不是特定行。
python google-bigquery runtime-error google-cloud-dataflow beam
1个回答
0
投票

您还可以尝试实现一个dead-ledter队列以捕获失败的元素。过滤转换尝试后具有

import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

def log_element_types(element, message):
    print(f"{message}:")
    for key, value in element.items():
        print(f"  {key}: {type(value)}")
    return element

 ... your pipeline ...
| "LogTypesAfterSerialize" >> beam.Map(lambda x: log_element_types(x, "Types After serialize"))
时间戳的元素,并将其发送到单独的酒吧/子主题或大Query表进行分析。


最新问题
© www.soinside.com 2019 - 2025. All rights reserved.