org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID

Question (votes: 0, answers: 1)

I am trying to write simple Apache Flink MongoDB connector code that reads and writes JSON data in MongoDB. First, here is the MongoDB sink code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
        
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("Hello", 1));
data.add(new Tuple2<>("Hi", 2));
data.add(new Tuple2<>("Hey", 3));
        
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(data);

MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
            .setUri("mongodb://127.0.0.1:27017")
            .setDatabase("test_db")
            .setCollection("test_coll")
            .setBatchSize(1000)
            .setBatchIntervalMs(1000)
            .setMaxRetries(3)
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setSerializationSchema(
                    (input, context) -> {
                        Document doc = new Document(input.f0, input.f1);
                        return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                    })
            .build();

stream.sinkTo(sink);

This sink code successfully inserts the JSON documents into MongoDB. One of the resulting documents is:

{
  "_id": {
    "$oid": "65f67f3b9779060fd2390d0e"
  },
  "Hello": 1
}

But the MongoDB source code throws an error:

MongoSource<Tuple2<String,Integer>> source = MongoSource.<Tuple2<String,Integer>>builder()
            .setUri("mongodb://127.0.0.1:27017")
            .setDatabase("test_db")
            .setCollection("test_coll")
            .setDeserializationSchema(new MongoDeserializationSchema<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> deserialize(BsonDocument document) {
                    String key = document.getFirstKey();
                    Integer value = document.getInt64(key).intValue();  // this line throws the error message

                    return new Tuple2<String, Integer>(key, value);
                }

                @Override
                public TypeInformation<Tuple2<String, Integer>> getProducedType() {
                    return Types.TUPLE(Types.STRING, Types.INT);
                }
            })
            .build();

DataStream<Tuple2<String, Integer>> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source");
ds.print();

The error message is:

Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID
        at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
        at org.bson.BsonValue.asInt64(BsonValue.java:105)
        at org.bson.BsonDocument.getInt64(BsonDocument.java:203)
        at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:63)
        at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:1)
        at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:58)
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) 
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) 
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)

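I expected the value type of the requested JSON data to be INT64, but the returned type is OBJECT_ID, so this code throws the error. How can I read the integer value of the MongoDB document instead of the OBJECT_ID? Any reply would be greatly appreciated.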

mongodb apache-flink flink-streaming bson
1 Answer
0 votes

Can you try this:

Integer value = document.getObjectId(key).getValue().getCounter();

instead of this:

Integer value = document.getInt64(key).intValue();  // this line throws the error message

This is a guess.

Judging by the output of your insert, the first key is _id, whose value is an ObjectId:

{ "_id": { "$oid": "65f67f3b9779060fd2390d0e" }, "Hello": 1 }
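This diagnosis can be checked with a small JDK-only model. It is a sketch, not the driver API: it assumes the document read by the source behaves like an ordered map (org.bson.BsonDocument keeps insertion order) and relies on MongoDB always storing _id as the first field. That is why getFirstKey() returns "_id" and getInt64("_id") fails on the ObjectId. If the value you actually want is the 1 stored under "Hello", the hypothetical helper firstDataEntry shows the alternative of skipping _id; with the real BsonDocument the equivalent would be to loop over document.keySet(), skip "_id", and read document.get(key).asNumber().intValue(). Note that 1 was written as a 32-bit integer (BsonDocument.parse produces INT32 for small whole numbers), so getInt64 on that field would still fail, this time reporting INT32.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only model of a document read back by the MongoDB source.
// Assumption: a LinkedHashMap stands in for org.bson.BsonDocument; both keep
// fields in insertion order, and MongoDB stores _id as the first field.
final class FirstKeyDemo {

    // Hypothetical helper: returns the first field whose key is not "_id".
    static Map.Entry<String, Object> firstDataEntry(Map<String, Object> document) {
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (!"_id".equals(entry.getKey())) {
                return entry;
            }
        }
        throw new IllegalArgumentException("document has no field besides _id");
    }

    // Builds the document shown above: { "_id": ObjectId(...), "Hello": 1 }.
    static Map<String, Object> sampleDocument() {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("_id", "65f67f3b9779060fd2390d0e"); // ObjectId modelled as its hex string
        document.put("Hello", 1);                        // stored as a 32-bit integer
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> document = sampleDocument();

        // What the question's deserializer does: the first key is "_id", not "Hello".
        String firstKey = document.keySet().iterator().next();
        System.out.println("first key: " + firstKey);

        // Skipping "_id" reaches the data field; Number covers Integer and Long alike.
        Map.Entry<String, Object> data = firstDataEntry(document);
        int value = ((Number) data.getValue()).intValue();
        System.out.println(data.getKey() + " -> " + value);
    }
}
```

Running main prints "first key: _id" followed by "Hello -> 1".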

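getCounter() returns the incrementing counter stored inside that ObjectId, i.e. the value assigned when the _id was generated (if that is the value you are looking for).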
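To see what getCounter() would return here, the following JDK-only sketch splits the ObjectId's 24-character hex string into its parts. It assumes the current 12-byte ObjectId layout (4-byte timestamp in seconds since the Unix epoch, 5-byte random value, 3-byte incrementing counter). The class and method names are hypothetical; org.bson.types.ObjectId exposes the same two numbers through getTimestamp() and getCounter(). For the _id above, the counter is 0x390d0e = 3738894, a value from the id generator that is unrelated to the "Hello": 1 field.

```java
import java.time.Instant;

// JDK-only sketch of the 12-byte ObjectId layout:
//   bytes 0-3  : timestamp, seconds since the Unix epoch (big-endian)
//   bytes 4-8  : random value
//   bytes 9-11 : incrementing counter (big-endian)
// Hypothetical names; org.bson.types.ObjectId offers getTimestamp() and getCounter().
final class ObjectIdParts {

    // Hex characters 0-7 encode the 4 timestamp bytes.
    static long timestampSeconds(String hex) {
        checkLength(hex);
        return Long.parseLong(hex.substring(0, 8), 16);
    }

    // Hex characters 18-23 encode the 3 counter bytes.
    static int counter(String hex) {
        checkLength(hex);
        return Integer.parseInt(hex.substring(18, 24), 16);
    }

    private static void checkLength(String hex) {
        if (hex.length() != 24) {
            throw new IllegalArgumentException("an ObjectId is 24 hex characters: " + hex);
        }
    }

    public static void main(String[] args) {
        String id = "65f67f3b9779060fd2390d0e"; // the _id from the question
        System.out.println("created at: " + Instant.ofEpochSecond(timestampSeconds(id)));
        System.out.println("counter:    " + counter(id));
    }
}
```

Running main prints the creation instant decoded from the timestamp bytes and the counter 3738894.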

More details: https://mongodb.github.io/mongo-java-driver/3.6/javadoc/org/bson/types/ObjectId.html

Hope this helps.

© www.soinside.com 2019 - 2024. All rights reserved.