您能否帮助我使用 InfluxDB 2 Flux 查询语法来构建带有自定义聚合函数的窗口查询。
我浏览了在线文档,但它们似乎缺乏有关如何从自定义聚合函数中获取实际窗口内容(第一条、最后一条记录)的示例。它也没有立即描述自定义函数的预期签名。
我想构建一个带有滑动窗口的查询,该窗口会在窗口中的第一个值和最后一个值之间产生差异。沿着这些思路:
difference = (column, tables=<-) => ({ tables.last() - tables.first() })
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "simple")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 1mo, fn: difference, column: "_value", timeSrc: "_stop", timeDst: "_time", createEmpty: true)
|> yield(name: "diff")
上面示例的语法显然是错误的,但希望您能理解我想要做什么。
谢谢!
想出了以下内容。它至少在语法上有效:
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "simple")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(
every: 1mo,
fn: (column, tables=<-) => tables |> reduce(
identity: {first: -1.0, last: -1.0, diff: -1.0},
fn: (r, acc) => ({
first:
if acc.first < 0.0 then r._value
else acc.first,
last:
r._value,
diff:
if acc.first < 0.0 then 0.0
else (acc.last - acc.first)
})
)
|> drop(columns: ["first", "last"])
|> set(key: "_field", value: column)
|> rename(columns: {diff: "_value"})
)
|> yield(name: "diff")
窗户并不是真的可以滑动。
推拉窗也一样:
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "simple")
|> filter(fn: (r) => r["_field"] == "value")
|> window(every: 1h, period: 1mo)
|> reduce(
identity: {first: -1.0, last: -1.0, diff: -1.0},
fn: (r, acc) => ({
first:
if acc.first < 0.0 then r._value
else acc.first,
last:
r._value,
diff:
if acc.first < 0.0 then 0.0
else (acc.last - acc.first)
})
)
|> duplicate(column: "_stop", as: "_time")
|> drop(columns: ["first", "last"])
|> rename(columns: {diff: "_value"})
|> window(every: inf)
我遇到了类似的情况,我需要一个自定义聚合函数与
aggregateWindow()
一起使用,并且无法从流入文档中理解任何意义,并且互联网上没有任何帮助。我在这里发布了一些示例通用工作代码,这与您的代码不完全相关,但可能对其他人有帮助作为参考。据我所知,这可能是整个互联网上唯一完整的示例🤦。
注意几件事:
, column="_value"
missing required argument column (argument fn)
import "generate"
example = generate.from(
count: 100,
fn: (n) => (n + 1),
start: 2021-01-01T00:00:00Z,
stop: 2022-01-01T00:00:00Z,
)
alt_sum = (tables=<-, column="_value") => tables
|> reduce(
fn: (r, accumulator) => (
{
sum: (r._value) + accumulator.sum
}),
identity: {sum: 0},
)
// optional:
|> map(fn: (r) => ({r with _value: r.sum}))
example
|> range(start: 2021-01-01T00:00:00Z, stop: 2022-01-01T00:00:00Z)
|> aggregateWindow(every: 1mo, fn: alt_sum, createEmpty: false)