Influxdb Flux 查询自定义窗口聚合函数

问题描述 投票:0回答:2

您能否帮助我使用 InfluxDB 2 Flux 查询语法来构建带有自定义聚合函数的窗口查询。

我浏览了在线文档,但它们似乎缺乏有关如何从自定义聚合函数中获取实际窗口内容(第一条、最后一条记录)的示例。它也没有立即描述自定义函数的预期签名。

我想构建一个带有滑动窗口的查询,该窗口会在窗口中的第一个值和最后一个值之间产生差异。沿着这些思路:

difference = (column, tables=<-) => ({ tables.last() - tables.first() })

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: 1mo, fn: difference, column: "_value", timeSrc: "_stop", timeDst: "_time", createEmpty: true)
  |> yield(name: "diff")

上面示例的语法显然是错误的,但希望您能理解我想要做什么。

谢谢!

influxdb influxdb-2
2个回答
2
投票

想出了以下内容。它至少在语法上有效:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(
      every: 1mo, 
      fn: (column, tables=<-) => tables |> reduce(
            identity: {first: -1.0, last: -1.0, diff: -1.0},
            fn: (r, acc) => ({
                first:
                    if acc.first < 0.0 then r._value
                    else acc.first,
                last:
                    r._value,
                diff:
                    if acc.first < 0.0 then 0.0
                    else (acc.last - acc.first)
            })
          )
          |> drop(columns: ["first", "last"])
          |> set(key: "_field", value: column)
          |> rename(columns: {diff: "_value"})
      )
  |> yield(name: "diff")

窗户并不是真的可以滑动。

推拉窗也一样:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> window(every: 1h, period: 1mo)
  |> reduce(
    identity: {first: -1.0, last: -1.0, diff: -1.0},
    fn: (r, acc) => ({
        first:
            if acc.first < 0.0 then r._value
            else acc.first,
        last:
            r._value,
        diff:
            if acc.first < 0.0 then 0.0
            else (acc.last - acc.first)
    })
  )
  |> duplicate(column: "_stop", as: "_time")
  |> drop(columns: ["first", "last"])
  |> rename(columns: {diff: "_value"})
  |> window(every: inf)

0
投票

我遇到了类似的情况,我需要一个自定义聚合函数与

aggregateWindow()
一起使用,并且无法从流入文档中理解任何意义,并且互联网上没有任何帮助。我在这里发布了一些示例通用工作代码,这与您的代码不完全相关,但可能对其他人有帮助作为参考。据我所知,这可能是整个互联网上唯一完整的示例🤦。

注意几件事:

  1. 自定义 agg 函数之外的所有内容都正常。
  2. 为了使 agg 函数正常工作,您必须提供默认值
    , column="_value"
    1. 否则你会得到错误
      missing required argument column (argument fn)
import "generate"

example = generate.from(
    count: 100,
    fn: (n) => (n + 1),
    start: 2021-01-01T00:00:00Z,
    stop: 2022-01-01T00:00:00Z,
)

alt_sum = (tables=<-, column="_value") => tables
  |> reduce(
        fn: (r, accumulator) => (
        {
            sum: (r._value) + accumulator.sum
        }),
        
        identity: {sum: 0},
    )
  // optional: 
  |> map(fn: (r) => ({r with _value: r.sum}))

example
  |> range(start: 2021-01-01T00:00:00Z, stop: 2022-01-01T00:00:00Z)
  |> aggregateWindow(every: 1mo, fn: alt_sum, createEmpty: false)

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.