我是Apache Flink的新手,所以我目前正在尝试做一些实验。我正在从Kafka阅读主题,然后将其打印在控制台上。打印大约100k + kafka消息后,它将引发异常。日志输出如下。
我正在使用自定义类,该类扩展了AbstractDeserializationSchema以反序列化kafka记录值。我什至尝试在其中添加一些异常处理,但这不会被触发。
我用来从Kafka使用的代码非常简单:
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
private static final int FLINK_PARALLELISM = 1;
public static void main(String[] args) {
LOGGER.info("Starting Flink Kafka Consumer");
try {
Properties props = new Properties();
props.put("bootstrap.servers", Arrays.asList(
"localhost:9092"
));
props.put("group.id", "test_flink");
StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
flinkEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
flinkEnv.setParallelism(FLINK_PARALLELISM);
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props);
DataStream<String> kafkaStream = flinkEnv.addSource(kafkaConsumer);
kafkaStream.print();
flinkEnv.execute("Flink Test");
} catch (Exception e) {
LOGGER.error("Exception thrown: {}", e.getMessage());
}
}
}
即使在例外之后,输出仍然会从主题追加到文件中。 kafka主题已启动并正在运行,但是无论是否发布任何内容,MiniCluster都将停止。我无法查明问题所在。
有人能指出我正确的方向吗?谢谢
INFO [Source: Custom Source -> Sink: Print to Std.
Out (1/1)] o.a.f.s.c.k.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test', partition=0}]
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test_flink
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group test_flink.
INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster
INFO [main] o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.d.i.IOManager - I/O manager removed spill file directory /tmp/flink-io-763eca47-ab9c-4985-aa30-c7ac21442635
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.n.NetworkEnvironment - Shutting down the network environment and its components.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Try to restart or fail the job Flink Test (670e9073fbab507c41a26b5641a265eb) if no longer possible.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Could not restart the job Flink Test (670e9073fbab507c41a26b5641a265eb) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 670e9073fbab507c41a26b5641a265eb.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.JobLeaderService - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Closing the SlotManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Suspending the SlotManager.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Shut down complete.
INFO [flink-akka.actor.default-dispatcher-3] o.a.f.r.j.JobMaster - Close ResourceManager connection 3a6541e469014e5685a7510403385dcb: ResourceManager leader changed to new address null.
INFO [PermanentBlobCache shutdown hook] o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache
INFO [TransientBlobCache shutdown hook] o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache
INFO [BlobServer shutdown hook] o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:46065
我使用1.9.0版存在相同的问题,并且似乎直接在构思上执行1.4.1版之后的版本也会遇到相同的问题。当然,这个问题不会发生在每个人身上。您解决了这个问题吗?