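I am reading data from a bounded source (a CSV file) in a batch pipeline, and I would like to assign each element a timestamp based on data stored in one of the CSV file's columns. How can I do this in an Apache Beam pipeline?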
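If your batch source contains an event-based timestamp per element, for example click events stored as the tuple {'timestamp', 'userid', 'ClickedSomething'}, you can assign the timestamp to each element inside a DoFn in your pipeline.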
Java:
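    // Inside a DoFn; Beam requires the @ProcessElement annotation on this method.
    @ProcessElement
    public void process(ProcessContext c) {
      // Re-emit the element with the event time taken from its own data.
      c.outputWithTimestamp(
          c.element(),
          new Instant(c.element().getTimestamp()));
    }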
Python:
    'AddEventTimestamps' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
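The lambda above assumes each element is already a dict with a numeric 'timestamp' key. Lines read from a CSV file usually arrive as raw strings, so they need to be parsed first. Here is a minimal sketch of such a parsing step, assuming the columns are in the order timestamp, userid, event and that the timestamp column holds Unix epoch seconds. The function name `parse_click_line` and the column names are hypothetical, not part of Beam.

```python
import csv

# Hypothetical column order; adjust to match the actual CSV file.
COLUMNS = ('timestamp', 'userid', 'event')


def parse_click_line(line):
    """Parse one CSV line into a dict with a numeric 'timestamp' field."""
    # csv.reader handles quoted fields that a plain split(',') would break.
    fields = next(csv.reader([line]))
    record = dict(zip(COLUMNS, fields))
    # Assumption: the timestamp column holds Unix epoch seconds.
    record['timestamp'] = float(record['timestamp'])
    return record
```

A function like this could be applied with beam.Map(parse_click_line) ahead of the 'AddEventTimestamps' step, so that elem['timestamp'] is available as a number.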
[Edit: non-lambda Python example from the Beam guide:]
    class AddTimestampDoFn(beam.DoFn):

      def process(self, element):
        # Extract the numeric Unix seconds-since-epoch timestamp to be
        # associated with the current log entry.
        unix_timestamp = extract_timestamp_from_log_entry(element)
        # Wrap and emit the current entry and new timestamp in a
        # TimestampedValue.
        yield beam.window.TimestampedValue(element, unix_timestamp)

    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
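The helper extract_timestamp_from_log_entry is not defined in the snippet above; you have to supply it yourself. One possible sketch for the CSV case follows, assuming each element is a raw CSV line whose first column is an ISO-8601 timestamp such as 2017-07-14T02:40:00+00:00, and that timestamps without an offset should be read as UTC. The `column` parameter and both of those assumptions are illustrative only.

```python
import csv
from datetime import datetime, timezone


def extract_timestamp_from_log_entry(element, column=0):
    """Return Unix seconds-since-epoch parsed from one column of a CSV line."""
    # Split the raw line with csv.reader so quoted commas are handled.
    fields = next(csv.reader([element]))
    # Assumption: the column holds an ISO-8601 timestamp string.
    parsed = datetime.fromisoformat(fields[column].strip())
    # Assumption: values without a UTC offset are treated as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()
```

If your column already contains epoch seconds, the helper can simply return float(fields[column]) instead.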
[Edit, per Anton's comment] More information can be found @