我正在尝试编写自定义 Flex 模板来从 JDBC 中提取数据并写入 GCS 存储桶。我在代码中做错了什么? 代码产生错误:
Error message from worker: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1611, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "/dataflow/templates/beam_job.py", line 83, in process
NameError: name 'beam_jdbc' is not defined
代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
import apache_beam.io.jdbc as beam_jdbc
import os
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--jdbc_url', type=str, help='JDBC URL')
parser.add_value_provider_argument('--jdbc_username', type=str, help='JDBC Username')
parser.add_value_provider_argument('--jdbc_password', type=str, help='JDBC Password')
parser.add_value_provider_argument('--jdbc_driver_class_name', type=str, help='JDBC Driver Class Name')
parser.add_value_provider_argument('--jdbc_query', type=str, help='JDBC Query')
parser.add_value_provider_argument('--jdbc_fetch_size', type=int, default=1000, help='JDBC Fetch Size')
parser.add_value_provider_argument('--jdbc_table_name', type=str, help='JDBC Table Name')
parser.add_value_provider_argument('--output_gcs_location', type=str, help='Output GCS Location')
class ReadFromJdbcFn(beam.DoFn):
def __init__(self, jdbc_driver_class_name, jdbc_url, jdbc_username, jdbc_password, jdbc_query, jdbc_fetch_size):
self.jdbc_driver_class_name = jdbc_driver_class_name
self.jdbc_url = jdbc_url
self.jdbc_username = jdbc_username
self.jdbc_password = jdbc_password
self.jdbc_query = jdbc_query
self.jdbc_fetch_size = jdbc_fetch_size
def process(self, element):
jdbc_driver_class_name = self.jdbc_driver_class_name.get()
jdbc_url = self.jdbc_url.get()
jdbc_username = self.jdbc_username.get()
jdbc_password = self.jdbc_password.get()
jdbc_query = self.jdbc_query.get()
jdbc_fetch_size = self.jdbc_fetch_size.get()
source = beam_jdbc.ReadFromJdbc(
driver_class_name=jdbc_driver_class_name,
jdbc_url=jdbc_url,
username=jdbc_username,
password=jdbc_password,
query=jdbc_query,
fetch_size=jdbc_fetch_size
)
yield from source.expand(element)
def run():
pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(CustomOptions)
with beam.Pipeline(options=pipeline_options) as p:
_ = (
p
| 'Create' >> beam.Create([None])
| 'Read from JDBC' >> beam.ParDo(
ReadFromJdbcFn(
jdbc_driver_class_name=custom_options.jdbc_driver_class_name,
jdbc_url=custom_options.jdbc_url,
jdbc_username=custom_options.jdbc_username,
jdbc_password=custom_options.jdbc_password,
jdbc_query=custom_options.jdbc_query,
jdbc_fetch_size=custom_options.jdbc_fetch_size
)
)
| 'Write to GCS' >> beam.io.WriteToText(
custom_options.output_gcs_location.get(),
file_name_suffix='.json',
shard_name_template=''
)
)
if __name__ == '__main__':
run()
Apache-beam 版本==最新
腌制
__main__
会话/环境时可能会出现问题。 尝试将此导入放入您的 run()
方法中。