我有两个 mongodb 集合: 1).第一个合集是:
sourcequeuemanualupload
。它包含状态为未处理或已分配的所有记录。这包含上传数据时的记录。数据库中分配项目记录之一的示例:
{
"_id" : ObjectId("63e0e46a6047d75b9c20d8ec"),
"Properties: Name" : "Hangman - Guess Words",
"Appstore URL" : "https://itunes.apple.com/app/id1375993101?hl=None",
"Region" : "na",
"Create Date" : "na",
"AHT" : "1",
"sourceId" : "63e0e3719b4f812ba5333a31",
"type" : "Manual",
"uploadTime" : "2023-02-06T11:28:42.533+0000",
"status" : "Assigned",
"batchId" : "63e0e3719b4f812ba5333a31_746f22e4319b4d81b8ab255f5e653c2c_612023112842"
}
2).第二个收藏是
queuedata
。它包含所处理项目(来自 sourcequeuemanualupload
集合)的数据(问题、用户对工具的响应),它们都保存在该集合中,状态为 Completed
。 sourcequeuemanualupload
中记录的_id作为标识符存储在这里作为“id”字段。完成项目示例:
{
"_id" : ObjectId("63e0e4b19b4f812ba5333a34"),
"templateId" : "63e0e28e9b4f812ba5333a30",
"id" : "63e0e46a6047d75b9c20d8ec",
"moderator" : "kodaga",
"startTime" : "2023-02-06T11:29:46.048Z",
"endTime" : "2023-02-06T11:29:52.438Z",
"status" : "Completed",
"AHT" : NumberLong(6),
"userInput" : [
{
"question" : "Is the URL leading to the desired store page link?",
"response" : "yes"
},
{
"question" : "Comments, if any.",
"response" : "test 1"
}
]
}
DBA 最初犯了一个错误,因为他没有将已完成的已分配项目的状态更新为
Completed
在sourcequeuemanualupload
集合中。因此,已分配项目本身仍保留为已分配。我们正在为此进行修复,以将所有已分配的项目(已处理)标记为已完成。
当前数据量:
> db.sourcequeuemanualupload.count()
414781
> db.sourcequeuemanualupload.count({"status":"Assigned"})
306418
> db.queuedata.count()
298128
我目前编写的用于识别已完成记录并将其标记为已完成的脚本:
var assigned_tasks_arr = [];
db.sourcequeuemanualupload.find({ status: "Assigned" }).forEach(function (rec) {
assigned_tasks_arr.push(rec._id.str);
});
print(assigned_tasks_arr.length)
> 306418
db.queuedata.count({ id: { $in: assigned_tasks_arr }, status: "Completed" }); // <------------- STEP 1
var completed_items = db.queuedata
.find(
{ id: { $in: assigned_tasks_arr }, status: "Completed" },
{ _id: 0, id: 1 }
)
.toArray(); // <------------- STEP 2
var completed_items_ids = [];
completed_items.forEach(function (rec) {
completed_items_ids.push(rec.id);
});
var completed_items_unique_objectid = [];
completed_items.forEach(function (item) {
completed_items_unique_objectid.push(new ObjectId(item));
});
db.sourcequeuemanualupload.updateMany(
{ _id: { $in: completed_items_unique_objectid } },
{ $set: { status: "Completed" } }
); // <------------- STEP 3
基本上,我从
Assigned
集合中获取 sourcequeuemanualupload
状态项的所有 _id,并将它们存储在一个数组中。接下来要查找它们是否确实已经完成并且记录是否存在于队列数据中,我使用 $in 和 assigned_tasks_arr 执行搜索以获取实际完成的项目。一旦我得到完整的 ids 列表,我想继续更新 sourcequeuemanualupload 中的状态。
但是,我的第 1 步和第 2 步执行时间超过 30 分钟。它必须花费一个多小时或更长时间(但我无法获得确切时间,因为我的会话已终止)。我认为 $in with array find 查询执行需要 (N*log(M)),其中 N 是输入数组的长度,M 是集合的大小。如您所见,我的 N 值以百万为单位,M 也是以百万为单位,此查询将花费更长的时间。我如何优化此查询或编写更快的查询来查找 ID 并立即更新?
我需要帮助以更快地执行上述脚本中的第 1 步、第 2 步和第 3 步,这样我就不必等待数小时。我们可以在这里写一个更好的连接查询或任何其他方式来优化查询吗?
谢谢
我尝试在线搜索更快的优化方法。我将索引作为“id”添加到队列数据集合中,但它仍然需要很长时间:( 需要上述查询的帮助
注意:使用 AWS documentDB 4.0.0 就像您在查询中提供任何建议一样,请检查链接:https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html 作为一些不支持命令
我会这样尝试:
var completed_ids = []
db.sourcequeuemanualupload.aggregate([
{ $match: { status: "Assigned" } },
{
$lookup: {
from: "queuedata",
let: { id: "$_id" },
pipeline: [
{ $mach: { status: "Completed" } },
{ $match: { $expr: { $eq: ["$$id", { $toObjectId: "$id" }] } } }
],
as: "queuedata"
}
},
{ $match: { queuedata: { $ne: [] } } },
{ $project: { _id: 1 } }
]).forEach(rec => {
completed_ids.push(rec._id);
if (completed_ids.length > 10000) {
db.sourcequeuemanualupload.updateMany(
{ _id: { $in: completed_ids } },
{ $set: { status: "Completed" } }
);
completed_ids = [];
}
})
if (completed_ids.length > 0) {
db.sourcequeuemanualupload.updateMany(
{ _id: { $in: completed_ids } },
{ $set: { status: "Completed" } }
);
}
MongoDB 4.0 版生命终结,您可以寻找现代的 MongoDB 托管服务。
另一个非常简单的方法是这个:
db.queuedata.aggregate([
{ $match: { status: "Completed" } },
{
$project: {
_id: { $toObjectId: "$id" },
status: "Completed"
}
},
{
$merge: {
into: "sourcequeuemanualupload",
whenMatched: "merge",
whenNotMatched: "discard"
}
}
])
但这将更新
sourcequeuemanualupload
中的所有文档,无论status: 'Assigned'
与否。
一个解决方案是这个:
var completed_ids = [];
db.queuedata.aggregate([
{ $match: { status: "Completed" } },
{
$project: {
_id: { $toObjectId: "$id" },
status: "Completed"
}
}
]).forEach(rec => {
completed_ids.push(rec._id);
if (completed_ids.length > 10000) {
db.sourcequeuemanualupload.updateMany(
{
_id: { $in: completed_ids },
status: "Assigned"
},
{ $set: { status: "Completed" } }
);
completed_ids = [];
}
})
if (completed_ids.length > 0) {
db.sourcequeuemanualupload.updateMany(
{
_id: { $in: completed_ids },
status: "Assigned"
},
{ $set: { status: "Completed" } }
);
}
当您要修复数据时,
id
应转换为 ObjectId
而不是纯字符串。