我遇到了 Flink 在广播状态写入时卡住的问题:
ctx.collect(data)
其中 ctx 是 SourceContext
没有抛出异常,胎面转储被卡住,但正在运行并且没有被卡住:
"Legacy Source Thread - Source: deviceInfoReader (1/1)#0" Id=86 RUNNABLE
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:299)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:124)
at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:42)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:473)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:368)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:195)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:188)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.broadcastEmit(BroadcastRecordWriter.java:48)
at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.emit(BroadcastRecordWriter.java:41)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
- locked java.lang.Object@11434500
at xxx
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
知道为什么会这样吗?序列化过程中可能是一个缓慢的循环吗?数据量很大,但以前从未出现过任何问题。
任务并没有失败,只是围绕着这个点循环。
我猜你的堆空间不足,因此被“挂”在GC地狱中。