我试图在Flink(版本1.4.2)上使用可查询状态但不幸的是我不断收到以下错误:
INFO my.test.flink.QueryableState - Params are a96438fa12879b7598c9cf32684e2669, kafka-cluster_jobmanager_1, 6123
INFO my.test.flink.QueryableState - Before the call java.util.concurrent.CompletableFuture@26aa12dd[Not completed]
java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at my.test.flink.QueryableState.main(QueryableState.java:67)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1166)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:619)
at org.apache.flink.queryablestate.network.messages.MessageSerializer.deserializeHeader(MessageSerializer.java:231)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:76)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
在客户端,我使用flink-queryable-state-client-java_2_11.jar,可查询客户端的相关代码部分是
QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);
TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
ListStateDescriptor<MyEvent> descriptor = new ListStateDescriptor<MyEvent>("myEvents",
typeInformation.createSerializer(new ExecutionConfig()));
CompletableFuture<ListState<MyEvent>> resultFuture =
client.getKvState(JobID.fromHexString(jobIdParam),"myEvents", "1",
BasicTypeInfo.STRING_TYPE_INFO , descriptor );
logger.info("Before the call " + resultFuture);
try {
logger.info("Finished"+ resultFuture.get());
} catch(Exception ex) {
ex.printStackTrace();
}
最后,在Flink上运行的作业配置了ListState,如下所示。请注意,数据通过String键入ListState
TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
ListStateDescriptor<MyEvent> eventState =
new ListStateDescriptor<MyEvent>("myEvents",typeInformation);
eventState.setQueryable("myEvents");
eventListState = getRuntimeContext().getListState(eventState);
在我看来,像序列化错误,但我不知道我需要做些什么来解决它。有没有人知道上面的代码可能有什么问题?我错过了什么吗?
我在为Flink 1.4更新this queryable state demo时遇到了同样的问题。如果我没记错的话,重要的是正确处理CompletableFuture - 你不能直接调用get()。
有关工作示例,请参阅the code,其关键部分如下所示:
try {
CompletableFuture<FoldingState<BumpEvent, Long>> resultFuture =
client.getKvState(jobId, EventCountJob.ITEM_COUNTS, key,
BasicTypeInfo.STRING_TYPE_INFO, countingState);
resultFuture.thenAccept(response -> {
try {
Long count = response.get();
// now we could do something with the value
} catch (Exception e) {
e.printStackTrace();
}
});
resultFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}