I'm trying to read from an event outbox table and then feed two sinks: one pushes the events to a Kafka topic, the other uses Flink to update a table in the same database. Since I want exactly-once delivery semantics for both sinks, I'm using the JDBC sink for the database side. Here is the code.
Map<String, String> map = processor.getStreamConfigMap();
EnvironmentSettings settings = processor.getEnvironmentSettings(map);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
tEnv.registerCatalog(ApplicationConstants.CATALOG_NAME, processor.createCatalog());
tEnv.useCatalog(ApplicationConstants.CATALOG_NAME);
DataStream<Row> resultStream = processor.fetchData(tEnv);
resultStream.keyBy(row -> row.getField("id")).addSink(
    JdbcSink.exactlyOnceSink(
        "update event_log set event_status = 'SUCCESS' where id = ?",
        (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField("id")),
        JdbcExecutionOptions.builder()
            .withMaxRetries(0)
            .build(),
        JdbcExactlyOnceOptions.builder()
            .withTransactionPerConnection(true)
            .build(),
        () -> {
            // create a driver-specific XA DataSource
            PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
            xaDataSource.setUrl(processor.applicationConfig.getDbConfig().getJdbcUrl());
            xaDataSource.setUser(processor.dbUsername);
            xaDataSource.setPassword(processor.dbPassword);
            xaDataSource.setCurrentSchema(processor.applicationConfig.getDbConfig().getSchema());
            return xaDataSource;
        }));
I get the following error at the addSink call.
The implementation of the AbstractRichFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
at org.apache.flink.streaming.api.datastream.KeyedStream.addSink(KeyedStream.java:302)
at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.lambda$main$0(EventOutboxProcessor.java:119)
at io.micrometer.core.instrument.AbstractTimer.record(AbstractTimer.java:223)
at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.main(EventOutboxProcessor.java:97)
First, org.apache.flink.connector.jdbc.JdbcSink#exactlyOnceSink creates an instance of org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction, which extends AbstractRichFunction; that matches your error message. org.apache.flink.api.common.ExecutionConfig enables or disables the "closure cleaner". The closure cleaner preprocesses the implementations of functions: if they are (anonymous) inner classes, it removes unused references to the enclosing class in order to fix certain serialization-related problems and to reduce the size of the closure.
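To make the capture problem concrete, here is a minimal, Flink-free sketch of why such a lambda can become non-serializable. The `SerSupplier` interface stands in for Flink's serializable supplier type, and `NotSerializable` plays the role of the enclosing `processor` object from the question; both names are assumptions for illustration. A lambda that references the whole non-serializable object cannot be serialized, while one that captures only the plain `String` it actually needs can:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class ClosureCaptureDemo {
    // Stand-in for a serializable supplier type (simplified assumption)
    interface SerSupplier<T> extends Supplier<T>, Serializable {}

    // Plays the role of the non-serializable `processor` from the question
    static class NotSerializable {
        String jdbcUrl = "jdbc:postgresql://localhost/db";
    }

    // Returns true if Java serialization of the object succeeds
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        NotSerializable processor = new NotSerializable();

        // Captures the whole non-serializable object -> serialization fails,
        // analogous to the "not serializable" error in the question
        SerSupplier<String> bad = () -> processor.jdbcUrl;

        // Copy the needed value into an effectively-final local first,
        // so the lambda captures only a String -> serialization succeeds
        String url = processor.jdbcUrl;
        SerSupplier<String> good = () -> url;

        System.out.println(canSerialize(bad));   // false
        System.out.println(canSerialize(good));  // true
    }
}
```

This is the kind of reference the closure cleaner tries to prune automatically; when it cannot, the check in `ClosureCleaner.clean` fails with the error shown above.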
You can fix it with
env.getConfig().disableClosureCleaner();