Apache Beam 代码未运行并出现错误

问题描述 投票:0回答:1

我正在尝试编写自定义 Flex 模板来从 JDBC 中提取数据并写入 GCS 存储桶。我在代码中做错了什么? 代码产生错误:

Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1611, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "/dataflow/templates/beam_job.py", line 83, in process
NameError: name 'beam_jdbc' is not defined

代码:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
import apache_beam.io.jdbc as beam_jdbc
import os

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--jdbc_url', type=str, help='JDBC URL')
        parser.add_value_provider_argument('--jdbc_username', type=str, help='JDBC Username')
        parser.add_value_provider_argument('--jdbc_password', type=str, help='JDBC Password')
        parser.add_value_provider_argument('--jdbc_driver_class_name', type=str, help='JDBC Driver Class Name')
        parser.add_value_provider_argument('--jdbc_query', type=str, help='JDBC Query')
        parser.add_value_provider_argument('--jdbc_fetch_size', type=int, default=1000, help='JDBC Fetch Size')
        parser.add_value_provider_argument('--jdbc_table_name', type=str, help='JDBC Table Name')
        parser.add_value_provider_argument('--output_gcs_location', type=str, help='Output GCS Location')

class ReadFromJdbcFn(beam.DoFn):
    def __init__(self, jdbc_driver_class_name, jdbc_url, jdbc_username, jdbc_password, jdbc_query, jdbc_fetch_size):
        self.jdbc_driver_class_name = jdbc_driver_class_name
        self.jdbc_url = jdbc_url
        self.jdbc_username = jdbc_username
        self.jdbc_password = jdbc_password
        self.jdbc_query = jdbc_query
        self.jdbc_fetch_size = jdbc_fetch_size

    def process(self, element):
        jdbc_driver_class_name = self.jdbc_driver_class_name.get()
        jdbc_url = self.jdbc_url.get()
        jdbc_username = self.jdbc_username.get()
        jdbc_password = self.jdbc_password.get()
        jdbc_query = self.jdbc_query.get()
        jdbc_fetch_size = self.jdbc_fetch_size.get()

        source = beam_jdbc.ReadFromJdbc(
            driver_class_name=jdbc_driver_class_name,
            jdbc_url=jdbc_url,
            username=jdbc_username,
            password=jdbc_password,
            query=jdbc_query,
            fetch_size=jdbc_fetch_size
        )
        
        yield from source.expand(element)

def run():
    pipeline_options = PipelineOptions()
    custom_options = pipeline_options.view_as(CustomOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        _ = (
            p
            | 'Create' >> beam.Create([None])
            | 'Read from JDBC' >> beam.ParDo(
                ReadFromJdbcFn(
                    jdbc_driver_class_name=custom_options.jdbc_driver_class_name,
                    jdbc_url=custom_options.jdbc_url,
                    jdbc_username=custom_options.jdbc_username,
                    jdbc_password=custom_options.jdbc_password,
                    jdbc_query=custom_options.jdbc_query,
                    jdbc_fetch_size=custom_options.jdbc_fetch_size
                )
            )
            | 'Write to GCS' >> beam.io.WriteToText(
                custom_options.output_gcs_location.get(),
                file_name_suffix='.json',
                shard_name_template=''
            )
        )

if __name__ == '__main__':
    run()

Apache-beam 版本==最新

google-cloud-platform jdbc apache-beam
1个回答
0
投票

腌制

__main__
会话/环境时可能会出现问题。 尝试将此导入放入您的
run()
方法中。

© www.soinside.com 2019 - 2024. All rights reserved.