缓慢更新侧输入和会话窗口 - 变换节点 AppliedPTransform 未按预期替换

问题描述 投票:0回答:1

在我的 apache beam 流管道中,我有一个无限的发布/订阅源,我将其与 会话窗口一起使用。

我需要将一些有界配置数据作为侧面输入传递到管道的某些 DoFns 中。此数据驻留在 BigQuery 中。它正在慢慢变化,我预计每个月在不同的时间点会有一些变化。为了结合有界数据和无界数据,我应用了此模式,它每小时创建一个PeriodicImpulse。随后,DoFn 从 BQ 读取配置数据,将其转换为字典并返回。

随后,上述结果将作为 sideinput 传递到主管道的 DoFns 之一。

当使用 LocalRunner 执行管道时,我得到了一个非常不具体的信息

RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected. 

但是,如果我用简单的 Create(["DummyValue"]) 替换PeriodicPulse 步骤,管道工作正常(当然,除了它会忽略初始读取后发生的所有配置数据更改这一事实) BQ)。

我需要更改什么才能使其正常工作?


n = 1
SESSION_GAP_SIZE = 3600 * 24

p_opt = PipelineOptions(
        pipeline_args, streaming=True,  save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
        , ...)


        with Pipeline(options=p_opt) as p:

        cfg_data  = (p
                      | 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
                      | "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
                     )


        main_p  = (
            p
            | "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
            | "Filter 1" >> Filter(Filter1())
            | "Filter 2" >> Filter(Filter2())
            | "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
            | "Extract Composite Key" >> ParDo(ExtractKey())
            | "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
            | "Another GroupByKey" >> GroupByKey()
            | "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
            | "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
        )
python stream google-cloud-dataflow apache-beam stream-processing
1个回答
0
投票

此问题似乎是 DirectRunner 中的限制/错误。使用Dataflow Runner时,它运行得很好。看起来这与 this open issues 有关。 不幸的是,目前没有真正的解决方法。不过,可以通过注释掉PeriodicImpulse步骤来使用DirectRunner进行本地测试,因此您至少可以在本地调试侧面输入处理管道。

© www.soinside.com 2019 - 2024. All rights reserved.