我有一堆事件数据,由
uid
和 timestamp
标识。 我有许多聚合管道,它们将计算每个用户的统计信息。 如果我一次运行一个,这些就可以工作,但我不知道如何在 $group
内运行它们。
所以这段代码有效:
db.Events.aggregate([
{"$match":{"uid":"S001"}},
{"$facet":{
"var1":[pipeline1...],
"var2":[pipeline2...]
}}
];
但是,我想循环用户,所以我想要更多类似的东西:
db.Events.aggregate([
{ "$group": {
"_id": {"uid":"$uid"},
"timestamp":{"$last":"$timestamp"},
"data": { "$facet":{
"var1":[pipeline1...],
"var2":[pipeline2...]
}}
}}
])
但是我收到错误
unknown group operator '$facet'
。
我正在寻找的是如下输出:
{
uid:"S001",
timestamp:"time 1",
data: { ...}
},
{
uid:"S002",
timestamp:"time 2",
data: { ...}
},
...
如何在
$facet
内部运行 $group
。
更新:这是一个更完整的示例。
首先是一些输入:
{
"uid": 1,
"timestamp": 1000000000,
"verb": "initialized",
"object": "game level",
"data":{},
},
{
"uid": 1,
"timestamp": 1000000015,
"verb": "initialized",
"object": "game level",
"data":{},
},
{
"uid": 1,
"timestamp": 1000000031,
"verb": "passed",
"object": "game level",
"data":{},
},
{
"uid": 2,
"timestamp": 1000000000,
"verb": "initialized",
"object": "game level",
"data":{},
},
{
"uid": 2,
"timestamp": 1000000024,
"verb": "passed",
"object": "game level",
"data":{},
}
这是我的目标管道:
db.collection.aggregate([
{ "$group": {
"_id": {"uid":"$uid"},
"uid": {"$first":"$uid"},
"timestamp":{"$last":"$timestamp"},
"data": { "$facet":{
"NumberAttempts": [attempt_filter, attempt_map, attempt_sum],
"time": [leveltime_filter, leveltime_map, leveltime_reduce]
}}
]);
其中
filter
阶段是对 $match
的调用,map
阶段是对 $project
的调用,而 reduce
阶段是对 $group
的调用,以累积匹配的事件。 管道中各个阶段的完整定义可以在以下位置找到:
https://mongoplayground.net/p/Cae0DkO8o0g
(不要太在意确切的定义,我的真实示例有更多不同的管道)。
我想要的输出是这样的:
{
"data": {
"NumberAttempts": 2,
"start_time": 1e+09,
"total_time": 31
},
"timestamp": 1.000000031e+09,
"uid": 1
},
{
"data": {
"NumberAttempts": 1,
"start_time": 1e+09,
"total_time": 24
},
"timestamp": 1.000000024e+09,
"uid": 2
}
每个
uid
我都有一条记录,其中包含摘要统计数据。 如果我首先匹配 uid
然后运行构面操作,我可以一次为一个用户执行此操作。 [这在 mongoplayground 链接中不太有效。 问题是我需要一个 total_time
的自定义聚合器,而且我不太清楚如何让它在操场上工作。] 我想要做的是在用户上迭代此代码,在我看来这就是 $group
或 $bucket
运算符。
我不是正在寻找一种巧妙的方法来使用组累加器函数重写这些特定的管道。 我希望我的中小企业能够指定他们感兴趣的事件、这些事件中的哪些字段相关并选择一个累加器,从而构建自定义的映射减少链。
不确定我们正在谈论多少数据,但如果不是数百万行,您始终可以创建一个数组,然后轻松地对其进行迭代:
db.foo.aggregate([
{ "$group": {
"_id": "$uid",
"uid": {"$first":"$uid"},
"timestamp":{"$last":"$timestamp"},
"X": {$push: '$$CURRENT'}
}},
{$project: {
"uid": true,
"timestamp": true,
"data": {
"NumberAttempts": {$reduce: {
input: "$X",
initialValue: 0,
in:{$cond: {
// Conditions to satisfy NumberAttempts:
if: {$and: [
{$eq:['$$this.verb','initialized']}
,{$eq:['$$this.object','game level']}
]},
then: {$add:['$$value', 1]} ,
else: '$$value' // else return $$value unchanged
}}
}},
'start_time': "$timestamp", // redundant...?
'total_time': {$reduce: {
input: "$X",
initialValue: 0,
in:{$cond: {
// Conditions to satisfy total_time:
if: {$and: [
{$in:["$$this.verb", ["initialized","resumed",
"suspended","exited","passed"]]},
,{$eq:['$$this.object', 'game level']}
]},
then: {$add:['$$value', 22]} , // proxy for safe_sum_acc
else: '$$value'
}}
}}
}
}}
]);
我刚刚修改了计算
NumberAttempts
和 total_time
的函数。
如果每个
uid
只有几个条目,您可能需要考虑消除服务器端的分组并让客户端处理它。 在服务器上检查的记录数量相同,计算压力显着降低,但代价是通过网络传输的数据量增加了 X 倍。