The implementation of the AbstractRichFunction is not serializable when using the JDBC sink in Flink

Question

I am trying to read data from an event outbox table and then feed two sinks: one pushes the events to a Kafka topic, and the other uses Flink to update a table in the same database. Since I want both sinks to have exactly-once delivery semantics, I am using the JDBC sink for the database update. Here is the code.

Map<String, String> map = processor.getStreamConfigMap();
EnvironmentSettings settings = processor.getEnvironmentSettings(map);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.registerCatalog(ApplicationConstants.CATALOG_NAME, processor.createCatalog());
tEnv.useCatalog(ApplicationConstants.CATALOG_NAME);
DataStream<Row> resultStream = processor.fetchData(tEnv);
resultStream.keyBy(row -> row.getField("id")).addSink(
        JdbcSink.exactlyOnceSink(
                "update event_log set event_status = 'SUCCESS' where id = ?",
                (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField("id")),
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0)
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        .withTransactionPerConnection(true)
                        .build(),
                () -> {
                    // create a driver-specific XA DataSource
                    PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
                    xaDataSource.setUrl(processor.applicationConfig.getDbConfig().getJdbcUrl());
                    xaDataSource.setUser(processor.dbUsername);
                    xaDataSource.setPassword(processor.dbPassword);
                    xaDataSource.setCurrentSchema(processor.applicationConfig.getDbConfig().getSchema());
                    return xaDataSource;
                }));

I get the following error at the addSink call.

The implementation of the AbstractRichFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
    at org.apache.flink.streaming.api.datastream.KeyedStream.addSink(KeyedStream.java:302)
    at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.lambda$main$0(EventOutboxProcessor.java:119)
    at io.micrometer.core.instrument.AbstractTimer.record(AbstractTimer.java:223)
    at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.main(EventOutboxProcessor.java:97)
apache-flink flink-streaming flink-sql
1 Answer

First, org.apache.flink.connector.jdbc.JdbcSink#exactlyOnceSink creates an org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction instance, which extends AbstractRichFunction; that matches the class named in your error message.
Second, here is the description of the "closure cleaner" from org.apache.flink.api.common.ExecutionConfig:

Enables or disables the "closure cleaner". The closure cleaner pre-processes the implementations of functions. In case they are (anonymous) inner classes, it removes unused references to the enclosing class to fix certain serialization-related problems and to reduce the size of the closure.
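
In the question's code, the function most likely failing this check is the XA DataSource supplier lambda: it references `processor`, so the serialized closure would have to contain the whole processor object. A minimal sketch of that capture rule, with hypothetical class names (`Processor` stands in for the question's `processor` and is assumed not to implement java.io.Serializable):

import org.apache.flink.util.function.SerializableSupplier;

// Hypothetical stand-in for the question's `processor`: note that it does
// NOT implement java.io.Serializable.
class Processor {
    final String dbUsername = "flink";
}

class CaptureDemo {
    static SerializableSupplier<String> make(Processor processor) {
        // This lambda captures the whole `processor` reference. Serializing
        // the lambda would therefore have to serialize Processor as well,
        // which fails; that is what the check triggered by addSink() reports.
        return () -> processor.dbUsername;
    }
}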

You can fix it with:

env.getConfig().disableClosureCleaner();
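
Independently of the closure-cleaner setting, a common way to make this error go away for good is to ensure the SerializableSupplier captures only serializable values. Below is a sketch of that variant of the question's sink; it assumes the processor fields shown in the question, and the lambda ends up capturing only effectively-final local Strings:

// Copy everything the supplier needs into effectively-final locals before
// building the sink, so the lambda captures only Strings instead of `processor`.
final String jdbcUrl = processor.applicationConfig.getDbConfig().getJdbcUrl();
final String user = processor.dbUsername;
final String password = processor.dbPassword;
final String schema = processor.applicationConfig.getDbConfig().getSchema();

resultStream.keyBy(row -> row.getField("id")).addSink(
        JdbcSink.exactlyOnceSink(
                "update event_log set event_status = 'SUCCESS' where id = ?",
                (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField("id")),
                JdbcExecutionOptions.builder().withMaxRetries(0).build(),
                JdbcExactlyOnceOptions.builder().withTransactionPerConnection(true).build(),
                () -> {
                    // The closure now holds only the four local String copies.
                    PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
                    xaDataSource.setUrl(jdbcUrl);
                    xaDataSource.setUser(user);
                    xaDataSource.setPassword(password);
                    xaDataSource.setCurrentSchema(schema);
                    return xaDataSource;
                }));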