在我的 apache beam 流管道中,我有一个无限的发布/订阅源,我将其与 会话窗口一起使用。
我需要将一些有界配置数据作为侧面输入传递到管道的某些 DoFns 中。此数据驻留在 BigQuery 中。它正在慢慢变化,我预计每个月在不同的时间点会有一些变化。为了结合有界数据和无界数据,我应用了此模式,它每小时创建一个PeriodicImpulse。随后,DoFn 从 BQ 读取配置数据,将其转换为字典并返回。
随后,上述结果将作为 sideinput 传递到主管道的 DoFns 之一。
当使用 LocalRunner 执行管道时,我得到了一个非常不具体的信息
RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
但是,如果我用简单的 Create(["DummyValue"]) 替换PeriodicPulse 步骤,管道工作正常(当然,除了它会忽略初始读取后发生的所有配置数据更改这一事实) BQ)。
我需要更改什么才能使其正常工作?
n = 1
SESSION_GAP_SIZE = 3600 * 24
p_opt = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
, ...)
with Pipeline(options=p_opt) as p:
cfg_data = (p
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
| "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
)
main_p = (
p
| "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
| "Filter 1" >> Filter(Filter1())
| "Filter 2" >> Filter(Filter2())
| "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
| "Extract Composite Key" >> ParDo(ExtractKey())
| "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
| "Another GroupByKey" >> GroupByKey()
| "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
| "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
)
此问题似乎是 DirectRunner 中的限制/错误。使用Dataflow Runner时,它运行得很好。看起来这与 this open issues 有关。 不幸的是,目前没有真正的解决方法。不过,可以通过注释掉PeriodicImpulse步骤来使用DirectRunner进行本地测试,因此您至少可以在本地调试侧面输入处理管道。