我正在使用以下格式从Google Cloud Pub / Sub接收消息:
{u'date': u'2019-03-26T09:57:52Z', 'field1': value1, u'field2': u'value2', u'field3': u'value3', u'field4': u'value4',...}
我想在窗口管道中处理此消息时:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))
字段'date'将被处理为窗口的参考时间戳。
我需要自定义WindowFn还是应该怎么做?
您需要指定自定义时间戳,如下所示:
def custom_timestamp(message):
# assuming that message is already parsed JSON (dict)
import datetime as dt
import apache_beam as beam
ts = dt.datetime.strptime(message["date"], "%Y-%m-%dT%H:%M:%SZ")
return beam.window.TimestampedValue(message, ts.timestamp())
然后:
| 'CustomTimestamp' >> beam.Map(custom_timestamp)
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))
你可以在这里找到完整的细节:https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
但是,您必须注意,用于Apache Beam的Streaming Python SDK有很多缺少的部分,而且有些东西没有像您期望的那样工作。我想要实现完全相同的情况,并且在添加自定义时间戳之后,DataFlow Runner因为调用了DropDueToLateness而删除了我的消息。我仍然不确定是否可以使用PubSub和Python设置系统水印来处理历史数据。