我想BulkInsert某些文件在不同的收藏在2个数据库MongoDB中。
MongoClient mongoClient2 = this.getMongoClient();
MongoDatabase currentDB = mongoClient2.getDatabase(splits[0]);
MongoCollection<Document> currentCollectionNew = currentDB.getCollection(splits[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(true);
bulkWriteOptions.bypassDocumentValidation(true);
try {
BulkWriteResult bulkWriteResult = urrentCollectionNew.bulkWrite(listDoc,
bulkWriteOptions);
logger.info("bulkWriteResult inserted count in MAIN: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in MAIN: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in MAIN : " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in MAIN : " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in MAIN : " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in MAIN : " + bulkWriteResult.wasAcknowledged());
mongoClient2.close()
} catch (Exception e) {
logger.warn("Error in bulkWriting main DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
MongoCollection<Document> mongoStageCollection = objFactory.getCollectionObject(resourceType, true);
String[] splitsStage = mongoStageCollection.getNamespace().getFullName().split("\\.");
MongoClient mongoClient3 = this.getMongoClient();
MongoDatabase newStageDB = mongoClient3.getDatabase(splitsStage[0]);
MongoCollection<Document> stageCollectionNew = newStageDB.getCollection(splitsStage[1])
.withWriteConcern(WriteConcern.MAJORITY.withJournal(true))
.withReadConcern(ReadConcern.MAJORITY);
logger.info("mongoStageCollection.getWriteConcern(): {} ", mongoStageCollection.getWriteConcern());
logger.info("mongoStageCollection.getReadConcern(): {} ",
mongoStageCollection.getReadConcern().toString());
logger.info("mongoStageCollection.getReadPreference(): {}",
mongoStageCollection.getReadPreference().getName());
try {
BulkWriteResult bulkWriteResult = stageCollectionNew.bulkWrite(listDoc, bulkWriteOptions);
logger.info("bulkWriteResult inserted count in STAGING: " + bulkWriteResult.getInsertedCount());
logger.info("bulkWriteResult modified count in STAGING: " + bulkWriteResult.getModifiedCount());
logger.info("bulkWriteResult matched count in STAGING: " + bulkWriteResult.getMatchedCount());
logger.info("bulkWriteResult deleted count in STAGING: " + bulkWriteResult.getDeletedCount());
logger.info("bulkWriteResult upserted count in STAGING: " + bulkWriteResult.getUpserts().size());
logger.info("bulkWriteResult was acknowledged in STAGING: " + bulkWriteResult.wasAcknowledged());
mongoClient3.close();
} catch (Exception e) {
logger.warn("Error in bulkWriting STAGING DB: {} ", e.getMessage());
logger.error(e.getMessage(), e);
}
例如2分的DB FHIR和FHIR_Stage。相同的集合都区议会内部创建的。 FHIR.Condition和FHIR_STAGE.Condition
FHIR.Observation和FHIR_STAGE.Observation
等等...
FHIR应该同时FHIR_Stage应该只有增量数据将所有数据。然而,在初始加载两个数据块应包含完全相同的数据。
我所看到的是,在这2个数据块的集合中的计数不匹配,即在FHIR.Condition计数不一样FHIR_STAGE.Condition
这里的问题是这种不匹配是随机发生的,即它匹配有时有有时它没有(当我清理一切,重新运行初始负载),而这种情况在那些2个DB的不同类别。并没有模式的,随机收集一些计数不会匹配,有时一切都将匹配。
我已经不能够现在满脑子都在这一个星期。任何帮助是极大的赞赏。
MongoDB的设置:
我们有一个3节点(VM)集群。我们有3个碎片运行,每个碎片是一个3成员副本集。每个节点为副本集的一个的初级。
群集正在使用X509证书安全的。
我看不出有什么错误,或者在sh.status()或rs.status()。也没有复制延迟。
DeNB与收藏是从取决于一些业务逻辑的Java代码中动态创建。而且我还能够对数据库,然后从代码集合分片。
WriteConcern - 多数
ReadPreference - 主
ReadConcern - 多数
蒙戈版本:3.4.15蒙戈Java驱动程序:3.4.2
仅供参考 - 相同的代码库工作在一个独立的MongoDB的预期。
由于在期待。
我会很高兴,如果需要分享更多的信息。
附:
如果这有什么差别,工艺写入MongoDB是一个卡夫卡消费者
我们发现这个问题张贴这几天后 -
我们是分片的MongoDB集群。
这是因为我们没有使用从聚合管道,而不是依靠db.collectionName.count计数()函数()
移动到聚合管道,我们可以看到在两个平等的DB文件。
Reference分出来。