I have a MongoDB instance with a very large collection, e.g. events_collection, and N=4 instances of a Node.js microservice. Each instance should read documents from events_collection in batches, and those batches must not intersect. In other words, events_collection is split into N parts, each part processed by its own microservice instance.

I came up with the following solutions:

1. Use optimistic locking per batch. This does not look optimal, because batches can still intersect.

2. Give every microservice instance a unique, constant ID, and partition all documents in events_collection by an ID field; each instance fetches only documents matching a condition like {..., id: instanceId, ...}. The problem with this solution is that the number of instances N is not constant.
Is there a better solution?
I have run into this problem several times. The following solution was tested on MongoDB v6. It uses the splitVector command, which is not widely known.
db = db.getSiblingDB("testX");

d = db.foo.stats();
exact_count = d['count'];

if(true) {
    num_workers = 8;  // REPLACE with your number of workers
    print("size: " + d['size']);
    // Must divide desired num workers by 2 for the split logic to work,
    // e.g. 8 workers would yield 16 chunks.
    cc = num_workers / 2;
    splitSize = Math.floor(d['size'] / cc);
} else {
    splitSize = 5000000;
}

// Used to be we could split on chunks of 32000 bytes. Now the engine
// says 1MB is the minimum!
splitSize = Math.max(1000000, splitSize);
print("splitSize:", splitSize);

enqueueWork("testX.foo", splitSize);
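// For reference, splitVector's reply (consumed inside enqueueWork below)
// is shaped like { splitKeys: [ { _id: <key> }, ... ], ok: 1 }, i.e. an
// ordered list of split points over the chosen keyPattern.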
//
// Here is where workers can consume items on the queue.
// Along the lines of what the OP was looking for, there
// would be multiple threads of the following logic.
//
// As a quick example, though, we will not complicate things with threads
// but will show that 1 thread operating on the work will process exactly
// the proper amount of docs.
//
n = 0;
totProcessed = 0;
while(1) {
    work = dequeue("testX.foo");
    if(work == null) {
        break;
    }
    idmin = work['start'];
    idmax = work['end'];

    // Here is where we operate on the work!
    c = db.foo.aggregate([
        {$match: {$and: [ {"_id": {$gte: idmin}}, {"_id": {$lt: idmax}} ]}}
        // For our example, just get the count of the chunk we
        // are processing:
        ,{$count: "N"}
    ]);
    d = c.next();
    totProcessed += d['N'];
    n++;
}

print(n + " chunks");
print(exact_count + " total docs");
print(totProcessed + " total processed");
function enqueueWork(ns, splitSizeBytes = 32000) {
    split = db.runCommand({splitVector: ns, keyPattern: {_id: 1},
                           maxChunkSizeBytes: splitSizeBytes});
    current = MinKey;
    db.queue.drop();
    // Turn the ordered split points into contiguous half-open ranges
    // covering the whole key space from MinKey to MaxKey.
    split.splitKeys.forEach(function enqueue(key) {
        db.queue.insert({ns: ns, start: current, end: key._id});
        current = key._id;
    });
    db.queue.insert({ns: ns, start: current, end: MaxKey});
}

function dequeue(ns) {
    // findAndModify atomically claims one unprocessed range, so
    // concurrent workers never receive the same chunk twice.
    return db.queue.findAndModify({
        query: {status: {$ne: "processed"}, ns: ns},
        update: {$set: {status: "processed"}}
    });
}
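The range construction inside enqueueWork can be checked without a running server. Below is a pure-JavaScript sketch of that logic (MIN and MAX are stand-ins for MinKey and MaxKey), showing that the split keys yield contiguous, non-overlapping half-open ranges:

```javascript
// Pure sketch of the range construction in enqueueWork: given the
// splitKeys array returned by splitVector, produce half-open
// [start, end) ranges covering the whole key space.
const MIN = { $minKey: 1 };  // stand-in for MinKey
const MAX = { $maxKey: 1 };  // stand-in for MaxKey

function rangesFromSplitKeys(splitKeys) {
  const ranges = [];
  let current = MIN;
  for (const key of splitKeys) {
    ranges.push({ start: current, end: key._id });
    current = key._id;
  }
  ranges.push({ start: current, end: MAX });
  return ranges;
}

// Two split points yield three contiguous, non-overlapping ranges:
// [MIN, 100), [100, 200), [200, MAX)
console.log(rangesFromSplitKeys([{ _id: 100 }, { _id: 200 }]));
```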