如何同步运行 Apache Beam 数据流管道?这是 Apache Beam 自定义管道代码(Dataflow)。
def run_pipeline(jdbc_url, username, password, types, wisgen_table, recruiter_table, last_sync_table):
    """Sync `users` rows from a JDBC (Postgres) source into BigQuery, then record the sync time.

    Steps:
      1. Read the previous sync timestamp from the BigQuery `last_sync` table.
      2. Read updated `users` rows over JDBC for the "wisgen" and "recruiter" jobs
         (these two reads are independent and run in parallel).
      3. Append both result sets to their BigQuery tables.
      4. Write the current UTC timestamp to `last_sync_table` (truncating it).

    Args:
        jdbc_url: JDBC connection URL for the Postgres source.
        username: JDBC username.
        password: JDBC password.
        types: currently unused — kept for interface compatibility with callers.
        wisgen_table: BigQuery table spec for the wisgen output.
        recruiter_table: BigQuery table spec for the recruiter output.
        last_sync_table: BigQuery table spec holding the last-sync timestamp.

    The call blocks until the pipeline finishes (`wait_until_finish`), which is
    how you run a Beam pipeline "synchronously".
    """
    options = PipelineOptions()
    p = beam.Pipeline(options=options)

    # NOTE(review): `last_sync_data` is a PCollection, NOT a Python value, so it
    # cannot be interpolated into the JDBC query strings below. To actually use
    # it there, read the last-sync time with the BigQuery client *before*
    # constructing this pipeline (or run two pipelines, the first finishing
    # with wait_until_finish) and pass the value in as a plain string.
    last_sync_data = (
        p
        | "Read last sync data" >> ReadFromBigQuery(
            query='SELECT MAX(time) as last_sync FROM '
                  '[wisdomcircle-350611:custom_test_data.last_sync]')
        | "Extract last sync time" >> beam.Map(lambda elem: elem['last_sync'])
    )

    # TODO(review): '{last_sync_data}' below is a literal placeholder in a
    # non-f string — it is never substituted. See the NOTE above for the fix.
    wisgen_data = p | "wisgen job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        # Fixed SQL: the original had a dangling comma and no FROM clause.
        query="""SELECT users.id AS user_id FROM users WHERE to_char(users.updated_at,'YYYY-MM-DD HH24:MI:SS') >= '{last_sync_data}' """,
        table_name="users"
    )
    recruiter_data = p | "recruiter job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id FROM users WHERE to_char(users.updated_at,'YYYY-MM-DD HH24:MI:SS') >= '{last_sync_data}'""",
        table_name="users"
    )

    # The two writes below are independent branches and run in parallel.
    _ = (
        wisgen_data
        | "Convert TableRow to dict(wisgen data)" >> beam.Map(lambda row: row._asdict())
        | "Write to BigQuery in wisgen data table" >> WriteToBigQuery(
            wisgen_table,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            # Fixed: the original schema string was unterminated.
            schema='user_id:INTEGER,full_name:STRING',
        )
    )
    _ = (
        recruiter_data
        | "Convert TableRow to dict(recruiter data)" >> beam.Map(lambda row: row._asdict())
        | "Write to BigQuery in recruiter data table" >> WriteToBigQuery(
            recruiter_table,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema='user_id:INTEGER,full_name:STRING',
        )
    )

    # NOTE(review): Beam gives no ordering guarantee between independent
    # branches — this timestamp write is NOT guaranteed to run after the two
    # writes above within a single pipeline. To strictly enforce "runs last",
    # put this step in a second pipeline executed after the first pipeline's
    # wait_until_finish() returns.
    _ = (
        p
        | "Create current timestamp" >> beam.Create([{'time': datetime.datetime.utcnow()}])
        | "Write to last_sync" >> WriteToBigQuery(  # fixed: was `WriteToBigQuery,` (no call)
            last_sync_table,  # fixed: was undefined name `last_sync_tab`
            schema='time:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

    # Run synchronously: wait_until_finish() blocks until the pipeline is done.
    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # Entry point: run the sync with placeholder credentials.
    run_pipeline(
        jdbc_url='xxxx',
        username='xxxx',
        password='xxxx',
        # Fixed: `types` is a required parameter of run_pipeline — the original
        # call omitted it and would raise TypeError before the pipeline started.
        types='xxxx',
        wisgen_table='wisdomcircle-350611:uat_custom_dataset.wisgen_data_table',
        recruiter_table='wisdomcircle-350611:uat_custom_dataset.recruiter_data_table',
        last_sync_table='wisdomcircle-350611:uat_custom_dataset.last_sync',
    )
pipeline 会异步/并行地完成所有 job 操作,但我希望先执行 last_sync_data 这个起始 job(写在上面代码的开头);最后,写入 timestamp 的作业应该在上述所有操作完成后才运行。
有人可以帮我重写上面的代码以满足我的需要吗?
requirement:
last_sync_data
应该首先运行,因为它会检索上次同步时间。
wisgen_data
和 recruiter_data
可以并行运行,因为它们彼此独立。
写入当前时间戳的最后一个作业应仅在所有先前作业完成后运行。
是否可以写两条流水线?您可以使用 wait_until_finish 完成第一个管道,然后运行第二个管道。