Flink hangs on broadcast

Problem description (votes: 0, answers: 1)

I ran into a problem where Flink gets stuck while writing to broadcast state, at this call:

ctx.collect(data)

where ctx is a SourceContext.

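No exception is thrown. The thread dump shows the source thread sitting at this point, yet its state is RUNNABLE rather than blocked: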

"Legacy Source Thread - Source: deviceInfoReader (1/1)#0" Id=86 RUNNABLE
    at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:299)
    at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:42)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:473)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:368)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:195)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:188)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
    at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.broadcastEmit(BroadcastRecordWriter.java:48)
    at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.emit(BroadcastRecordWriter.java:41)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
    -  locked java.lang.Object@11434500
    at xxx
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Any idea why this happens? Could it be a slow loop during serialization? The data volume is large, but it has never caused a problem before.

The task does not fail; it just keeps looping around this point.
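To tell a thread that is slowly making progress from one that is truly spinning in place, it can help to sample its stack several times and compare the snapshots. The following is a minimal sketch using only the JDK's `ThreadMXBean`; the class name `StackSampler` and the method `sampleTopFrames` are made up for illustration, and the code only sees threads of the JVM it runs in, so it would have to be called from inside the TaskManager process (taking a few `jstack` dumps some seconds apart gives the same information from outside).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: samples the stack of matching threads.
final class StackSampler {

    /**
     * Takes `samples` snapshots, `pauseMillis` apart, of every live thread whose
     * name starts with `threadNamePrefix`. Each entry holds the thread state
     * followed by the top 8 stack frames.
     */
    static List<String> sampleTopFrames(String threadNamePrefix, int samples, long pauseMillis) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < samples; i++) {
            ThreadInfo[] infos = threads.getThreadInfo(threads.getAllThreadIds(), 8);
            for (ThreadInfo info : infos) {
                // Entries are null for threads that died between the two calls.
                if (info != null && info.getThreadName().startsWith(threadNamePrefix)) {
                    result.add(info.getThreadState() + " " + Arrays.toString(info.getStackTrace()));
                }
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "Legacy Source Thread" is the thread name prefix seen in the dump above.
        for (String snapshot : sampleTopFrames("Legacy Source Thread", 5, 1000)) {
            System.out.println(snapshot);
        }
    }
}
```

If the top frames differ between snapshots (for example, different Kryo serializer frames each time), the thread is probably still working through a very large record; identical snapshots over a long period point to a real stall.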

java serialization apache-flink kryo
1 answer
0 votes

My guess is that you are running out of heap space, so the job is effectively "hanging" in GC hell.
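One way to check this guess is to look at heap occupancy and at how much wall-clock time the JVM spends in garbage collection. The sketch below uses only the standard `java.lang.management` beans; the class name `GcPressureProbe` and its methods are hypothetical, and it reports on the JVM it runs in, so it is only meaningful when called from inside the TaskManager process. The reported GC share is approximate, since collection times are accumulated per collector.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper, not part of Flink: rough indicators of GC pressure.
final class GcPressureProbe {

    /** Fraction of the maximum heap currently in use, or -1 if no maximum is defined. */
    static double heapUsedFraction() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax();
        return max <= 0 ? -1.0 : (double) heap.getUsed() / max;
    }

    /** Accumulated GC time in milliseconds, summed over all collectors. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** Approximate share of a sampling window that was spent in GC. */
    static double gcTimeFraction(long windowMillis) {
        long gcBefore = totalGcMillis();
        long start = System.nanoTime();
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = Math.max(1, (System.nanoTime() - start) / 1_000_000);
        return (double) (totalGcMillis() - gcBefore) / elapsedMillis;
    }

    public static void main(String[] args) {
        System.out.printf("heap used: %.1f%%%n", heapUsedFraction() * 100);
        System.out.printf("GC time share over 1 s: %.1f%%%n", gcTimeFraction(1000) * 100);
    }
}
```

A heap that stays close to full while the GC share stays high would be consistent with the "GC hell" explanation; low values for both would suggest looking elsewhere, for example at the size of the record being serialized.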
