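Scope:
I have a Flink job that reads from Kafka, partitions the stream with keyBy, processes the data, and writes to several Kafka topics and several MariaDB tables (using Flink side outputs). I am using Flink's DataStream API. I have one JobManager and one TaskManager running in Docker. The job is supposed to run 24/7.
Using Flink v1.16 and MySQL connector v8.0.32
Problem:
I create the sink to MariaDB exactly as recommended in the documentation (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/jdbc/#full-example), but the connection is lost after about 10 minutes. I then start getting this error in the logs: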
taskmanager_1 | 2023-03-20 14:52:37,804 ERROR org.apache.flink.connector.jdbc.internal.JdbcOutputFormat [] - JDBC executeBatch error, retry times = 0
taskmanager_1 | java.sql.SQLException: No operations allowed after statement closed.
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:82) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.ClientPreparedStatement.clearBatch(ClientPreparedStatement.java:270) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:422) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:84) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.api.operators.KeyedProcessOperator$ContextImpl.output(KeyedProcessOperator.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at com..learningFlink.ProcessFlinkSession.processElement(ProcessFlinkSession.java:95) ~[RealtimeProcessingEngine-0.1-SNAPSHOT.jar:?]
taskmanager_1 | at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) [flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) [flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) [flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.0.jar:1.16.0]
taskmanager_1 | at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
taskmanager_1 | Caused by: com.mysql.cj.exceptions.StatementIsClosedException: No operations allowed after statement closed.
taskmanager_1 | at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_362]
taskmanager_1 | at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_362]
taskmanager_1 | at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_362]
taskmanager_1 | at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_362]
taskmanager_1 | at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:85) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.StatementImpl.checkClosed(StatementImpl.java:336) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | at com.mysql.cj.jdbc.ClientPreparedStatement.clearBatch(ClientPreparedStatement.java:265) ~[RealtimeProcessingEngine-0.1-SNAPSHOT-all.jar:?]
taskmanager_1 | ... 28 more
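I tried both the MariaDB connector and the MySQL connector and got the same result, with only slightly different wording in the error message.
It is worth mentioning that even while this error is being logged, data is sometimes still written to the database as if nothing were wrong, even hours later and after the error has appeared many times.
I am writing with "INSERT ... ON DUPLICATE KEY UPDATE".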
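For readers unfamiliar with this kind of upsert, here is a minimal sketch of how such a parameterized statement could be assembled before being handed to the JDBC sink. The class name UpsertSql, the table name sessions, and all column names are hypothetical and are not taken from my actual job; the sketch only illustrates the statement shape that MySQL and MariaDB accept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds an "INSERT ... ON DUPLICATE KEY UPDATE" statement
// with one "?" placeholder per column, suitable for a PreparedStatement.
final class UpsertSql {
    private UpsertSql() {}

    static String build(String table, List<String> columns, List<String> keyColumns) {
        // Comma-separated column list and a matching list of "?" placeholders.
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));

        // Non-key columns are overwritten with the incoming values on a key collision.
        String updates = columns.stream()
                .filter(c -> !keyColumns.contains(c))
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        if (updates.isEmpty()) {
            throw new IllegalArgumentException("at least one non-key column is required");
        }

        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // Table and column names below are made up for illustration.
        System.out.println(build(
                "sessions",
                Arrays.asList("session_id", "user_id", "event_count"),
                Arrays.asList("session_id")));
    }
}
```

The resulting string is the kind of SQL that would be passed as the first argument when creating the sink, with the placeholders filled in per record by the statement builder.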
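On the database side, I see that many (~50) connections are established when Flink starts, and most of them are dropped after 10 minutes, whether or not they have been used.
I have changed the timeouts on both Flink and MariaDB, but the problem persists.
I know that Flink cannot re-establish a lost connection; this is a known bug that is being fixed under FLINK-30431 (https://issues.apache.org/jira/browse/FLINK-30431), but I need at least a workaround in the meantime.
Thanks for your help.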