How do I create a Dataflow template that skips the Oracle database connection while the template is being created?


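My requirement is to connect from Dataflow to an on-prem Oracle database, read the data, and store it in GCS. I'm using a Python script. I create a Dataflow classic template through GitHub Actions, and the script uses the JDBC connector. Template creation currently fails because the GitHub runner is not allowed to connect to the database. So I want to skip the connection while the template is created, and have the script connect only when the actual Dataflow job runs.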

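I tried other connectors, such as cx_Oracle and jaydebeapi. With them I could skip the connection at template creation, but the Dataflow job then fails with the error "No matching distribution found" for those modules. I need a solution to this problem.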
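A minimal sketch of the deferral described above, assuming a DB-API 2.0 driver is used instead of the cross-language `ReadFromJdbc`. The object stores only connection settings when it is built (which is all that happens at template creation) and opens the connection in `setup()`, which Beam calls on the worker once the job runs. The names `DeferredQueryFn` and `make_demo_connection` are hypothetical; the method names mirror the `setup`/`process`/`teardown` lifecycle of `apache_beam.DoFn`, and `sqlite3` stands in for Oracle so the sketch runs anywhere. With Oracle, the `connect` callable could call `oracledb.connect(user=..., password=..., dsn=...)` from python-oracledb (the successor to cx_Oracle), calling `.get()` on the `ValueProvider` options inside that callable so they are read at run time rather than in `__init__`.

```python
import sqlite3
from typing import Any, Callable, Iterator, Optional, Tuple


class DeferredQueryFn:
    """Stores connection settings when built; connects only in setup().

    Method names follow the setup/process/teardown lifecycle of apache_beam.DoFn;
    in a real pipeline this class would subclass beam.DoFn.
    """

    def __init__(self, connect: Callable[[], Any], query: str) -> None:
        # Construction (e.g. during template creation) only records settings: no I/O.
        self._connect = connect
        self._query = query
        self._conn: Optional[Any] = None

    def setup(self) -> None:
        # Called on the worker when the job runs, so the connection is opened there.
        self._conn = self._connect()

    def process(self, element: Any) -> Iterator[Tuple[Any, ...]]:
        # `element` is only a trigger, e.g. the None produced by beam.Create([None]).
        if self._conn is None:
            raise RuntimeError("setup() must be called before process()")
        cursor = self._conn.cursor()
        try:
            cursor.execute(self._query)
            yield from cursor.fetchall()
        finally:
            cursor.close()

    def teardown(self) -> None:
        # Beam calls teardown on a best-effort basis when the instance is discarded.
        if self._conn is not None:
            self._conn.close()
            self._conn = None


def make_demo_connection() -> sqlite3.Connection:
    """Hypothetical stand-in for an Oracle connection: an in-memory SQLite table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, "a"), (2, "b")])
    conn.commit()
    return conn
```

For example, `DeferredQueryFn(make_demo_connection, "SELECT id, name FROM orders ORDER BY id")` opens nothing until `setup()` is called. The driver package still has to be installable on the workers: "No matching distribution found" is pip's error when it cannot find a package (for instance when workers cannot reach PyPI), and `--requirements_file`, `--setup_file`, or a custom SDK container image are the usual ways to ship it. python-oracledb's default Thin mode also avoids the Oracle Client libraries that cx_Oracle requires.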

```python
import argparse
import logging

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.core import DoFn


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider arguments are resolved only when the templated job runs.
        parser.add_value_provider_argument('--jdbc_url', help='JDBC URL')
        parser.add_value_provider_argument('--driver_class_name', help='Driver class name')
        parser.add_value_provider_argument('--username', help='Username')
        parser.add_value_provider_argument('--password', help='Password')
        parser.add_value_provider_argument('--table_name', help='Table name')
        parser.add_value_provider_argument('--query', help='Query')


class ReadFromJdbccustom:
    """Builds a ReadFromJdbc transform from ValueProvider options."""

    def __init__(self, jdbc_url, driver_class_name, username, password, query, table_name):
        self.jdbc_url = jdbc_url
        self.driver_class_name = driver_class_name
        self.username = username
        self.password = password
        self.query = query
        self.table_name = table_name

    def __enter__(self):
        # .get() only works at job run time; during template creation it raises an error.
        self.datasource = ReadFromJdbc(
            table_name=self.table_name.get(),
            driver_class_name=self.driver_class_name.get(),
            jdbc_url=self.jdbc_url.get(),
            username=self.username.get(),
            password=self.password.get(),
            query=self.query.get())
        return self.datasource

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to close: ReadFromJdbc is a transform definition, not an open connection.
        pass


class ReadFromJdbcWithFn(DoFn):
    def __init__(self, jdbc_url: ValueProvider, driver_class_name: ValueProvider,
                 username: ValueProvider, password: ValueProvider,
                 query: ValueProvider, table_name: ValueProvider):
        super().__init__()
        self.jdbc_url = jdbc_url
        self.driver_class_name = driver_class_name
        self.username = username
        self.password = password
        self.query = query
        self.table_name = table_name

    def process(self, element):
        # Pass by keyword so each option reaches the matching parameter.
        with ReadFromJdbccustom(
                jdbc_url=self.jdbc_url,
                driver_class_name=self.driver_class_name,
                username=self.username,
                password=self.password,
                query=self.query,
                table_name=self.table_name) as datasource:
            # ReadFromJdbc is a cross-language PTransform, not an iterable of rows;
            # it only runs a query when applied to a pipeline (p | ReadFromJdbc(...)).
            for row in datasource:
                # `element` is just the dummy None from beam.Create([None]).
                yield row[0]


def run(argv=None):
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        oracle_data = (
            p
            | 'Start' >> beam.Create([None])
            | 'Read from Oracle' >> beam.ParDo(ReadFromJdbcWithFn(
                options.jdbc_url,
                options.driver_class_name,
                options.username,
                options.password,
                options.query,
                options.table_name)))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```

oracle google-cloud-platform google-cloud-dataflow apache-beam
1 Answer

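Instead of a classic Dataflow template, try a Dataflow Flex Template. When a classic template is created, the pipeline is actually constructed (its transforms, including the JDBC read, are expanded), so the database is contacted at template-creation time. A Flex Template instead packages the pipeline code in a container image and builds the pipeline only when a job is launched, so the connection settings can be ordinary pipeline options.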
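A minimal sketch of what a Flex Template entry point could look like, under stated assumptions: the function names `parse_args` and `run`, the `--output` flag, and the CSV formatting are hypothetical, while `ReadFromJdbc`, `PipelineOptions`, and `WriteToText` are Beam's own. Because the pipeline is built at launch, the connection settings are plain `argparse` strings instead of `ValueProvider`s and go straight into `ReadFromJdbc`. Beam is imported inside `run()` only so the argument parsing can be exercised without Beam installed.

```python
import argparse
from typing import List, Optional, Sequence, Tuple


def parse_args(argv: Optional[Sequence[str]] = None) -> Tuple[argparse.Namespace, List[str]]:
    """Split the job's own flags from the flags that Beam/Dataflow consume."""
    parser = argparse.ArgumentParser()
    # Plain string options: a Flex Template builds the pipeline at launch time,
    # so these values are already known and no ValueProvider is needed.
    parser.add_argument("--jdbc_url", required=True)
    parser.add_argument("--driver_class_name", required=True)
    parser.add_argument("--username", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--table_name", required=True)
    parser.add_argument("--query", required=True)
    parser.add_argument("--output", required=True, help="GCS prefix, e.g. gs://bucket/prefix")
    # Unrecognized flags (e.g. --runner, --project) are returned for PipelineOptions.
    return parser.parse_known_args(argv)


def run(argv: Optional[Sequence[str]] = None) -> None:
    # Beam is imported here only so parse_args() can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io.jdbc import ReadFromJdbc
    from apache_beam.options.pipeline_options import PipelineOptions

    known, pipeline_args = parse_args(argv)
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | "Read from Oracle" >> ReadFromJdbc(
                table_name=known.table_name,
                driver_class_name=known.driver_class_name,
                jdbc_url=known.jdbc_url,
                username=known.username,
                password=known.password,
                query=known.query)
            # Rows arrive as schema'd named tuples; join their fields into CSV lines.
            | "To CSV" >> beam.Map(lambda row: ",".join(str(v) for v in row))
            | "Write to GCS" >> beam.io.WriteToText(known.output, file_name_suffix=".csv"))
```

In the Flex Template image, the file named by `FLEX_TEMPLATE_PYTHON_PY_FILE` would call `run()`. Running `gcloud dataflow flex-template build` from GitHub Actions then only builds and registers the image and template spec, without contacting the database. The database is reached when a job is launched instead, so the Dataflow workers (and possibly the launcher, which builds the pipeline) still need a network route to the on-prem database.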

© www.soinside.com 2019 - 2024. All rights reserved.