有没有办法在无限的源pcollection中设置时间戳?

问题描述 投票:0回答:1

我想将时间戳设置为字符串的无界pcollection

在我的解决方案中,pcollection的每一行都是一行csv

在该行的一个字段中有一个时间戳和其他字段,如点击次数等。

我想根据自己的时间戳(事件时间)处理集合,而不是设置apache beam的默认时间戳

此数据流的主要威胁是分组每分钟的点击次数

我有一个ftp服务,将文件发送到我的工作每1分钟听一个文件夹,并在1分钟的修复窗口中处理问题是可能到达后期数据

我做KV pcollection和key是没有秒的时间戳和点击的值数量并且应用group by

我得到“每分钟点击次数”,我将此pcollection发送到数据库

示例包中的数据元素生成到下午12:05(10个文件)时间,下午12:06收到数据

作业生成数据下午12:05点击120次

数据元素包生成到下午12:05和12:06(12:05后期的文件)

时间12:07 pm收到数据

工作生成数据12:05 pm 10次点击12:06 pm 135次点击

我想丢弃基于事件数据的后期数据的过程,这是我搜索的方式

换句话说,不要放入db的晚期日期 - > 12:05 pm -10clouds

java google-cloud-dataflow apache-beam
1个回答
0
投票

您可以使用WithTimestamps变换从数据中分配时间戳。例如,假设您的密钥是一个可以由Joda Time解析的字符串:

records.apply(
 WithTimestamps.of((KV<String, String> rec) -> Instant.parse(rec.getKey()));
© www.soinside.com 2019 - 2024. All rights reserved.