如何在Flink中使用ListState用于BroadcastProcessFunction

问题描述 投票:3回答:2

我们有一个包含交易的非密钥数据流和一个包含规则的广播流。实际上,我们希望基于最后看到的规则处理交易。如果我们最后看到的规则是daily,则必须将当前交易添加到dailyTrnsList。另外,如果dailyTrnsList大小大于阈值,则必须清除列表并将事务写入数据库。如果最后看到的规则是temp,我们将做同样的事情。

代码在下面:

public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();

private final static int threshold = 100;

private final MapStateDescriptor<String, String> ruleStateDesc =
        new MapStateDescriptor<>(
                "ControlMapState",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

  @Override
  public void processElement(String s,
                           ReadOnlyContext readOnlyContext,
                           Collector<Transaction> collector) throws Exception
 {
    String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

    if(ruleName.equals("daily"))
        {
            dailyTrnsList.add(s);
            if(dailyTrnsList.size()>=threshold)
                {
                    List<String> buffer = dailyTrnsList;
                    dailyTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"daily");
                }
        }
    else if(ruleName.equals("temp"))
        {
            tempTrnsList.add(s);
            if(tempTrnsList.size()>=threshold)
                {
                    List<String> buffer = tempTrnsList;
                    tempTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"temp");
                }
        }

    collector.collect(s);

   }
  @Override
  public void processBroadcastElement(String s,
                                    Context context,
                                    Collector<CardTransaction> collector) throws Exception
  {
    if (s.equals("temp"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "temp");
    List<String> buffer = dailyTrnsList;
        dailyTrnsList = new ArrayList<>();
        insert_to_db(buffer,"daily");
    }
    else if (s.equals("daily"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "daily");
        List<String> buffer = tempTrnsList;
        tempTrnsList = new ArrayList<>();
        insert_to_db(buffer,"temp");
      }
    }
  }

我们的问题是编写容错方法。我们不知道如何使用ListState解决问题。到目前为止,我们发现的唯一解决方案是CheckpointedFunction接口的实现,该接口位于Flink文档的Working with State部分中。

private ListState<String> dailyTrns;
private ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    dailyTrns.clear();
    tempTrns.clear();
    for (String[] element : dailyTrnsList)
        dailyTrns.add(element);
    for (String[] element : tempTrnsList)
        tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
    tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
    if (context.isRestored()) {
        for (String[] element : dailyTrns.get())
            dailyTrnsList.add(element);
        for (String[] element : tempTrns.get())
            tempTrnsList.add(element);
    }
}

请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案正确,那么对于未从dailyTrnsListtempTrnsList传输到dailyTrnstempTrns的元素会发生什么?

任何帮助将不胜感激。

谢谢你。

stream apache-flink flink-streaming
2个回答
0
投票
  1. 您不需要dailyTrnsListtempTrnsList,因为您已经有dailyTrnstempTrns。只需使用这些ListState变量来记录交易。
  2. 我不确定您为什么拥有MapState<String, String>。看起来您的状态基于最新规则,即dailytemp,因此只需将其存储在ValueState<String>中即可。

0
投票

您可以简化实现,而不必为此担心。您可以执行以下操作:

(1)简化BroadcastProcessFunction,以便将传入流分为两个流:每日交易流和临时交易流。它通过根据最新规则选择两个侧面输出之一来完成此操作。

((2)跟随带有广播窗口的BroadcastProcessFunction,这些窗口创建批处理并将它们写入数据库。

或者代替使用副输出,BroadcastProcessFunction可以写出(规则,交易)的元组,然后您可以通过规则来键入流。无论哪种方式,其想法都是让window API负责为您管理容错列表。

© www.soinside.com 2019 - 2024. All rights reserved.