In a batch pipeline, how do I assign timestamps to data from a Beam batch source such as a CSV file?


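I am reading data from a bounded source, a CSV file, in a batch pipeline, and I want to assign each element a timestamp based on a column stored in the CSV file. How can I do this in an Apache Beam pipeline?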

google-cloud-dataflow apache-beam
1 Answer

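If your batch source contains an event-time timestamp for each element (for example, click events stored as tuples {'timestamp', 'userid', 'ClickedSomething'}), you can assign that timestamp to each element inside a DoFn in your pipeline.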

Java:

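@ProcessElement
public void process(ProcessContext c) {
     // Re-emit the element with its own event time. Instant is org.joda.time.Instant,
     // built from milliseconds since the epoch; getTimestamp() is defined on your element type.
     c.outputWithTimestamp(
         c.element(),
         new Instant(c.element().getTimestamp()));
}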

Python:

'AddEventTimestamps' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
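The lambda above assumes `elem['timestamp']` is already a number of seconds since the Unix epoch, which is what `TimestampedValue` expects. Values read from a CSV file arrive as strings, so they usually need to be parsed first. Below is a minimal sketch, assuming a hypothetical column layout `timestamp,userid,action` with ISO-8601 timestamps; `parse_click_line` is an illustrative name, not a Beam API.

```python
import csv
from datetime import datetime, timezone


def parse_click_line(line):
    """Parse one CSV line into a dict with a numeric Unix-seconds timestamp.

    Hypothetical layout: timestamp,userid,action where the timestamp is
    ISO-8601, e.g. 2024-01-01T00:00:10 (naive values are treated as UTC).
    """
    # csv.reader handles quoted fields; it expects an iterable of lines.
    ts_text, userid, action = next(csv.reader([line]))
    dt = datetime.fromisoformat(ts_text)
    if dt.tzinfo is None:
        # Interpret naive timestamps as UTC rather than the machine's local time.
        dt = dt.replace(tzinfo=timezone.utc)
    return {
        'timestamp': dt.timestamp(),  # float seconds since the epoch
        'userid': userid,
        'action': action,
    }


row = parse_click_line('2024-01-01T00:00:10,user42,ClickedSomething')
print(row['timestamp'])  # 1704067210.0
```

In a pipeline this would typically run as a `beam.Map(parse_click_line)` step before the `'AddEventTimestamps'` step; if the file has a header row, `beam.io.ReadFromText(path, skip_header_lines=1)` skips it.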

[Edit: a non-lambda Python example from the Beam programming guide:]

import apache_beam as beam


class AddTimestampDoFn(beam.DoFn):

  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry. extract_timestamp_from_log_entry
    # is a user-supplied parsing function, not part of Beam.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

# `items` is the input PCollection, e.g. the lines or parsed rows of the CSV file.
timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

[Edit per Anton's comment] More information can be found at:

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
