数据流作业停止随机处理数据

问题描述 投票:0回答:1

我已经建立了一个管道,用于从pubsub读取数据并将其流式传输到BigQuery。在流传输数据之前,需要对其进行预处理以除去重复项。这是通过打开固定窗口并按ID对其分组来完成的。管道可以正常工作,直到几个小时后突然停止。日志中没有任何内容,没有错误消息或其他任何内容。并且根据作业指标,数据在GroupByKey / MergeBuckets阶段停止。在此之前,一切正常,但没有发送任何数据。从Metrics from the job中可以看出,这似乎是很困难的。

这是分组步骤的代码:

class Transform(PTransform):

def expand(self, pcoll):
    return (
            pcoll
            | "Extract data" >> ParDo(ExtractData())
            | "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
            | "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
            | "Group by dummy key" >> GroupByKey()
            | "Remove dummy key" >> MapTuple(lambda _, elem: elem)
    )

[ExtractData函数将消息从json字符串转换为字典。

python google-cloud-dataflow apache-beam
1个回答
0
投票

我怀疑已设置从PubSub读取的消息的时间戳,以便长时间不触发特定窗口。请注意,默认情况下,Beam使用事件时间触发器,并且默认情况下不会过早触发触发器(仅在水印到达窗口边界时才触发)。因此,何时触发窗口取决于从PubSub读取的事件的时间戳。如果您需要每隔10分钟触发一次数据,请考虑设置processing time based trigger

© www.soinside.com 2019 - 2024. All rights reserved.