我已经建立了一个管道,用于从pubsub读取数据并将其流式传输到BigQuery。在流传输数据之前,需要对其进行预处理以除去重复项。这是通过打开固定窗口并按ID对其分组来完成的。管道可以正常工作,直到几个小时后突然停止。日志中没有任何内容,没有错误消息或其他任何内容。并且根据作业指标,数据在GroupByKey / MergeBuckets阶段停止。在此之前,一切正常,但没有发送任何数据。从Metrics from the job中可以看出,这似乎是很困难的。
这是分组步骤的代码:
class Transform(PTransform):
def expand(self, pcoll):
return (
pcoll
| "Extract data" >> ParDo(ExtractData())
| "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
| "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
| "Group by dummy key" >> GroupByKey()
| "Remove dummy key" >> MapTuple(lambda _, elem: elem)
)
[ExtractData函数将消息从json字符串转换为字典。
我怀疑已设置从PubSub读取的消息的时间戳,以便长时间不触发特定窗口。请注意,默认情况下,Beam使用事件时间触发器,并且默认情况下不会过早触发触发器(仅在水印到达窗口边界时才触发)。因此,何时触发窗口取决于从PubSub读取的事件的时间戳。如果您需要每隔10分钟触发一次数据,请考虑设置processing time based trigger。