假设我们有3个服务并行工作并对MongoDB存储进行写操作。他们创建记录,其中包含以下信息:
{
guid: GUID,
ts: Timestamp,
data: Object
}
因此,MongoDB存储在一个时刻应该不是特定GUID或LATEST(最大ts)记录。
小例子:
{guid: 1, ts: 10, data: {}}
通话 - 插入{guid: 1, ts: 10, data: {}}
{guid: 1, ts: 5, data: {}}
通话 - 没有更新{guid: 1, ts: 15, data: {}}
调用 - 使用ts:15更新的文档和新数据。换句话说,如果没有提供GUID的此类文档,我们必须插入记录,并在这种guid已经存在并且ts大于现有记录时更新记录。如果ts小于现有记录,请勿更新记录。
我知道这是一种upsert操作,但我无法想象如何处理这个问题。试图使用findAndModify,mapReduce或$max update运算符,但没有运气atm。先感谢您。
我们需要建模的逻辑有三个部分。给出一份新文件
var newDoc = { "guid" : 1, "ts" : 10, "data" : "asdf" }
guid
相同的newDoc
文件 - 插入newDoc
oldDoc
与guid
和newDoc
相同的oldDoc.ts < newDoc.ts
- newDoc
覆盖oldDoc
oldDoc
与guid
和newDoc
相同的oldDoc.ts > newDoc.ts
- newDoc
没有影响如果ts
值相等,我想我们不关心我们保留的oldDoc
或newDoc
中的哪一个。
没有一步可以处理所有这些逻辑。我们可以使用一个更新来处理第二个或第三个条件,但是将这三个一起处理需要多个步骤。 shell的示例代码,假设{ "guid" : 1 }
上有唯一索引:
// just try to insert
var wr = db.test.insert(newDoc)
if (wr.getWriteError() && wr.getWriteError.code === 11000) {
// this is case 2 or 3 - 11000 is duplicate key error
// so update if case 2
db.test.update({ "guid" : newDoc.guid, "ts" : { "$lte" : newDoc.ts } }, newDoc)
}
else {
// this is case 1 - newDoc was inserted if there weren't other errors
}
我认为此订单适用于并发请求。如果我们假设我们有两个工作人员Alice和Bob想要使用相同的guid
,那么,如果guid
不存在,Alice和Bob中的一个将首先执行插入而另一个将接收索引错误。两者都会以某种顺序运行第二次更新,无论如何,更高的时间戳都会赢。如果guid确实存在,我们会减少到它不存在的第二部分。我认为我们在所有情况下都很好,但并发很难,所以你应该考虑一下自己。并测试它。
重要的是这些更新只能访问一个文档 - 我依赖于一个文档的插入/更新是原子的。
如果我们略微调整wdberkeley的answer,这可以在单个原子操作中完成。如果我们在unique index字段上创建guid
并使用此查询:
db.guids.replaceOne({guid : newDoc.guid, ts : {$lte : newDoc.ts}}, newDoc, {upsert:true});
现在这三个案例的工作原理如下:
guid
相同的newDoc
文件 - 插入newDoc
oldDoc
与guid
和newDoc
相同的oldDoc.ts < newDoc.ts
- newDoc
覆盖oldDoc
oldDoc
与guid
和newDoc
具有相同的oldDoc.ts > newDoc.ts
- 插入(upsert)将失败,我们可以忽略此错误:.
WriteError({
"index" : 0,
"code" : 11000,
"errmsg" : "E11000 duplicate key error collection: test.guids index: guid_1 dup key: { : 1.0 }",
"op" : {
"q" : {
"guid" : 1,
"ts" : {
"$lte" : 14
}
},
"u" : {
"guid" : 1,
"ts" : 14,
"data" : "asdf"
},
"multi" : false,
"upsert" : true
}
})