How to handle exceptions in Apache Beam (Python) when reading from JDBC and writing to BigQuery


I can successfully read data from a JDBC source and write the output back to BigQuery. However, I am stuck on finding the best way to handle BigQuery insert exceptions for bad rows.

For example, given the following rows from the JDBC source, the first two rows are fine and insert into BigQuery without issue. The third record, however, has a bad datetime and causes my code to raise an exception.

id                  email              firstname  lastname  dateofbirth
005a31ba-d16c-42d5  [email protected]  Jog        Peter     1996-07-01 00:00:00
007705f9-e248-492c  [email protected]  Jog        Peter     2000-09-15 00:00:00
042c5001-077f-4d49  [email protected]  Jog        Peter     0001-01-01 00:00:00

I expected the bad rows to be handled in the Get Errors / Write Errors steps of my pipeline, which should write the failed BigQuery rows to a dead-letter table.

I get the following exception:

ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-06-15T15:27:01.408Z: JOB_MESSAGE_ERROR: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
        at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist.
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
        at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Timestamp field value is out of range: -62135769600000000 on field dateofbirth. Entity: projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz
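The root cause is in the last line of the trace: the rejected value -62135769600000000 is the row's dateofbirth in microseconds since the Unix epoch, and it falls just below BigQuery's TIMESTAMP lower bound of 0001-01-01 00:00:00 UTC. A small standalone check (not part of the pipeline, just to illustrate the arithmetic):

from datetime import datetime, timedelta, timezone

FAILING_MICROS = -62135769600000000  # value reported in the stack trace

# BigQuery's TIMESTAMP lower bound, expressed in microseconds since the Unix epoch.
bq_min = datetime(1, 1, 1, tzinfo=timezone.utc)
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
min_micros = (bq_min - epoch) // timedelta(microseconds=1)

print(min_micros)                    # -62135596800000000
print(FAILING_MICROS < min_micros)   # True: the value is below BigQuery's range

The difference is exactly two days, so the stored 0001-01-01 00:00:00 value appears to get shifted below the bound somewhere in the JDBC/Beam conversion.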

Here is my code:

import argparse
import logging
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType, MillisInstant
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}


error_schema = {
    "fields": [
        {"name": "destination", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
    ]
}


class LogResults(beam.DoFn):
    """Just log the results"""

    def process(self, element):
        logging.info("elment.logger - : %s", element)
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="xxxx",
            username=r"xxxxx",
            password=r"xxxxx",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )

        result = (
            users
            | "map Dict" >> beam.Map(lambda x: x._asdict())
            | "Log users_favfilms" >> beam.ParDo(LogResults())
            | "write users to BQ"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                schema=users_schema,
                table="jdbctests.users",
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )

        _ = (
            result.failed_rows_with_errors
            | "Get Errors"
            >> beam.Map(
                lambda e: {
                    "destination": e[0],
                    "row": json.dumps(e[1]),
                    "error_message": e[2][0]["message"],
                }
            )
            | "Write Errors"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                table="jdbctests.jdbcerrros",
                schema=error_schema,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

I would like to handle these bad rows without breaking the pipeline, and then deal with them at a later stage. Any suggestions or hints would be appreciated, thanks.

python google-bigquery google-cloud-dataflow apache-beam
1 Answer

So I think the problem here is the exact error you are getting.

ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-06-15T15:27:01.408Z: JOB_MESSAGE_ERROR: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz failed with Status Code INVALID_ARGUMENT. The stream may not exist

It seems you are trying to write to a nonexistent project, dummy-project.

When certain valid failures occur while writing to BQ, we write the records to a DLQ. Basically, we check the underlying channel for BQ failures while writing, and if BQ reports an error we write such records to the DLQ. For other, more catastrophic failures, we simply fail the pipeline (so your records are never lost).

In this case, the write to the DLQ happens here.

Since you are trying to write to a project that does not exist, this ends up as a more catastrophic failure (we cannot create the channel at all), so the pipeline fails instead of writing the failed records to BQ.
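Separately from the project-name issue, rows like the 0001-01-01 dateofbirth can be kept away from the Storage Write API stream entirely by validating them before the sink. Below is a minimal sketch of that idea; it assumes dateofbirth arrives as a Python datetime after the _asdict() step, and the ValidateDateOfBirth class and the "invalid" tag are illustrative names, not part of the original pipeline:

import apache_beam as beam
from datetime import datetime, timezone

# BigQuery's documented TIMESTAMP range.
BQ_MIN_TS = datetime(1, 1, 1, tzinfo=timezone.utc)
BQ_MAX_TS = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)


class ValidateDateOfBirth(beam.DoFn):
    """Route rows whose dateofbirth is outside BigQuery's TIMESTAMP range to a side output."""

    INVALID = "invalid"

    def process(self, row):
        dob = row.get("dateofbirth")
        if isinstance(dob, datetime):
            # Assumption: naive datetimes are treated as UTC.
            aware = dob if dob.tzinfo else dob.replace(tzinfo=timezone.utc)
            if not (BQ_MIN_TS <= aware <= BQ_MAX_TS):
                yield beam.pvalue.TaggedOutput(self.INVALID, row)
                return
        yield row

Usage would slot in between the "map Dict" step and the BigQuery sink, with the tagged output going to the error table:

outputs = (
    users
    | "map Dict" >> beam.Map(lambda x: x._asdict())
    | "Validate dob" >> beam.ParDo(ValidateDateOfBirth()).with_outputs(
        ValidateDateOfBirth.INVALID, main="valid"
    )
)
# outputs.valid feeds WriteToBigQuery; outputs.invalid can be written to the DLQ table.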
