当我尝试更新值状态(queueState.update(queue))时,抛出了以下异常:
org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:37)
at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:22)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
2019-10-13 11:06:29,311 WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1570948458514 < 1570948663062
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
... 11 more
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.PriorityQueue;
/**
 * Buffers incoming log records per key in a timestamp-ordered queue and emits them once the
 * watermark has reached their timestamp. While emitting, records that repeat the endpoint and
 * timestamp of the previously emitted record are dropped, and the attempt id of the previously
 * emitted record is copied onto records considered part of the same attempt.
 *
 * <p>Relies on {@code TS}, {@code ATTEMPT_ID}, {@code AUTOCOMPLETE}, {@code LOG},
 * {@code MAX_TIME_GAP} and {@code extractLogEndpoint(...)}, which are defined outside this class.
 */
public class CleanTimedOutPartialMatches extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanTimedOutPartialMatches.class);

    /** Per-key buffer of records that are still ahead of the watermark, ordered by {@code TS}. */
    private ValueState<PriorityQueue<JsonNode>> queueState = null;

    /**
     * Adds the record to the per-key buffer and registers an event-time timer at its timestamp.
     * Records whose timestamp is not ahead of the current watermark are ignored.
     *
     * @param log       the incoming record; its {@code TS} field is read as a long
     * @param context   gives access to the record timestamp and the timer service
     * @param collector unused here; records are emitted from {@link #onTimer}
     */
    @Override
    public void processElement(ObjectNode log, Context context, Collector<ObjectNode> collector) throws Exception {
        try {
            if (context.timestamp() <= context.timerService().currentWatermark()) {
                return;
            }
            PriorityQueue<JsonNode> queue = queueState.value();
            if (queue == null) {
                // NOTE(review): the comparator is a lambda and is stored together with the queue;
                // confirm that the configured state serializer can round-trip it.
                queue = new PriorityQueue<JsonNode>(Comparator.comparingLong(o -> o.get(TS).longValue()));
            }
            queue.add(log);
            queueState.update(queue);
            context.timerService().registerEventTimeTimer(log.get(TS).longValue());
        } catch (Exception e) {
            // Log the full exception (type, message, cause chain) through the logger instead of
            // printStackTrace(), so the first failure is visible in the task logs.
            LOGGER.error("Failed to buffer log record", e);
        }
    }

    /**
     * Emits every buffered record whose timestamp is covered by the current watermark and then
     * persists the remaining buffer.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       gives access to the timer service (current watermark)
     * @param out       receives the emitted records
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        try {
            PriorityQueue<JsonNode> queue = queueState.value();
            if (queue == null) {
                // Nothing buffered for this key (e.g. an earlier timer already drained everything).
                return;
            }
            sendToSink(queue, ctx, out);
            // Write the drained queue back. Without this, a state backend that hands out a
            // deserialized copy from value() keeps the already-emitted records in state and
            // they are emitted again on the next timer.
            if (queue.isEmpty()) {
                queueState.clear();
            } else {
                queueState.update(queue);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to flush buffered log records for timer {}", timestamp, e);
        }
    }

    /**
     * Removes records from the head of {@code queue} while their {@code TS} is at or below the
     * current watermark, and forwards them to {@code out}, skipping duplicates.
     *
     * @param queue   the buffer to drain; modified in place, must not be {@code null}
     * @param context gives access to the current watermark
     * @param out     receives the emitted records
     */
    private void sendToSink(PriorityQueue<JsonNode> queue, OnTimerContext context, Collector<ObjectNode> out) {
        long watermark = context.timerService().currentWatermark();
        JsonNode lastSentLog = null;
        JsonNode log = queue.peek();
        while (log != null && log.get(TS).longValue() <= watermark) {
            boolean isDuplicate = lastSentLog != null
                    && extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog))
                    && log.get(TS).longValue() == lastSentLog.get(TS).longValue();
            if (isDuplicate) {
                LOGGER.info("duplicated log removed");
            } else {
                if (lastSentLog != null) {
                    // Gap between consecutive emitted records; the division by 1000 suggests
                    // TS is in milliseconds and MAX_TIME_GAP in seconds — TODO confirm.
                    long gapTime = Math.abs(log.get(TS).longValue() - lastSentLog.get(TS).longValue()) / 1000;
                    boolean isSameAttempt = (extractLogEndpoint(lastSentLog).equals(AUTOCOMPLETE) && extractLogEndpoint(log).equals(LOG))
                            || (extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && gapTime < MAX_TIME_GAP);
                    if (isSameAttempt) {
                        ((ObjectNode) log).put(ATTEMPT_ID, lastSentLog.get(ATTEMPT_ID).textValue());
                    }
                }
                lastSentLog = log;
                out.collect((ObjectNode) log);
            }
            // The record under inspection is the head of the queue, so poll() removes it
            // directly instead of searching the queue as remove(Object) does.
            queue.poll();
            log = queue.peek();
        }
    }

    /**
     * Registers the keyed value state that holds the per-key buffer.
     *
     * @param parameters the configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<PriorityQueue<JsonNode>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sort-partial-matches",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<JsonNode>>() {
                }));
        queueState = getRuntimeContext().getState(descriptor);
    }
}
一个问题:从队列中删除元素之后,您似乎忘了调用 queueState.update(queue)。
即使您让它正常工作了,以RocksDB作为状态后端、基于PriorityQueue的排序性能也会很差,因为每次访问和更新都必须对整个队列进行序列化/反序列化(ser/de)。除非您使用的是基于堆的状态后端之一,否则建议使用MapState进行排序,因为这样只需要对单个条目(而不是整个映射)进行序列化/反序列化。您可以将时间戳用作MapState的键,并将对象列表用作值。然后像现在一样使用计时器来触发输出(刷新)列表中的内容。
或者您可以使用SQL进行排序-有关示例,请参见the answer to this question。