How to pass credentials from a ParDo to the ReadFromJdbc IO connector


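I need to fetch the database credentials securely, and I can do that in a ParDo. However, I want to use the ReadFromJdbc IO connector, and I'm having trouble passing the credentials (username and password) to it. I'm new to Apache Beam/Dataflow. I tried side inputs, but something went wrong.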

Any help would be appreciated!

Here is my ParDo and part of the pipeline definition:

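import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

class FetchDBCredentials(beam.DoFn):
    def process(self, element):
        # get credentials here
        cred = {
            'username': 'username',
            'password': 'password'
        }
        yield cred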

with Pipeline(DataflowRunner(), options=pipeline_options) as p:

    credentials = (
        p
        | 'Create' >> beam.Create([None])
        | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
    )

    username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
    password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))


    rows = (
        p
        | 'ReadFromJdbc' >> ReadFromJdbc(
            table_name='test',
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password,
            query='''
                SELECT 
                    employee_id, 
                    user_id
                FROM db.test
            '''
        )
    )

I get the following error: AttributeError: 'AsSingleton' object has no attribute 'encode'

python google-cloud-dataflow apache-beam
1 Answer

Here is a version that runs the read inside a ParDo, where the side input's value is available:

import apache_beam as beam
import psycopg2  # assumption: installed on the workers (e.g. via requirements.txt)
from apache_beam.options.pipeline_options import PipelineOptions

class FetchDBCredentials(beam.DoFn):
    def process(self, element):
        # Replace with your actual logic to fetch credentials securely
        cred = {
            'username': 'your_username',
            'password': 'your_password'
        }
        yield cred

class ReadFromDbFn(beam.DoFn):
    """Runs the query inside a DoFn so the credentials can arrive as a side input.

    ReadFromJdbc is a PTransform and cannot be applied inside DoFn.process(),
    so this DoFn queries PostgreSQL with a Python driver (psycopg2) instead.
    """

    def __init__(self, host, port, dbname, query):
        self.host = host
        self.port = port
        self.dbname = dbname
        self.query = query

    def process(self, element, credentials):
        # `credentials` is the concrete dict: Beam resolved the AsSingleton
        # side input before calling process().
        conn = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=credentials['username'],
            password=credentials['password'],
        )
        try:
            with conn.cursor() as cur:
                cur.execute(self.query)
                for row in cur:
                    yield row
        finally:
            conn.close()

def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        credentials = (
            p
            | 'Create' >> beam.Create([None])
            | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
        )

        rows = (
            p
            | 'Create Placeholder for Db Read' >> beam.Create([None])
            | 'ReadFromDb' >> beam.ParDo(
                ReadFromDbFn(
                    host='your_host',
                    port=5432,
                    dbname='your_database',
                    query='''
                        SELECT
                            employee_id,
                            user_id
                        FROM your_schema.test
                    ''',
                ),
                # Side inputs must be passed to ParDo itself, not to the DoFn
                # constructor. AsSingleton expects exactly one element.
                credentials=beam.pvalue.AsSingleton(credentials),
            )
        )

        rows | 'Log Output' >> beam.Map(print)

if __name__ == '__main__':
    run()

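Don't pass the AsSingleton object directly to ReadFromJdbc. AsSingleton only marks a PCollection as a side input; Beam replaces it with the concrete value only when it is passed as an extra argument to ParDo, Map or FlatMap, and that value is then available inside process(). ReadFromJdbc's username and password, on the other hand, must be plain strings known when the pipeline is constructed, which is why it fails when it tries to encode the AsSingleton object. A PTransform such as ReadFromJdbc also cannot be applied inside DoFn.process(). So either run the query inside a ParDo that receives the credentials as a side input (using a Python database driver), or fetch the credentials before building the pipeline and pass them to ReadFromJdbc as strings. Hope this helps!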
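A minimal sketch of the second option, assuming the credentials can be fetched in the launcher process before the pipeline graph is built. `fetch_db_credentials`, `jdbc_read_kwargs` and the `DB_USERNAME`/`DB_PASSWORD` environment variables are hypothetical stand-ins for whatever secure source you actually use (for example Secret Manager); the only Beam API involved is ReadFromJdbc with the same arguments as in the question.

```python
import os


def fetch_db_credentials():
    # Hypothetical helper: replace the body with your real secret lookup.
    # It runs once, in the launcher process, before the pipeline exists.
    return {
        "username": os.environ.get("DB_USERNAME", "your_username"),
        "password": os.environ.get("DB_PASSWORD", "your_password"),
    }


def jdbc_read_kwargs(jdbc_url, credentials, query):
    """Build ReadFromJdbc arguments from already-resolved credentials.

    Every value is a plain str, which is what ReadFromJdbc expects.
    """
    return {
        "table_name": "test",
        "driver_class_name": "org.postgresql.Driver",
        "jdbc_url": jdbc_url,
        "username": credentials["username"],
        "password": credentials["password"],
        "query": query,
    }


def run(pipeline_options=None):
    # Beam imports are local so the helpers above can be used without Beam.
    import apache_beam as beam
    from apache_beam.io.jdbc import ReadFromJdbc

    credentials = fetch_db_credentials()  # resolved at construction time
    kwargs = jdbc_read_kwargs(
        "jdbc:postgresql://your_host:5432/your_database",
        credentials,
        "SELECT employee_id, user_id FROM db.test",
    )
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromJdbc" >> ReadFromJdbc(**kwargs)
            | "Print" >> beam.Map(print)
        )
```

Because the values are ordinary strings by the time ReadFromJdbc is constructed, it never sees a side-input object. The trade-off is that the launcher, not the workers, fetches the secret, and the values travel with the submitted job's transform configuration; if that is not acceptable, the ParDo-based read is the alternative.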

© www.soinside.com 2019 - 2024. All rights reserved.