Run Dataflow jobs sequentially?


How can I run an Apache Beam Dataflow pipeline synchronously? Here is my custom Apache Beam pipeline code (Dataflow):

import datetime

import apache_beam as beam
from apache_beam.io import ReadFromBigQuery, WriteToBigQuery
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline(jdbc_url, username, password, wisgen_table, recruiter_table, last_sync_table):

    options = PipelineOptions()
    p = beam.Pipeline(options=options)

    last_sync_data = (
        p
        | "Read last sync data" >> ReadFromBigQuery(
            query='SELECT MAX(time) AS last_sync FROM '
                  '[wisdomcircle-350611:custom_test_data.last_sync]')
        | "Extract last sync time" >> beam.Map(lambda elem: elem['last_sync'])
    )

    # NOTE: '{last_sync_data}' below is a plain-string placeholder; a
    # PCollection cannot be interpolated into a query string like this,
    # which is part of the problem described under this code. The select
    # list is reconstructed to match the BigQuery schema used further down.
    wisgen_data = p | "wisgen job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, users.full_name FROM users
                 WHERE to_char(users.updated_at, 'YYYY-MM-DD HH24:MI:SS') >= '{last_sync_data}'""",
        table_name="users",
    )

    recruiter_data = p | "recruiter job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, users.full_name FROM users
                 WHERE to_char(users.updated_at, 'YYYY-MM-DD HH24:MI:SS') >= '{last_sync_data}'""",
        table_name="users",
    )

    wisgen_data | "Convert TableRow to dict(wisgen data)" >> beam.Map(
        lambda row: row._asdict()
    ) | "Write to BigQuery in wisgen data table" >> WriteToBigQuery(
        wisgen_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        schema='user_id:INTEGER,full_name:STRING')

    recruiter_data | "Convert TableRow to dict(recruiter data)" >> beam.Map(
        lambda row: row._asdict()
    ) | "Write to BigQuery in recruiter data table" >> WriteToBigQuery(
        recruiter_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        schema='user_id:INTEGER,full_name:STRING')

    # This must run last, after all of the jobs above have finished.
    _ = (
        p
        | "Create current timestamp" >> beam.Create([{'time': datetime.datetime.utcnow()}])
        | "Write to last_sync" >> WriteToBigQuery(
            last_sync_table,
            schema='time:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

    p.run()


if __name__ == '__main__':
    run_pipeline(
        jdbc_url='xxxx',
        username='xxxx',
        password='xxxx',
        wisgen_table='wisdomcircle-350611:uat_custom_dataset.wisgen_data_table',
        recruiter_table='wisdomcircle-350611:uat_custom_dataset.recruiter_data_table',
        last_sync_table='wisdomcircle-350611:uat_custom_dataset.last_sync',
    )

The pipeline runs all of these jobs asynchronously, in parallel. But I want the starting job, last_sync_data (at the top of the code above), to execute first, and my timestamp job (at the bottom of the code above) to run only after all of the operations above have finished.

Can someone help me rewrite the code above to meet these requirements?

Requirements:

- last_sync_data should run first, because it retrieves the last sync time (see the sketch after this list for what I mean by "first").
- wisgen_data and recruiter_data can run in parallel, since they are independent of each other.
- The final job, which writes the current timestamp, should run only after all previous jobs have completed.
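For reference, a minimal sketch of the first requirement done eagerly (this is my assumption of one possible shape, not working code from my pipeline): fetching the last sync time with the google-cloud-bigquery client, outside Beam, would make the value available before the pipeline is even constructed, so it could be interpolated into the JDBC queries.

from google.cloud import bigquery

def get_last_sync():
    # Query the same table the pipeline reads, but synchronously.
    client = bigquery.Client(project='wisdomcircle-350611')
    query = (
        'SELECT MAX(time) AS last_sync '
        'FROM `wisdomcircle-350611.custom_test_data.last_sync`'
    )
    rows = client.query(query).result()   # blocks until the query finishes
    return next(iter(rows)).last_sync     # a datetime, or None if the table is empty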

google-cloud-platform google-cloud-dataflow apache-beam gcloud
1 Answer

Could you write two pipelines? You can use wait_until_finish to let the first pipeline complete, then run the second pipeline.
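A minimal sketch of that idea, reusing the last_sync table from your question (where exactly to split is an assumption): the first pipeline holds the JDBC-to-BigQuery jobs, and the second pipeline, which writes the timestamp, is only started after wait_until_finish returns.

import datetime

import apache_beam as beam
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

def run_sequentially(last_sync_table):
    options = PipelineOptions()

    # Pipeline 1: the main jobs. Independent branches (wisgen_data and
    # recruiter_data) still run in parallel inside this one pipeline.
    p1 = beam.Pipeline(options=options)
    # ... build the wisgen and recruiter branches on p1 as in the question ...
    p1.run().wait_until_finish()  # block until every job in p1 is done

    # Pipeline 2: starts only after p1 has finished, so the timestamp
    # is guaranteed to be written last.
    p2 = beam.Pipeline(options=options)
    _ = (
        p2
        | "Create current timestamp" >> beam.Create(
            [{'time': datetime.datetime.utcnow().isoformat()}])  # ISO string serializes cleanly
        | "Write to last_sync" >> WriteToBigQuery(
            last_sync_table,
            schema='time:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
    p2.run().wait_until_finish()

The same pattern extends to three pipelines if the last_sync read also has to complete before the JDBC reads start: read and wait, then run the main jobs and wait, then write the timestamp.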
