基于唯一键创建不同的会话

问题描述 投票:1回答:1

我从kafka主题获取消息,该主题向我发送JSON消息。我想从该json消息中提取一个字段(可以是例如ID),我想为'n'个唯一设备ID创建'n'个会话。

我已经尝试为我收到的每个唯一ID创建一个新的会话实例,但在创建新的会话窗口实例后,即在管道中为每个ID创建一个新的分支,我无法将下一个即将发送的消息推送到相应的分支。已经存在。

我想要的预期结果是,假设我们正在收到类似的消息

{ID:1,...},{ID:2,...},{ID:3,...},{ID:1,...}

将创建三个不同的会话,第四个消息将转到设备ID 1的会话。有没有办法在apache beam编程范例或Java编程范例中执行此操作?任何帮助将不胜感激。

java google-cloud-dataflow apache-beam beam
1个回答
2
投票

是的,如果您使用自定义WindowFn,这可以使用Beam范例。您可以对Sessions类进行子类化并根据每个元素的ID对其进行修改以设置间隙持续时间。你可以在assignWindows中执行此操作,在Sessions中看起来像这样:

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Assign each element into a window from its timestamp until gapDuration in the
    // future.  Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

AssignContext类可用于访问分配此窗口的元素,这将允许您检索该元素的ID。

听起来您也希望将具有不同ID的元素分组到不同的窗口中(即,如果元素A和B进入间隙持续时间但具有不同的ID,则它们仍应位于不同的窗口中)。这可以通过执行带有元素ID作为键的GroupByKey来完成。会话窗口适用于每个密钥as described in the Beam Programming Guide,因此这将按ID分隔元素。

© www.soinside.com 2019 - 2024. All rights reserved.