我从kafka主题获取消息,该主题向我发送JSON消息。我想从该json消息中提取一个字段(可以是例如ID),我想为'n'个唯一设备ID创建'n'个会话。
我已经尝试为我收到的每个唯一ID创建一个新的会话实例,但在创建新的会话窗口实例后,即在管道中为每个ID创建一个新的分支,我无法将下一个即将发送的消息推送到相应的分支。已经存在。
我想要的预期结果是,假设我们正在收到类似的消息
{ID:1,...},{ID:2,...},{ID:3,...},{ID:1,...}
将创建三个不同的会话,第四个消息将转到设备ID 1的会话。有没有办法在apache beam编程范例或Java编程范例中执行此操作?任何帮助将不胜感激。
是的,如果您使用自定义WindowFn
,这可以使用Beam范例。您可以对Sessions类进行子类化并根据每个元素的ID对其进行修改以设置间隙持续时间。你可以在assignWindows
中执行此操作,在Sessions
中看起来像这样:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
AssignContext
类可用于访问分配此窗口的元素,这将允许您检索该元素的ID。
听起来您也希望将具有不同ID的元素分组到不同的窗口中(即,如果元素A和B进入间隙持续时间但具有不同的ID,则它们仍应位于不同的窗口中)。这可以通过执行带有元素ID作为键的GroupByKey
来完成。会话窗口适用于每个密钥as described in the Beam Programming Guide,因此这将按ID分隔元素。