beam yaml - 将时间戳添加到 csv 文件

问题描述 投票:0回答:1

我正在使用beam yaml 创建一个beam 管道。 我有一个包含时间戳值的 csv 表,并且想将其用于窗口聚合。 不确定使用此时间戳作为用于窗口目的的行时间戳的正确语法是什么。

我的 csv 文件是:

id,firstname,age,country,profession,timestamp
5,Natka,26,France,doctor,2024-01-03T13:30:10
10,Orelia,31,Germany,police officer,2024-01-03T13:30:12
1,Reeba,58,Belgium,unemployed,2024-01-03T13:30:15
7,Elvira,39,Italy,doctor,2024-01-03T13:30:21
3,Meg,11,France,unemployed,2024-01-03T13:30:27
6,Aurore,32,Italy,police officer,2024-01-03T13:30:42
11,Theodora,16,Italy,unemployed,2024-01-03T13:30:37
4,Rani,53,Spain,doctor,2024-01-03T13:30:58
9,Lesly,35,Spain,firefighter,2024-01-03T13:30:49
2,Maud,45,Spain,firefighter,2024-01-03T13:30:45
8,Asia,10,Belgium,doctor,2024-01-03T13:30:34

和管道:

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: data/people.csv     

    # Should probably add a transform to handle timestamp issue

    - type: WindowInto
      windowing:
        type: fixed
        size: 10
    - type: Combine
      config:
        group_by: country
        combine:
          total:
            value: age
            fn: sum
    - type: LogForTesting
    - type: WriteToJson
      config:
        path: ~/out.json
options:
  yaml_experimental_features: Combine

谢谢!

apache-beam
1个回答
0
投票

您应该能够使用 AssignTimestamps 转换来实现此目的。例如

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: data/people.csv     

    - type: AssignTimestamps
      config:
        timestamp: "timestamp"

    ...

如果您实际上需要执行一些代码来解析时间戳,您可以这样做,例如,

    - type: AssignTimestamps
      config:
        language: python
        timestamp:
          callable: |
            import datetime
            import pytz

            def parse_timestamp(row):
              return pytz.utc.localize(
                  datetime.datetime.strptime(
                      row.timestamp, '%Y-%m-%dT%H:%M:%S'))

您可能需要添加 错误处理,以防某些时间戳格式错误。

© www.soinside.com 2019 - 2024. All rights reserved.