我需要安全地获取数据库凭据,这一步我可以用 ParDo 完成。不过,我想使用 ReadFromJdbc IO 连接器读取数据,却不知道如何把凭据(用户名、密码)传递给该连接器。我是 Apache Beam/Dataflow 的新手,尝试过使用侧输入(side input),但没有成功。
如有任何帮助,我们将不胜感激!
这是我的 ParDos 和部分管道定义:
class FetchDBCredentials(beam.DoFn):
    """DoFn that emits a single dict holding the database credentials."""

    def process(self, element):
        """Yield one dict with 'username' and 'password' keys.

        The incoming element is not read; it only triggers the call.
        """
        # get credentials here
        cred = {
            'username': 'username',
            'password': 'password',
        }
        yield cred
with Pipeline(DataflowRunner(), options=pipeline_options) as p:
    # One placeholder element drives FetchDBCredentials, which emits the
    # credentials dict.
    credentials = (
        p
        | 'Create' >> beam.Create([None])
        | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
    )
    # AsSingleton wraps each extracted field as a side-input view object,
    # not as a plain string.
    username = beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username']))
    password = beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
    rows = (
        p
        | 'ReadFromJdbc' >> ReadFromJdbc(
            table_name='test',
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            # The AsSingleton objects are handed to the connector here; this
            # is the only place they are used, so it is where the reported
            # "'AsSingleton' object has no attribute 'encode'" error arises.
            username=username,
            password=password,
            query='''
                SELECT
                    employee_id,
                    user_id
                FROM db.test
            '''
        )
    )
我收到以下错误:AttributeError: 'AsSingleton' object has no attribute 'encode'
这应该是正确的代码:
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
class FetchDBCredentials(beam.DoFn):
    """DoFn that emits a single dict holding the database credentials."""

    def process(self, element):
        """Yield one dict with 'username' and 'password' keys.

        The incoming element is not read; it only triggers the call.
        """
        # Replace with your actual logic to fetch credentials securely
        cred = {
            'username': 'your_username',
            'password': 'your_password',
        }
        yield cred
class ReadFromJdbcFn(beam.DoFn):
    """DoFn that stores JDBC settings and applies ReadFromJdbc per element.

    NOTE(review): two things here look unlikely to work and should be
    confirmed against the Beam documentation before relying on this class:

    * process() applies a PTransform to ``element``. run() feeds this DoFn
      from ``beam.Create([None])``, so ``element`` is a plain runtime value
      (None), not a Pipeline or PCollection.
    * Nothing in this class resolves a side-input view to a value. If
      ``username``/``password`` arrive as AsSingleton objects (as run() passes
      them), they are forwarded to ReadFromJdbc unchanged.
    """

    def __init__(self, jdbc_url, table_name, driver_class_name, query, username, password):
        """Store the connection settings unchanged for later use in process().

        Args:
            jdbc_url: JDBC connection URL forwarded to ReadFromJdbc.
            table_name: Table name forwarded to ReadFromJdbc.
            driver_class_name: JDBC driver class forwarded to ReadFromJdbc.
            query: SQL query forwarded to ReadFromJdbc.
            username: Database user forwarded to ReadFromJdbc.
            password: Database password forwarded to ReadFromJdbc.
        """
        self.jdbc_url = jdbc_url
        self.table_name = table_name
        self.driver_class_name = driver_class_name
        self.query = query
        self.username = username
        self.password = password

    def process(self, element):
        """Apply ReadFromJdbc to ``element`` and yield whatever that returns."""
        rows = (
            element
            | 'ReadDb' >> ReadFromJdbc(
                table_name=self.table_name,
                driver_class_name=self.driver_class_name,
                jdbc_url=self.jdbc_url,
                username=self.username,
                password=self.password,
                query=self.query
            )
        )
        yield rows
def run():
    """Build and run the pipeline.

    Steps: emit the credentials dict, feed a placeholder element through
    ReadFromJdbcFn, expand its outputs, and print each resulting item.
    """
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        jdbc_url = "jdbc:postgresql://your_host:5432/your_database"
        # Single placeholder element -> one credentials dict.
        credentials = (
            p
            | 'Create' >> beam.Create([None])
            | 'FetchDBCredentials' >> beam.ParDo(FetchDBCredentials())
        )
        rows = (
            p
            | 'Create Placeholder for Jdbc Read' >> beam.Create([None])
            | 'ReadFromJdbc ParDo' >> beam.ParDo(
                ReadFromJdbcFn(
                    jdbc_url=jdbc_url,
                    table_name='test',
                    driver_class_name='org.postgresql.Driver',
                    query='''
                        SELECT
                            employee_id,
                            user_id
                        FROM your_schema.test
                    ''',
                    # NOTE(review): these AsSingleton views go to the DoFn's
                    # constructor, which only stores them. Beam side inputs
                    # are normally passed as extra arguments to beam.ParDo and
                    # received as process() parameters -- confirm whether the
                    # views are ever resolved to strings on this path.
                    username=beam.pvalue.AsSingleton(credentials | 'ExtractUsername' >> beam.Map(lambda cred: cred['username'])),
                    password=beam.pvalue.AsSingleton(credentials | 'ExtractPassword' >> beam.Map(lambda cred: cred['password']))
                )
            )
            # Emit each item of the ParDo's outputs individually.
            | 'Flatten' >> beam.FlatMap(lambda x: x)
        )
        rows | 'Log Output' >> beam.Map(print)


if __name__ == '__main__':
    run()
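不要直接将 AsSingleton 对象传递给 ReadFromJdbc,而是将 ReadFromJdbc 调用封装在另一个 ParDo(ReadFromJdbcFn)中。ReadFromJdbc 需要用户名和密码的具体值(字符串),而 AsSingleton 提供的是一个 PCollectionView,只能在 ParDo 内部作为侧输入访问。需要注意:侧输入必须作为 beam.ParDo 的额外参数传入,并在 process() 的参数中接收,才会被解析为实际的值;另外 ReadFromJdbc 本身是在管道构建阶段应用的 PTransform,因此更稳妥的做法是在构建管道之前先取得凭据,再以普通字符串传给 ReadFromJdbc。希望有帮助!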