I can successfully read data from a JDBC source and write the output back to BigQuery. However, I'm still stuck on finding the best way to handle BigQuery insert exceptions for bad rows.
For example, with the following rows from the JDBC source, the first 2 rows are fine and insert into BigQuery without issue. The third record, however, has a bad datetime and causes my code to raise an exception.
id | email | firstname | lastname | dateofbirth |
---|---|---|---|---|
005a31ba-d16c-42d5 | [email protected] | Jog | Peter | 1996-07-01 00:00:00 |
007705f9-e248-492c | [email protected] | Jog | Peter | 2000-09-15 00:00:00 |
042c5001-077f-4d49 | [email protected] | Jog | Peter | 0001-01-01 00:00:00 |
I expected bad rows to be handled in the Get Errors and Write Errors steps of the pipeline, which should write the failed BigQuery rows to a dead-letter queue table.
I get the following exception:
ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-06-15T15:27:01.408Z: JOB_MESSAGE_ERROR: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist.
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Timestamp field value is out of range: -62135769600000000 on field dateofbirth. Entity: projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz
Here is my code:
import argparse
import logging
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType, MillisInstant
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}

error_schema = {
    "fields": [
        {"name": "destination", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
    ]
}


class LogResults(beam.DoFn):
    """Just log the results"""

    def process(self, element):
        logging.info("element.logger - : %s", element)
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="xxxx",
            username=r"xxxxx",
            password=r"xxxxx",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )
        result = (
            users
            | "map Dict" >> beam.Map(lambda x: x._asdict())
            | "Log users_favfilms" >> beam.ParDo(LogResults())
            | "write users to BQ"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                schema=users_schema,
                table="jdbctests.users",
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )
        _ = (
            result.failed_rows_with_errors
            | "Get Errors"
            >> beam.Map(
                lambda e: {
                    "destination": e[0],
                    "row": json.dumps(e[1]),
                    "error_message": e[2][0]["message"],
                }
            )
            | "Write Errors"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                table="jdbctests.jdbcerrros",
                schema=error_schema,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
I want to be able to handle these bad rows without breaking the pipeline, and then deal with them at a later stage. Any suggestions or tips would be greatly appreciated, thanks.
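One workaround I am considering is routing the sentinel dates away from WriteToBigQuery before the Storage Write API ever sees them, using beam.Partition on the dicts produced by the "map Dict" step. A minimal sketch of the partition function; the function name and cutoff year are placeholders of mine, and I'm assuming dateofbirth arrives as a datetime or a 'YYYY-MM-DD HH:MM:SS' string:

```python
from datetime import datetime

MIN_PLAUSIBLE_DOB = datetime(1900, 1, 1)  # placeholder cutoff for real birth dates


def partition_by_dob(row, num_partitions):
    """Return 0 for writable rows, 1 for rows bound for the error table."""
    dob = row.get("dateofbirth")
    if isinstance(dob, str):
        try:
            dob = datetime.strptime(dob, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return 1  # unparsable date -> error branch
    if dob is None or dob < MIN_PLAUSIBLE_DOB:
        return 1  # missing or sentinel date -> error branch
    return 0


# Usage inside the pipeline (sketch):
#   good, bad = dicts | beam.Partition(partition_by_dob, 2)
#   then write `good` to jdbctests.users and `bad` to the error table.
```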
So I think the problem here is the exact error you are producing.
ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-06-15T15:27:01.408Z: JOB_MESSAGE_ERROR: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist
It seems like you are trying to write to `dummy-project`, which does not exist.
We write records to the DLQ when there are certain `valid` failures writing to BQ. Basically, when writing we check the underlying channel for BQ failures, and if BQ reports an error, we write those records to the DLQ. For other, more catastrophic failures, we simply fail the pipeline (so your records are never lost).
In this case, the write to the DLQ happens here.
Because you are trying to write to a project that does not exist, this ends up being one of the more catastrophic failures (we cannot create the channel at all). So the pipeline fails rather than writing the failed records to BQ.
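One way to make that failure mode harder to hit is to always pass a fully qualified "project:dataset.table" spec to WriteToBigQuery instead of relying on a default project. A minimal fail-fast guard you could run before constructing the pipeline; the helper name and the regex are my own sketch, not a Beam API, and the regex only covers the common spec shape:

```python
import re

# Matches "project:dataset.table" (project ids may contain dots and dashes).
_QUALIFIED = re.compile(r"^[\w.-]+:\w+\.\w+$")


def require_qualified_table(spec):
    """Raise early if a table spec does not name an explicit project."""
    if not _QUALIFIED.match(spec):
        raise ValueError(
            "Table spec %r has no explicit project; use e.g. "
            "'my-project:jdbctests.users' so writes cannot fall back to a "
            "wrong or non-existent default project." % spec
        )
    return spec


# e.g. table=require_qualified_table("my-project:jdbctests.users")
```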