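I'm building a Beam pipeline with Beam YAML. I have a CSV table that contains timestamp values, and I want to use them for a windowed aggregation. I'm not sure what the correct syntax is for using this column as each row's element timestamp for windowing.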
My CSV file is:
id,firstname,age,country,profession,timestamp
5,Natka,26,France,doctor,2024-01-03T13:30:10
10,Orelia,31,Germany,police officer,2024-01-03T13:30:12
1,Reeba,58,Belgium,unemployed,2024-01-03T13:30:15
7,Elvira,39,Italy,doctor,2024-01-03T13:30:21
3,Meg,11,France,unemployed,2024-01-03T13:30:27
6,Aurore,32,Italy,police officer,2024-01-03T13:30:42
11,Theodora,16,Italy,unemployed,2024-01-03T13:30:37
4,Rani,53,Spain,doctor,2024-01-03T13:30:58
9,Lesly,35,Spain,firefighter,2024-01-03T13:30:49
2,Maud,45,Spain,firefighter,2024-01-03T13:30:45
8,Asia,10,Belgium,doctor,2024-01-03T13:30:34
And the pipeline:
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: data/people.csv
    # Should probably add a transform to handle timestamp issue
    - type: WindowInto
      windowing:
        type: fixed
        size: 10
    - type: Combine
      config:
        group_by: country
        combine:
          total:
            value: age
            fn: sum
    - type: LogForTesting
    - type: WriteToJson
      config:
        path: ~/out.json
options:
  yaml_experimental_features: Combine
Thanks!
You should be able to use the AssignTimestamps transform for this. For example:
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: data/people.csv
    - type: AssignTimestamps
      config:
        timestamp: "timestamp"
    ...
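Once every row carries an event timestamp, the `WindowInto` (fixed, `size: 10`) and `Combine` steps from the question should group rows per window and per country. The plain-Python sketch below involves no Beam at all. It assumes the timestamps are UTC and that `size: 10` means 10 seconds, and it uses epoch-aligned fixed windows `[start, start + size)`. The names `windowed_age_sums` and `WINDOW_SIZE_S` are hypothetical, chosen for illustration only.

```python
import csv
import io
from collections import defaultdict
from datetime import datetime, timezone

# Rows copied from the question's people.csv.
CSV_DATA = """id,firstname,age,country,profession,timestamp
5,Natka,26,France,doctor,2024-01-03T13:30:10
10,Orelia,31,Germany,police officer,2024-01-03T13:30:12
1,Reeba,58,Belgium,unemployed,2024-01-03T13:30:15
7,Elvira,39,Italy,doctor,2024-01-03T13:30:21
3,Meg,11,France,unemployed,2024-01-03T13:30:27
6,Aurore,32,Italy,police officer,2024-01-03T13:30:42
11,Theodora,16,Italy,unemployed,2024-01-03T13:30:37
4,Rani,53,Spain,doctor,2024-01-03T13:30:58
9,Lesly,35,Spain,firefighter,2024-01-03T13:30:49
2,Maud,45,Spain,firefighter,2024-01-03T13:30:45
8,Asia,10,Belgium,doctor,2024-01-03T13:30:34
"""

# Assumption: `size: 10` in the WindowInto step is interpreted as seconds.
WINDOW_SIZE_S = 10


def parse_ts(value: str) -> float:
    """Parse the CSV's naive ISO timestamp as UTC and return epoch seconds."""
    dt = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    return dt.timestamp()


def windowed_age_sums(text: str, size: int = WINDOW_SIZE_S) -> dict:
    """Sum `age` per (window_start, country), mimicking fixed windows + sum."""
    sums = defaultdict(int)
    for row in csv.DictReader(io.StringIO(text)):
        ts = parse_ts(row["timestamp"])
        # Fixed windows are aligned to the epoch: [start, start + size).
        start = int(ts // size) * size
        sums[(start, row["country"])] += int(row["age"])
    return dict(sums)


if __name__ == "__main__":
    for (start, country), total in sorted(windowed_age_sums(CSV_DATA).items()):
        label = datetime.fromtimestamp(start, timezone.utc).strftime("%H:%M:%S")
        print(label, country, total)
```

Spain's rows at 13:30:45 and 13:30:49 fall into the same `[13:30:40, 13:30:50)` window and sum to 80. Theodora's row (13:30:37) appears out of order in the file but still lands in `[13:30:30, 13:30:40)`, because a row's window depends only on its timestamp, not on its position in the file.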
If you actually need to run some code to parse the timestamps, you can do something like this:
- type: AssignTimestamps
  config:
    language: python
    timestamp:
      callable: |
        import datetime
        import pytz

        def parse_timestamp(row):
            return pytz.utc.localize(
                datetime.datetime.strptime(
                    row.timestamp, '%Y-%m-%dT%H:%M:%S'))
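You can check the parsing logic of the callable outside Beam before wiring it into the pipeline. The sketch below replaces `pytz.utc.localize` with the standard-library `datetime.timezone.utc`, which has the same effect for the UTC zone. It also uses a hypothetical `namedtuple` as a stand-in for the Beam row, so that `row.timestamp` works the same way. Both substitutions are assumptions made for local testing only.

```python
import datetime
from collections import namedtuple

# Hypothetical stand-in for the Beam Row produced by ReadFromCsv;
# only the `timestamp` attribute is used by the callable.
Row = namedtuple("Row", ["id", "country", "timestamp"])


def parse_timestamp(row):
    """Parse the CSV's naive ISO timestamp and mark it as UTC."""
    naive = datetime.datetime.strptime(row.timestamp, "%Y-%m-%dT%H:%M:%S")
    # Equivalent to pytz.utc.localize(naive) for the UTC zone.
    return naive.replace(tzinfo=datetime.timezone.utc)


row = Row(id=5, country="France", timestamp="2024-01-03T13:30:10")
ts = parse_timestamp(row)
print(ts.isoformat())  # 2024-01-03T13:30:10+00:00
print(ts.timestamp())  # 1704288610.0
```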
You may also want to add error handling in case some timestamps are malformed.
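Beam YAML's own error-handling configuration is not shown here. As a neutral illustration of the idea, this plain-Python sketch separates rows with well-formed timestamps from malformed ones instead of letting a single bad value fail the whole run, similar in spirit to a dead-letter output. The function name `split_by_timestamp` and the sample bad rows are hypothetical.

```python
import datetime

FORMAT = "%Y-%m-%dT%H:%M:%S"


def split_by_timestamp(rows):
    """Return (good, bad): good rows paired with a UTC datetime, bad rows with the error text."""
    good, bad = [], []
    for row in rows:
        try:
            naive = datetime.datetime.strptime(row["timestamp"], FORMAT)
        except (KeyError, TypeError, ValueError) as exc:
            # Missing, non-string, or wrongly formatted timestamp: keep the row and the reason.
            bad.append((row, str(exc)))
            continue
        good.append((row, naive.replace(tzinfo=datetime.timezone.utc)))
    return good, bad


rows = [
    {"id": "5", "timestamp": "2024-01-03T13:30:10"},
    {"id": "99", "timestamp": "03/01/2024 13:30"},  # hypothetical malformed value
    {"id": "100"},  # hypothetical row missing the column
]
good, bad = split_by_timestamp(rows)
print(len(good), len(bad))  # 1 2
```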