I am trying to write a simple Apache Flink MongoDB connector program that reads and writes JSON data in MongoDB. First, here is the MongoDB sink code.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("Hello", 1));
data.add(new Tuple2<>("Hi", 2));
data.add(new Tuple2<>("Hey", 3));
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(data);
MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
        .setUri("mongodb://127.0.0.1:27017")
        .setDatabase("test_db")
        .setCollection("test_coll")
        .setBatchSize(1000)
        .setBatchIntervalMs(1000)
        .setMaxRetries(3)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setSerializationSchema(
                (input, context) -> {
                    Document doc = new Document(input.f0, input.f1);
                    return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                })
        .build();
stream.sinkTo(sink);
This sink code successfully inserts JSON documents into MongoDB. A resulting document looks like this:
{
"_id": {
"$oid": "65f67f3b9779060fd2390d0e"
},
"Hello": 1
}
However, the MongoDB source code below fails with an error.
MongoSource<Tuple2<String, Integer>> source = MongoSource.<Tuple2<String, Integer>>builder()
        .setUri("mongodb://127.0.0.1:27017")
        .setDatabase("test_db")
        .setCollection("test_coll")
        .setDeserializationSchema(new MongoDeserializationSchema<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> deserialize(BsonDocument document) {
                String key = document.getFirstKey();
                Integer value = document.getInt64(key).intValue(); // this line throws the error message
                return new Tuple2<String, Integer>(key, value);
            }

            @Override
            public TypeInformation<Tuple2<String, Integer>> getProducedType() {
                return Types.TUPLE(Types.STRING, Types.INT);
            }
        })
        .build();
DataStream<Tuple2<String, Integer>> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source");
ds.print();
The error message is:
Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID
    at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
    at org.bson.BsonValue.asInt64(BsonValue.java:105)
    at org.bson.BsonDocument.getInt64(BsonDocument.java:203)
    at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:63)
    at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:1)
    at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:58)
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54)
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
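As I understand it, the code asks for a value of type INT64, but the value actually returned is of type OBJECT_ID, which is why it fails. Please tell me how to read the integer value of the MongoDB document instead of the OBJECT_ID. Any reply would be appreciated.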
Can you try this:
Integer value = document.getObjectId(key).getValue().getCounter();
instead of this:
Integer value = document.getInt64(key).intValue(); // this line throws the error message
This is only a guess.
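Judging by the inserted document you posted, the first key is the object id (_id), so document.getFirstKey() returns "_id" and not "Hello":
{ "_id": { "$oid": "65f67f3b9779060fd2390d0e" }, "Hello": 1 }
getCounter() would return the value of the auto-incrementing counter at the moment the object id/document was created (if that is the value you are looking for).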
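To make it concrete what the counter part of an object id holds, here is a small stand-alone sketch that decodes the hex string from the question by hand. It deliberately does not use the MongoDB driver; the class and method names (ObjectIdParts, timestampSeconds, counter) are made up for illustration. It assumes the documented ObjectId layout: a 4-byte timestamp first and a 3-byte counter last.

```java
import java.time.Instant;

// Stand-alone illustration; hypothetical helper, not part of the MongoDB driver.
final class ObjectIdParts {
    // First 4 bytes (8 hex characters): creation time in seconds since the Unix epoch.
    static long timestampSeconds(String hex) {
        return Long.parseLong(hex.substring(0, 8), 16);
    }

    // Last 3 bytes (6 hex characters): the counter component of the object id.
    static int counter(String hex) {
        return Integer.parseInt(hex.substring(18, 24), 16);
    }

    public static void main(String[] args) {
        String oid = "65f67f3b9779060fd2390d0e"; // _id of the document in the question
        System.out.println(Instant.ofEpochSecond(timestampSeconds(oid)));
        System.out.println(counter(oid)); // 3738894, unrelated to the stored "Hello": 1
    }
}
```

So for this particular document the counter comes out as 3738894. If the goal is to get back the 1 stored under "Hello", the counter is probably not the value you want.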
More details: https://mongodb.github.io/mongo-java-driver/3.6/javadoc/org/bson/types/ObjectId.html
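If what you actually want is the integer stored next to _id, another option is to skip the _id key and read the first remaining field. The sketch below shows only that selection logic, using a plain LinkedHashMap as a stand-in for the document so it runs without the driver; FirstDataField, firstNonId and intValueOf are hypothetical names. As far as I know, BsonDocument implements Map<String, BsonValue>, so the same loop should carry over, with document.getNumber(key).intValue() as the likely driver-side equivalent of intValueOf. That is an assumption I have not tested against your Flink job. Note also that the sink wrote a Java Integer, so the field is probably stored as INT32, in which case getInt64 would reject it as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: a LinkedHashMap plays the role of the BSON document.
final class FirstDataField {
    // Returns the first entry whose key is not "_id", or null if there is none.
    static Map.Entry<String, Object> firstNonId(Map<String, Object> document) {
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (!"_id".equals(entry.getKey())) {
                return entry;
            }
        }
        return null;
    }

    // Going through Number accepts Integer and Long alike.
    static int intValueOf(Object value) {
        return ((Number) value).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "65f67f3b9779060fd2390d0e"); // inserted first, like MongoDB's _id
        doc.put("Hello", 1);
        Map.Entry<String, Object> field = firstNonId(doc);
        System.out.println(field.getKey() + " -> " + intValueOf(field.getValue()));
    }
}
```

Reading through a generic number type instead of a fixed width keeps the deserializer working whether the value was written as a 32-bit or a 64-bit integer.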
Hope this helps.