我有以下用例:我正在流式传输来自 Kafka 主题的事件。我的计划是阅读这些事件,对于每一个事件,我都需要使用 BigQuery 中存在的一些元数据来丰富它。假设我有数百万个独特的项目,每个项目都有其独特的元数据存储在 BigQuery 中。元数据每 24 小时更新一次。除此之外,我的设计需要开发一个有状态的管道,因为我需要在项目到达后至少 24 小时内跟踪应用于这些项目的更改。所以我知道使用固定窗口不是一个选项,因为每个窗口的每个键的状态都是固定的。
在稍后阶段,我应该对其他一些在不同时间更新的 BQ 表执行相同的操作。
这是我尝试实现的
我的第一个方法是使用缓慢变化的查找缓存模式:流式传输 Kafka 主题事件,同时使用事件触发器每 24 小时加载 BQ 元数据。我测试了这种方法,虽然由于元数据保存在内存中而增加了内存使用量,但它提供了一种丰富事件的方法。我遇到的问题是,当我触发元数据更新时,我可以从日志中看到元数据正在更新,但是作业仍然使用旧版本的缓存。这是我正在使用的代码:
with Pipeline(options=pipeline_options) as pipeline:
sideinput_thresholds = (
pipeline
|"Read Side input file base path from pubsub" >> io.ReadFromPubSub(topic="topic")
| "Side input fixed window with early trigger" >> WindowInto(
GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| "Pulling BQ thresholds" >> ParDo(PullBQThresholds())
)
enriched_events = (
pipeline
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Global window" >> WindowInto(GlobalWindows())
| "Add timestamp to elements" >> ParDo(AddTimestamp())
| "Enrich event" >> ParDo(EnrichSideInput(), pvalue.AsSingleton(sideinput_thresholds))
)
在调查这个问题时,我找到了一个答案,该答案指出缓存可能需要几个小时才能更新,另一种选择是通过将侧面输入转换为 pcollection 并进行联合分组来使用
CoGroupByKey
。我的问题是,对于我的情况,我需要应用全局窗口,因为我需要跟踪全局窗口中的状态(事件可能会在摄取 18 小时后收到更新)。在文档中,我可以看到 If you are using unbounded PCollections, you must use either non-global windowing or an aggregation trigger in order to perform a CoGroupByKey.
我的问题是,具有全局窗口的无界 pcollection 是否可以通过键与每 24 小时不断更新且包含数百万元数据的侧输入进行共同分组?
还有其他方法可以解决这个问题吗?非常感谢您的帮助
我尝试过的另一种方法是,我不是将元数据作为侧面输入传递,而是通过自定义转换从 Google BigQuery 流式传输元数据,该转换读取记录并生成结果。之后,我将元数据存储为具有与事件相同的键的状态。目前看来效果还不错