我们有一个包含交易的非密钥数据流和一个包含规则的广播流。实际上,我们希望基于最后看到的规则处理交易。如果我们最后看到的规则是daily
,则必须将当前交易添加到dailyTrnsList
。另外,如果dailyTrnsList
大小大于阈值,则必须清除列表并将事务写入数据库。如果最后看到的规则是temp
,我们将做同样的事情。
代码在下面:
public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();
private final static int threshold = 100;
private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");
if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
collector.collect(s);
}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}
我们的问题是编写容错方法。我们不知道如何使用ListState
解决问题。到目前为止,我们发现的唯一解决方案是CheckpointedFunction
接口的实现,该接口位于Flink文档的Working with State部分中。
private ListState<String> dailyTrns;
private ListState<String> tempTrns;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}
请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案正确,那么对于未从dailyTrnsList
和tempTrnsList
传输到dailyTrns
和tempTrns
的元素会发生什么?
任何帮助将不胜感激。
谢谢你。
dailyTrnsList
和tempTrnsList
,因为您已经有dailyTrns
和tempTrns
。只需使用这些ListState
变量来记录交易。MapState<String, String>
。看起来您的状态基于最新规则,即daily
或temp
,因此只需将其存储在ValueState<String>
中即可。您可以简化实现,而不必为此担心。您可以执行以下操作:
(1)简化BroadcastProcessFunction,以便将传入流分为两个流:每日交易流和临时交易流。它通过根据最新规则选择两个侧面输出之一来完成此操作。
((2)跟随带有广播窗口的BroadcastProcessFunction,这些窗口创建批处理并将它们写入数据库。
或者代替使用副输出,BroadcastProcessFunction可以写出(规则,交易)的元组,然后您可以通过规则来键入流。无论哪种方式,其想法都是让window API负责为您管理容错列表。