Parallel processing of a collection without intersection


There is a MongoDB instance with a very large collection, say events_collection.

There are N = 4 instances of a microservice (nodejs).

Each instance should read documents from events_collection in batches, and those batches must not intersect.

In other words, events_collection is to be split into N parts, each processed by its own microservice instance.

I came up with the following solutions:

  1. Use optimistic locking on the batches. But it does not look optimal, because the batches would still intersect.

  2. Give each microservice instance a unique, constant ID. All documents in events_collection are partitioned by an ID field of the document, and each instance fetches only documents matching the condition {...id: instanceId, ...}. The problem with this solution: it breaks if the number of instances N is not constant (a concrete sketch follows the list).
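
For illustration, a minimal sketch of what option 2 could look like in the mongo shell (the numeric partition field and the modulo scheme are my assumptions, not part of the original setup):

const N = 4;           // total number of instances -- breaks as soon as N changes
const instanceId = 2;  // this instance's unique constant ID, in [0, N)

// Each instance only sees documents whose partition value maps to its ID.
db.events_collection.find({ partition: { $mod: [N, instanceId] } });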

Is there a better solution?

node.js mongodb mongoose architecture
1 Answer

I have run into this problem several times. The solution below was tested on MongoDB v6. It uses the splitVector command, which is not widely known.
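
For context: splitVector scans the index named in keyPattern and returns boundary keys that divide the collection into chunks of roughly maxChunkSizeBytes each. A standalone call looks like this (namespace and output values are illustrative):

db.runCommand({splitVector: "testX.foo", keyPattern: {_id: 1},
               maxChunkSizeBytes: 1000000});
// Returns something like:
// { splitKeys: [ { _id: ObjectId("...") }, { _id: ObjectId("...") }, ... ], ok: 1 }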

db = db.getSiblingDB("testX");

d = db.foo.stats();
exact_count = d['count'];

if(true) {
    num_workers = 8;  // REPLACE with your number of workers

    print("size: " + d['size']);

    // Must divide desired num workers by 2 for split logic to work
    // e.g.  8 workers would yield 16 chunks.
    cc = num_workers / 2;
    splitSize = Math.floor(d['size'] / cc);
    
} else {
    splitSize = 5000000;  // fixed fallback split size in bytes
}

// Used to be we could split on chunks of 32000 bytes.  Now the engine
// says 1MB is the minimum!
splitSize = Math.max(1000000, splitSize);
print("splitSize:", splitSize);

enqueueWork("testX.foo", splitSize);


//
// Here is where workers can consume items on the queue.
// Along the lines of what the OP was looking for, there
// would be multiple threads of the following logic.
//
// As a quick example, though, we will not complicate things with threads
// but will show that 1 thread operating on the work will process exactly
// the proper amount of docs.

n = 0;
totProcessed = 0;

while(1) {
    work = dequeue("testX.foo");
    if(work == null) {
        break;
    }

    idmin = work['start'];
    idmax = work['end'];

    // Here is where we operate on the work!
    c = db.foo.aggregate([
        {$match: {$and: [ {"_id":{$gte:idmin}}, {"_id":{$lt:idmax}} ] }}

        // For our example, just get the count of the chunk we
        // are processing:
        ,{$count: "N"}
    ]);
    d = c.next();

    totProcessed += d['N'];
    n++;
}
print(n + " chunks");
print(exact_count + " total docs");
print(totProcessed + " total processed");
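
To connect this back to the question: in a real deployment, each of the N Node.js instances would run the same dequeue loop against the shared queue collection. A minimal sketch with the official mongodb driver (connection string, database and collection names are assumed to match the shell example above; the per-document processing is a placeholder):

const { MongoClient } = require("mongodb");

async function runWorker(uri, ns) {
    const client = await MongoClient.connect(uri);
    const db = client.db("testX");
    const queue = db.collection("queue");
    const events = db.collection("foo");

    while (true) {
        // Atomically claim one unprocessed range. With driver v6+ this
        // resolves to the document itself (or null); older drivers wrap
        // it in { value: ... }.
        const work = await queue.findOneAndUpdate(
            { status: { $ne: "processed" }, ns: ns },
            { $set: { status: "processed" } }
        );
        if (work == null) break;

        const cursor = events.find({ _id: { $gte: work.start, $lt: work.end } });
        for await (const doc of cursor) {
            // ... process one event document here ...
        }
    }
    await client.close();
}

runWorker("mongodb://localhost:27017", "testX.foo").catch(console.error);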


function enqueueWork(ns, splitSizeBytes = 32000) {
    // Ask the server for _id boundaries that split the namespace into
    // chunks of roughly splitSizeBytes each.
    split = db.runCommand({splitVector: ns, keyPattern: {_id: 1},
                           maxChunkSizeBytes: splitSizeBytes});

    current = MinKey;

    db.queue.drop();

    // One queue document per [start, end) range; the ranges tile the whole
    // _id space from MinKey to MaxKey without overlap.
    split.splitKeys.forEach(function enqueue(key) {
        db.queue.insert({ns: ns, start: current, end: key._id});
        current = key._id;
    });
    db.queue.insert({ns: ns, start: current, end: MaxKey});
}

function dequeue(ns) {
    // Atomically claim one unprocessed range; returns null once the
    // queue is exhausted.
    return db.queue.findAndModify({
        query: {status: {$ne: "processed"}, ns: ns},
        update: {$set: {status: "processed"}}
    });
}
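
Because dequeue claims each queue document with an atomic findAndModify, no two workers can ever receive the same range, which gives exactly the non-intersecting batches asked for, and it keeps working no matter how many worker instances N are running.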