我使用的是固定窗口,我应用到一个PCollection上,以便在一个像下面这样的非绑定源上做GroupBy。
PCollection<KV<String, Iterable<CustomObject>>> grouppedBy =
rootObjects.apply(ParDo.of(keyCollection()))
.apply(
Window.<KV<String, CustomObject>>into(FixedWindows.of(Duration.standardSeconds(200))))
.apply(GroupByKey.<String, CustomObject>create());
private DoFn<CustomObject, KV<String, CustomObject>> keyCollection() {
return new DoFn<CustomObject, KV<String, CustomObject>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(final ProcessContext c) throws Exception {
CustomObject obj = c.element();
String key = obj.getKey();
Instant date = new DateTime().toInstant();
c.outputWithTimestamp(KV.of(key, obj), date);
}
};
}
而且在.apply(GroupByKey.create())步骤中没有单一的输出。
我从文档中了解到,应用Window应该可以在一个非绑定的源上使用GroupByKey。
我在用Windowing函数在非绑定PCollection中做CoGroupByKey时遇到了类似的问题。在非绑定PCollection的情况下,你需要使用Trigger和Window函数来产生输出。试着在你的窗口函数上使用Trigger,然后做GroupByKey或CoGroupByKey,它将开始产生结果。