I cannot access a Java POJO in a Maven project in Visual Studio Code.
OS : Windows 11
JDK : open-jdk 17.0.2
maven : apache-maven 3.9.6
apache flink : 1.18.1 (installed on WSL 2 ubuntu)
visual studio code : x64 1.87.2
Below is a simple Flink MongoDB sink example.
== Car.java (POJO class)
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Car {
    private String brand;
    private int price;
}
== Flink MongoDB Sink code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Car> cars = new ArrayList<>();
cars.add(new Car("BMW", 500));
cars.add(new Car("Kia", 300));
cars.add(new Car("Ford", 600));
DataStream<Car> stream = env.fromCollection(cars);
MongoSink<Car> sink = MongoSink.<Car>builder()
        .setUri("mongodb://127.0.0.1:27017")
        .setDatabase("class_db")
        .setCollection("class_coll")
        .setBatchSize(1000)
        .setBatchIntervalMs(1000)
        .setMaxRetries(3)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setSerializationSchema((input, context) -> {
            Document doc = new Document(input.getBrand(), input.getPrice());
            System.out.println(doc);
            return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
        })
        .build();
stream.sinkTo(sink);
env.execute("MongoDB POJO Test");
env.close();
The code above compiles without errors, but running it throws this exception:
Exception in thread "main" java.io.UncheckedIOException: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2289)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2280)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2266)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2093)
at com.aaa.test.FlinkMongoClassTest.main(FlinkMongoClassTest.java:49)
Caused by: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:139)
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:160)
... 22 more
Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69)
at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:513)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:522)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:348)
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:136)
... 23 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
So I changed the element type of the input stream from the Car class to Tuple2, as shown below:
List<Tuple2<String, Integer>> cars = new ArrayList<>();
cars.add(new Tuple2<>("BMW", 500));
cars.add(new Tuple2<>("Kia", 300));
cars.add(new Tuple2<>("Ford", 600));
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(cars);
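With that change everything runs without any errors. It seems the main Java code cannot access the Car POJO class. Any reply would be very helpful. Thank you.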
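You are relying on Kryo to serialize your Car objects. Judging from the error message, I think your Car class may have a final field (an Object array) that Kryo cannot set when deserializing the object.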
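One more observation, offered as a likely explanation rather than something verified on the asker's machine: the field named in the trace is `java.util.Arrays$ArrayList.a`, which belongs to the JDK, not to Car. It is touched by `ArraysAsListSerializer` while Flink initializes Kryo, and on JDK 17 that reflective access is blocked by the module system. Flink only falls back to Kryo when it cannot treat a type as a POJO, and its POJO rules include a public no-argument constructor, which `@Data` combined with `@AllArgsConstructor` does not generate. That would also fit the Tuple2 version working, since tuples do not go through Kryo. The sketch below is plain Java with no Flink or Lombok dependency. `PojoCheck`, `looksLikeFlinkPojo` and `CarWithoutDefaultConstructor` are hypothetical names made up for illustration, and the check is only an approximation of Flink's rules, not Flink's own code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical holder class; in a real project Car would be a public
// top-level class in its own Car.java file.
class PojoCheck {

    // Car written out by hand. With Lombok, adding @NoArgsConstructor
    // next to @AllArgsConstructor should give the same shape.
    public static class Car {
        private String brand;
        private int price;

        public Car() {}  // the constructor the original class is missing

        public Car(String brand, int price) {
            this.brand = brand;
            this.price = price;
        }

        public String getBrand() { return brand; }
        public void setBrand(String brand) { this.brand = brand; }
        public int getPrice() { return price; }
        public void setPrice(int price) { this.price = price; }
    }

    // Same shape as the Car in the question: an all-args constructor only.
    public static class CarWithoutDefaultConstructor {
        private String brand;
        private int price;

        public CarWithoutDefaultConstructor(String brand, int price) {
            this.brand = brand;
            this.price = price;
        }
    }

    // getConstructor() only returns public constructors.
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // A field counts as reachable if it is public or has getX/setX methods.
    // (Simplification: "isX" getters for booleans are not considered.)
    static boolean isReachable(Class<?> type, Field field) {
        if (Modifier.isPublic(field.getModifiers())) {
            return true;
        }
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Approximation of the POJO rules: public class, public no-arg
    // constructor, every non-static, non-transient field reachable.
    static boolean looksLikeFlinkPojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers()) || !hasPublicNoArgConstructor(type)) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            if (!isReachable(type, field)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(Car.class));                          // prints: true
        System.out.println(looksLikeFlinkPojo(CarWithoutDefaultConstructor.class)); // prints: false
    }
}
```

If you would rather keep Car as it is and stay on Kryo, the other route is to start the JVM with `--add-opens java.base/java.util=ALL-UNNAMED`, which is the access the error message says is missing.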