我想将文档更新插入(如果不存在匹配的 Id 则插入,如果存在匹配的 Id 则更新)文档到现有集合中,但需要注意的是,存在 Version 整数属性,并且集合中已保留的内容是否具有版本高于我们尝试更新插入的版本,则更新插入不会执行任何操作。
给出这样的测试文档定义:
public record TestDocument(string Id, int Version, string SomeDataString);
我们应该期待:
由于 Id 被强制为唯一,因此我可以通过使用匹配 Id 和版本小于新文档版本的过滤器进行更新来以丑陋的方式获得此功能,然后在尝试将冲突文档插入到中时捕获异常持久化文档是相同或更高版本的情况。
var idMatchFilter = Builders<TestDocument>.Filter.Eq(e => e.Id, upsertDocument.Id);
var versionFilter = Builders<TestDocument>.Filter.Lt(e => e.Version, upsertDocument.Version);
var combinedFilter = Builders<TestDocument>.Filter.And(idMatchFilter, versionFilter);
var replaceOptions = new ReplaceOptions() { IsUpsert = true };
try
{
await testCollection.ReplaceOneAsync(combinedFilter, upsertDocument, replaceOptions);
}
catch (MongoWriteException mongoWriteException) when (mongoWriteException.Message.Contains("A write operation resulted in an error. WriteError: { Category : \"DuplicateKey\", Code : 11000, Message : \"E11000 duplicate key error collection"))
{
// expected possible error when we attempt to insert an older document - ideally this would be a no-op instead of attempted write
}
但是,由于实际用例是批量更新插入(例如 1000 个文档)和一个更复杂的文档(不是我们可以只设置一两个属性的问题,我们需要替换文档),那么这并不如果可行的话,这似乎是一个很好的方法。它也针对 Azure Cosmos DB,尽管我希望这不会有意义,并且任何针对普通 mongodb 的东西也应该在这里工作。
这是一个 xunit 测试,其中包括我尝试过的一些方法以及到目前为止它们是如何失败的。请注意,正如所写,它通过了,但这只是因为我们捕获了 MongoWriteException,并且目标是进行一个仅无操作的操作,而不是尝试写入旧版本文档。
[Theory]
// insert, nothing persisted yet
[InlineData(null, 1, true)]
// update, persisted version is older
[InlineData(1, 2, true)]
// update, persisted version is same
[InlineData(2, 2, false)]
// update, persisted version is newer
[InlineData(2, 1, false)]
public async Task ConditionalUpsert(int? persistedVersion, int upsertVersion, bool shouldUpdate)
{
// arrange
var testCollection = GetTestCollection();
var emptyFilter = Builders<TestDocument>.Filter.Empty;
await testCollection.DeleteManyAsync(emptyFilter);
if (persistedVersion.HasValue)
{
// seed the collection with expected version
var testDocument = new TestDocument("Foo", persistedVersion.Value, "persisted document dummy payload");
await testCollection.InsertOneAsync(testDocument);
}
var upsertDocument = new TestDocument("Foo", upsertVersion, "new document dummy payload");
// act
var idMatchFilter = Builders<TestDocument>.Filter.Eq(e => e.Id, upsertDocument.Id);
var versionFilter = Builders<TestDocument>.Filter.Lt(e => e.Version, upsertDocument.Version);
var combinedFilter = Builders<TestDocument>.Filter.And(idMatchFilter, versionFilter);
// incorrectly updates the persisted document when trying to write older version
//var findOneAndReplaceOptions = new FindOneAndReplaceOptions<TestDocument> { IsUpsert = true };
//await testCollection.FindOneAndReplaceAsync(idMatchFilter, upsertDocument, findOneAndReplaceOptions);
// incorrectly tries to insert new document insert of no-op when upserting an old version
// Command findAndModify failed: E11000 duplicate key error collection: SomeDatabase.SomeCollection. Failed _id or unique index constraint
//var findOneAndReplaceOptions = new FindOneAndReplaceOptions<TestDocument> { IsUpsert = true };
//await testCollection.FindOneAndReplaceAsync(combinedFilter, upsertDocument, findOneAndReplaceOptions);
var replaceOptions = new ReplaceOptions() { IsUpsert = true };
try
{
// incorrectly tries to insert new document with older version since the specified filter doesn't find the persisted version if it was newer
await testCollection.ReplaceOneAsync(combinedFilter, upsertDocument, replaceOptions);
}
catch (MongoWriteException mongoWriteException) when (mongoWriteException.Message.Contains("A write operation resulted in an error. WriteError: { Category : \"DuplicateKey\", Code : 11000, Message : \"E11000 duplicate key error collection"))
{
// expected possible error when we attempt to insert an older document - ideally this would be a no-op instead of attempted write
}
// assert
var allDocuments = await testCollection.Find(emptyFilter).ToListAsync();
var queriedDocument = Assert.Single(allDocuments);
var expectedVersion = shouldUpdate ? upsertVersion : persistedVersion;
Assert.Equal(expectedVersion, queriedDocument.Version);
}
好消息是,您可以通过结合使用更新插入和聚合管道更新来解决此问题。作为一个大纲,您将过滤 id 并在聚合管道中执行版本比较。
在普通 MQL 中,此语句可以实现此目的,并且也适用于复杂文档:
db.collection.update({
_id: 1
},
[
{
$set: {
versions: {
$sortArray: {
input: [
"$$ROOT",
{
_id: 1,
"version": 4,
"prop": "new-value"
}
],
sortBy: {
"version": -1
}
}
}
}
},
{
$replaceWith: {
$first: "$versions"
}
}
],
{
upsert: true
})
您可以使用这个playground来测试它。
坏消息是,这个语句不能用驱动程序的方法来描述。虽然支持
$set
阶段本身以及 $replaceWith
阶段,但不支持对 $$ROOT
的引用(至少据我所知)。
以下代码可以作为起点 - 测试并根据您的需要进行调整。首先,创建一个包含版本列表的帮助记录/类:
private record TestDocumentWithVersions(string Id, int Version, string SomeDataString, List<TestDocument> Versions)
: TestDocument(Id, Version, SomeDataString);
您可以像这样执行更新:
var newVersion = doc.ToJson();
var filter = Builders<TestDocument>.Filter.Eq(d => d.Id, doc.Id);
var stage = BsonDocument.Parse("""
{
$set: {
Versions: {
$sortArray: {
input: [
"$$ROOT",
%%NEW_VERSION%%
],
sortBy: {
"Version": -1
}
}
}
}
}
""".Replace("%%NEW_VERSION%%", newVersion));
var pipeline = new EmptyPipelineDefinition<TestDocument>()
.AppendStage<TestDocument, TestDocument, TestDocumentWithVersions>(stage)
.ReplaceWith(x => x.Versions.First());
var update = Builders<TestDocument>.Update.Pipeline(pipeline);
var result = await _collection.UpdateOneAsync(
filter,
update,
new UpdateOptions
{
IsUpsert = true
});
代码首先使用
ToJson
方法将文档序列化为字符串;之后,它从 BSON 字符串中解析 $set
阶段(注意插入序列化文档的 Replace
)。
最后,它设置一个聚合管道,首先将现有文档和新文档插入到一个数组中,并按 Version
属性按降序对数组进行排序。第二阶段将文档替换为数组中的第一个文档。
如上所述,此代码是一个示例;测试并根据您的需要进行调整。
替代方法:您也可以使用
$set
来设置它,而不是从字符串中解析 BsonDocument
阶段。对于示例,我选择了字符串以获得更好的可读性。