I have a Beam/Dataflow pipeline that reads from Pub/Sub and writes to BigQuery with
WriteToBigQuery
. I convert all timestamps to apache_beam.utils.timestamp.Timestamp
and I have verified that every timestamp is converted, yet I still get this error for some rows:
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.UnsupportedOperationException: Converting BigQuery type 'class java.lang.String' to 'LOGICAL_TYPE<beam:logical_type:micros_instant:v1>' is not supported
The volume of data is too high to isolate the failing rows, and nothing ever lands in
WriteToBigQuery.failed_rows_with_errors
either, which only seems to work with the STREAMING_INSERTS method anyway. Only
STORAGE_WRITE_API
has this problem.
My pipeline looks like this:
(
    p
    | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
        subscription=config.get_arg("pub_sub_subscription"))
    | "DecodeMessage" >> beam.ParDo(ParsePubSubMessage())
    | "FilterEvents" >> beam.Filter(lambda element: element[0][1] == "event")
    | "ExtractParsedMessage" >> beam.Map(lambda element: element[1])
    | "LogBeforeSerialize" >> beam.Map(lambda x: log_element(x, "Before serialize"))
    | "SerializeDataField" >> beam.ParDo(SerializeDataFieldDoFn(events_schema))
    | "LogAfterSerialize" >> beam.Map(lambda x: log_element(x, "After serialize"))
    | "FilterValidRows" >> beam.Filter(lambda row: row is not None)
    | "WriteToBigQuery" >> WriteToBigQuery(
        table="xxx",
        dataset="xxx",
        project="xxx",
        schema={"fields": events_schema},
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        use_at_least_once=True,
        validate=False,
        ignore_unknown_columns=True,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_NEVER,
    )
)
Any help would be greatly appreciated.
It is still very likely that some rows contain string timestamps by the time they reach the BigQuery sink: the error says a java.lang.String is arriving where the schema expects a TIMESTAMP (micros_instant). Try to focus first on identifying the problematic data type rather than the specific rows. You can log the Python type of every field right after your serialization step:
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

def log_element_types(element, message):
    print(f"{message}:")
    for key, value in element.items():
        print(f"  {key}: {type(value)}")
    return element

... your pipeline ...
| "LogTypesAfterSerialize" >> beam.Map(lambda x: log_element_types(x, "Types After serialize"))
You can also try implementing a dead-letter queue to capture the failing elements: filter out elements that still have string timestamps after the conversion attempt and send them to a separate Pub/Sub topic or BigQuery table for analysis.
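A minimal sketch of that dead-letter split, assuming your timestamp columns are known up front; TIMESTAMP_FIELDS and the dead-letter topic are placeholders to replace with your own schema and project values:

import json  # used in the commented wiring below

import apache_beam as beam

# Hypothetical list of the row fields that map to TIMESTAMP columns.
TIMESTAMP_FIELDS = ["created_at", "updated_at"]

class SplitBadTimestamps(beam.DoFn):
    """Routes rows whose timestamp fields are still plain strings to a
    dead-letter output so they never reach the BigQuery sink."""
    DEAD_LETTER = "dead_letter"

    def process(self, row):
        if any(isinstance(row.get(field), str) for field in TIMESTAMP_FIELDS):
            yield beam.pvalue.TaggedOutput(self.DEAD_LETTER, row)
        else:
            yield row

# Wire it in between "FilterValidRows" and "WriteToBigQuery":
# split = rows | "SplitBadTimestamps" >> beam.ParDo(
#     SplitBadTimestamps()).with_outputs(
#         SplitBadTimestamps.DEAD_LETTER, main="good")
# split.good goes to WriteToBigQuery as before, while
# split.dead_letter
#     | "EncodeDeadLetter" >> beam.Map(
#         lambda row: json.dumps(row, default=str).encode("utf-8"))
#     | "ToDeadLetterTopic" >> beam.io.WriteToPubSub(
#         topic="projects/xxx/topics/dead-letter")  # placeholder topic
# sends the bad rows somewhere you can inspect them.

If you need the error details and not just the rows, a temporary switch of method to STREAMING_INSERTS should make failed_rows_with_errors start yielding again while you debug, since you observed it only works in that mode.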