InfluxDB Flux: joining series

Question (votes: 0, answers: 2)

I have the following data in InfluxDB:

server,operation=ADD queryMs=7.9810 1620608972904452000
server,operation=GET queryMs=12.2430 1620608972909339200
server,operation=UPDATE queryMs=11.5780 1620608972909655400
server,operation=ADD queryMs=11.2460 1620608972910445700
server,operation=GET queryMs=15.0620 1620608972911305000
etc...

So my chart shows three series, one per operation.

I want a single series that covers all `operation` values.

I tried `|> group(columns: ["_field"])`, which gives the result I need, but the query is very slow:

from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  |> group(columns: ["_field"])
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

Is there a faster way to solve this?

influxdb influxdb-2
2 answers

4 votes

This runs faster:

union(tables: [
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "GET")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "ADD")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "UPDATE")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  ])
  |> drop(columns:["operation"])
  |> sort(columns: ["_time"], desc: false)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

0 votes

This is an old thread, but for anyone who lands on the first answer: using `union` is not helpful when a proper filter can do the job.

from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  |> filter(fn: (r) => r["operation"] == "GET" or
    r["operation"] == "ADD" or
    r["operation"] == "UPDATE")
  |> drop(columns:["operation"])
  |> sort(columns: ["_time"], desc: false)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
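
One possible reason the `union` version is faster is that each branch calls `aggregateWindow` directly after `range` and `filter`, a shape that InfluxDB may be able to push down to the storage layer, whereas `group` or `drop` placed before `aggregateWindow` can prevent that. The sketch below is an untested assumption along those lines: it downsamples each series first and only then merges them, without `union`. Note that the final value is a mean of per-operation means, which differs from the mean over all raw points when the operations have different point counts per window.

```flux
from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  // First pass: downsample each operation's series separately,
  // while the tables are still grouped by the "operation" tag.
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  // Merge the now much smaller per-operation tables into one table.
  |> group(columns: ["_field"])
  |> sort(columns: ["_time"], desc: false)
  // Second pass: average the per-operation means within each window.
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
```

Whether this actually beats the plain filter version depends on the InfluxDB version and data volume, so it is worth comparing both against the same time range.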