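My requirement is to connect from Dataflow to an on-premises Oracle database, read the data, and store it in GCS. I'm using a Python script. I create a Dataflow classic template through GitHub Actions, and the script uses the JDBC connector. Right now the template can't be created, because database connections are not allowed from the GitHub runner. So I want to skip the connection while the template is being created, and have the script connect only when the actual Dataflow job runs.
I tried other connectors, such as cx_Oracle and jaydebeapi, and with them I was able to skip the connection at template-creation time. However, the Dataflow job then fails with the error "No matching distribution found" for those modules. How can I solve this?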
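The goal described above, opening the connection at job run time instead of at template-creation time, is essentially lazy initialization. A minimal sketch, assuming nothing about the real driver: `LazyQueryReader` and its `connect` factory are hypothetical names, the class only mirrors the Beam `DoFn` lifecycle (`__init__` runs while the pipeline is built; `setup`, `process` and `teardown` run on the worker), and `sqlite3` stands in for Oracle so the sketch runs anywhere. In a real pipeline the class would subclass `beam.DoFn`, and the factory would call an Oracle driver that is actually installable on the workers.

```python
import sqlite3
from typing import Any, Callable, Iterator, Optional


class LazyQueryReader:
    """Hypothetical reader that opens its DB connection only on first use.

    Mirrors the beam.DoFn lifecycle: __init__ runs while the pipeline
    (or template) is being built; setup()/process()/teardown() run on
    the worker when the job executes.
    """

    def __init__(self, connect: Callable[[], Any], query: str) -> None:
        # Store only a factory and the query; no connection is opened here,
        # so building the pipeline needs no database access.
        self._connect = connect
        self._query = query
        self._conn: Optional[Any] = None

    def setup(self) -> None:
        # Open the connection lazily (Beam calls DoFn.setup() on the worker).
        if self._conn is None:
            self._conn = self._connect()

    def process(self, element: Any) -> Iterator[tuple]:
        # Reuse the connection opened in setup() for every element.
        self.setup()
        cursor = self._conn.cursor()
        try:
            cursor.execute(self._query)
            yield from cursor.fetchall()
        finally:
            cursor.close()

    def teardown(self) -> None:
        # Release the connection when the worker is done.
        if self._conn is not None:
            self._conn.close()
            self._conn = None
```

Constructing `LazyQueryReader(connect, "SELECT ...")` never calls `connect`; the first `process()` call opens the connection and later calls reuse it. The design choice is simply that only picklable configuration lives in `__init__`, while anything that touches the network waits for the worker.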
```python
import argparse
import logging

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.core import DoFn


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Value-provider arguments stay unresolved until the job runs,
        # so a classic template can be created without concrete values.
        parser.add_value_provider_argument('--jdbc_url', help='jdbc_url')
        parser.add_value_provider_argument('--driver_class_name', help='Driver class name')
        parser.add_value_provider_argument('--username', help='Username')
        parser.add_value_provider_argument('--password', help='Password')
        parser.add_value_provider_argument('--table_name', help='table_name')
        parser.add_value_provider_argument('--query', help='query')


class ReadFromJdbccustom:
    """Resolves the ValueProviders only when entered, i.e. inside process()."""

    def __init__(self, jdbc_url, driver_class_name, username, password, query, table_name):
        self.jdbc_url = jdbc_url
        self.driver_class_name = driver_class_name
        self.username = username
        self.password = password
        self.query = query
        self.table_name = table_name

    def __enter__(self):
        # Caution: ReadFromJdbc is a cross-language PTransform that must be
        # applied to a pipeline; it is not an iterable of rows, so looping
        # over it inside a DoFn (as process() below does) will fail.
        self.datasource = ReadFromJdbc(
            table_name=self.table_name.get(),
            driver_class_name=self.driver_class_name.get(),
            jdbc_url=self.jdbc_url.get(),
            username=self.username.get(),
            password=self.password.get(),
            query=self.query.get())
        return self.datasource

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release: no connection object is held here.
        pass


class ReadFromJdbcWithFn(DoFn):
    def __init__(self, jdbc_url: ValueProvider, driver_class_name: ValueProvider,
                 username: ValueProvider, password: ValueProvider,
                 query: ValueProvider, table_name: ValueProvider):
        super().__init__()
        # Keep the ValueProviders unresolved; .get() is only valid at run time.
        self.jdbc_url = jdbc_url
        self.driver_class_name = driver_class_name
        self.username = username
        self.password = password
        self.query = query
        self.table_name = table_name

    def process(self, element):
        # Keyword arguments keep jdbc_url and driver_class_name from being swapped.
        with ReadFromJdbccustom(
                jdbc_url=self.jdbc_url,
                driver_class_name=self.driver_class_name,
                username=self.username,
                password=self.password,
                query=self.query,
                table_name=self.table_name) as datasource:
            for row in datasource:
                # element is just the None seed from beam.Create([None]),
                # so only the row value is emitted.
                yield row[0]


def run(argv=None):
    # Everything not declared here is passed through as pipeline options.
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        oracle_data = (
            p
            | 'Start' >> beam.Create([None])
            | 'Read from Oracle' >> beam.ParDo(ReadFromJdbcWithFn(
                jdbc_url=options.jdbc_url,
                driver_class_name=options.driver_class_name,
                username=options.username,
                password=options.password,
                query=options.query,
                table_name=options.table_name)))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```
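The code relies on `ValueProvider` arguments being resolved only at run time. With a classic template, `options.jdbc_url` and the other options are placeholders while the template is built, and Beam's `RuntimeValueProvider.get()` raises an error when no runtime options are available yet. The class below, `DeferredParam`, is a hypothetical stand-in (not Beam's implementation) that sketches that contract, which is why every `.get()` call has to happen inside `process()` rather than while the pipeline is constructed.

```python
from typing import Dict, Optional


class ParamNotAvailableError(RuntimeError):
    """Raised when a deferred parameter is read before run time."""


class DeferredParam:
    """Hypothetical mimic of a runtime ValueProvider: a named placeholder
    whose value exists only once the job's runtime options are supplied."""

    # Shared runtime options; None means "still building the template".
    runtime_options: Optional[Dict[str, str]] = None

    def __init__(self, name: str) -> None:
        self.name = name

    def is_accessible(self) -> bool:
        # True only once the launcher has supplied concrete values.
        return DeferredParam.runtime_options is not None

    def get(self) -> str:
        if not self.is_accessible():
            raise ParamNotAvailableError(
                f"{self.name}.get() called at template-creation time")
        return DeferredParam.runtime_options[self.name]
```

Calling `DeferredParam("jdbc_url").get()` during graph construction raises, while the same call after the runtime options are set returns the concrete value.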
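Instead of a classic Dataflow template, try a Dataflow Flex Template. Creating a classic template actually executes your pipeline code on the machine that builds it (here, the GitHub runner), so any connection made during pipeline construction happens there too. A Flex Template packages the pipeline in a container image and constructs the job graph only when the job is launched on Google Cloud, so the GitHub runner no longer needs database access.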
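A Flex Template also needs a metadata file that declares its parameters (`name`, `label`, `helpText` and optionally `isOptional` for each). A minimal sketch that generates that file in Python, assuming the same six parameters as `UserOptions`; the helper name `build_flex_metadata`, the `USER_OPTIONS` mapping and the choice of which parameters to mark optional are illustrative assumptions, not part of the original answer.

```python
import json
from typing import Dict, Iterable


def build_flex_metadata(name: str, description: str,
                        params: Dict[str, str],
                        optional: Iterable[str] = ()) -> str:
    """Hypothetical helper: render Flex Template metadata as JSON.

    params maps each pipeline option name (without the leading '--')
    to its help text, matching the arguments declared in UserOptions.
    """
    optional = set(optional)
    parameters = []
    for param_name, help_text in params.items():
        entry = {"name": param_name, "label": param_name, "helpText": help_text}
        if param_name in optional:
            entry["isOptional"] = True
        parameters.append(entry)
    return json.dumps(
        {"name": name, "description": description, "parameters": parameters},
        indent=2)


# Same option names and help texts as UserOptions in the question.
USER_OPTIONS = {
    "jdbc_url": "jdbc_url",
    "driver_class_name": "Driver class name",
    "username": "Username",
    "password": "Password",
    "table_name": "table_name",
    "query": "query",
}
```

The resulting JSON is the file passed to `gcloud dataflow flex-template build` through its `--metadata-file` flag.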