Performance of window functions in InfluxDB 2.7.6


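I'm using InfluxDB 2.7.6 and have created a measurement named cgm_glucose_history. I added a tag named device_sn, a field named glucose, and a field named device_time that records the exact time each glucose reading was generated. In this measurement, each device_sn produces one glucose value per minute.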

I write the data with the Java client:

List<Point> pointList = new ArrayList<>();
for (CgmGlucose cgmGlucose : list) {
    // Align the timestamp to the minute: 00:00, 00:01, 00:02, 00:03...
    // (device_time is in milliseconds, so one minute is 60,000 ms)
    long time = cgmGlucose.getDeviceTime() - cgmGlucose.getDeviceTime() % 60_000L;
    Point point = Point.measurement(MEASUREMENT_CGM_GLUCOSE_HISTORY)
            .time(time, WritePrecision.MS)
            .addTag("patient_code", cgmGlucose.getPatientCode())
            .addTag("device_sn", cgmGlucose.getDeviceSn())
            .addField("glucose", cgmGlucose.getGlucose())
            .addField("device_time", cgmGlucose.getDeviceTime());
    pointList.add(point);
    // Flush in batches of 1000 points
    if (pointList.size() >= 1000) {
        writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
        pointList.clear();
    }
}
// Write whatever is left over from the last partial batch
if (!pointList.isEmpty()) {
    writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
    pointList.clear();
}
writeApi.close();
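Truncating an epoch-millisecond timestamp to a whole minute means removing the remainder modulo 60,000 ms (a remainder modulo 1,000 ms would only truncate to whole seconds). A minimal sketch, assuming device_time is epoch milliseconds; the class and the `alignToMinute` helper are hypothetical names, and the `java.time` call is shown only as a cross-check.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class MinuteAlignment {
    // Hypothetical helper: truncate epoch milliseconds to the start of the minute.
    // Math.floorDiv also rounds negative (pre-1970) timestamps down correctly.
    static long alignToMinute(long epochMillis) {
        return Math.floorDiv(epochMillis, 60_000L) * 60_000L;
    }

    public static void main(String[] args) {
        long t = 1718680023456L;
        long aligned = alignToMinute(t);
        // Cross-check against java.time truncation to minutes
        long viaJavaTime = Instant.ofEpochMilli(t).truncatedTo(ChronoUnit.MINUTES).toEpochMilli();
        System.out.println(aligned == viaJavaTime);
        System.out.println(Instant.ofEpochMilli(aligned));
    }
}
```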

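I now need to find the time at which a given device_sn first has a sustained hyperglycemia event (if any). Sustained hyperglycemia is defined as glucose values above 13.9 that persist for more than two hours. I used the window and reduce functions, as follows: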

import "interpolate"
from(bucket: "cdm_dm")
  |> range(start: 1718679994)
  |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
  |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
  |> filter(fn: (r) => r["_field"] == "glucose" or r["_field"] == "device_time")
  |> map(fn: (r) => ({r with _value: float(v: r._value)}))
  |> interpolate.linear(every: 1m)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> window(every: 1m, period: 122m)
  // The core logic is to count the points where the glucose value is greater than 13.9 within two hours.
  |> reduce(
    fn: (r, accumulator) => ({
      count: if r.glucose > 13.9 then accumulator.count + 1 else 0,
      event_start_time: if r.glucose > 13.9 then r.device_time else 0.0,
      glucose: if (r.glucose > 13.9 and accumulator.count == 121) then r.glucose
        else if (r.glucose > 13.9 and accumulator.count < 121) then accumulator.glucose
        else 0.0
    }),
    identity: {count: 0, event_start_time: 0.0, glucose: 0.0}
  )
  |> duplicate(column: "_start", as: "_time")
  |> window(every: inf)
  |> filter(fn: (r) => r["count"] == 122)
  |> limit(n:1)
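One likely reason the query is slow: window(every: 1m, period: 122m) creates one overlapping window per minute, and since period/every = 122, every reading lands in about 122 windows, each of which becomes its own table. reduce() then runs over all of them, and limit(n: 1) only applies after every window has been reduced. The sketch below mirrors that shape in plain Java so the row count is visible. It is a minimal illustration, assuming gap-free one-minute readings and modeling only full 122-row windows; `SlidingWindowCost`, `Result`, and `scan` are hypothetical names (records need Java 16+).

```java
import java.util.Arrays;

public class SlidingWindowCost {
    record Result(int firstStart, long rowsScanned) {}

    // Mirrors window(every: 1m, period: 122m) |> reduce(...): for each minute,
    // scan the next `period` readings with a counter that resets on low values.
    static Result scan(double[] glucose, int period, double threshold) {
        long scanned = 0;
        int first = -1;
        for (int start = 0; start + period <= glucose.length; start++) {
            int run = 0;
            for (int i = start; i < start + period; i++) {
                scanned++;
                run = glucose[i] > threshold ? run + 1 : 0;
            }
            // Like filter(count == 122) |> limit(n: 1): keep the first full window,
            // but keep scanning, because Flux reduces every window before limiting.
            if (run == period && first < 0) {
                first = start;
            }
        }
        return new Result(first, scanned);
    }

    public static void main(String[] args) {
        // Hypothetical data: one day of minute readings, above 13.9 for minutes 600..729.
        double[] glucose = new double[1440];
        Arrays.fill(glucose, 8.0);
        for (int i = 600; i < 730; i++) {
            glucose[i] = 15.0;
        }
        Result r = scan(glucose, 122, 13.9);
        System.out.println("first window start (minute): " + r.firstStart());
        System.out.println("rows scanned: " + r.rowsScanned() + " for " + glucose.length + " readings");
    }
}
```

For one day of data (1,440 readings) this scans 160,918 rows for a single device, while a single pass over the series touches each of the 1,440 readings once.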

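In practice, however, the query is extremely slow: even with very little test data, it takes more than 30 seconds to return a result. Is there something wrong with my logic? Is there a better way to implement this requirement in InfluxDB?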

I have no further ideas on how to optimize this query or the data structure.

performance search reduce influxdb influxdb-2
1 Answer

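I found a function that meets this requirement: stateDuration. It reports how long consecutive rows have satisfied a predicate (in seconds by default, and -1 for rows where the predicate is false), so filtering for values above 120*60 returns the rows where glucose has stayed above 13.9 for more than two hours: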

from(bucket: "cdm_dm")
  |> range(start: 1718679994)
  |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history_microtechmd2")
  |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
  |> filter(fn: (r) => r["_field"] == "glucose")
  |> stateDuration(fn: (r) => r._value > 13.9)
  |> filter(fn:(r) => r["stateDuration"] > 120*60)
  |> limit(n:1)
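Note that the row this query returns is the moment the run crosses the two-hour mark, not the start of the event; the start is that row's _time minus its stateDuration. A minimal Java sketch of those semantics, assuming readings sorted by time and the default 1s unit; `StateDurationSketch`, `Reading`, `stateDurations`, and `firstSustainedStart` are hypothetical names, not part of the InfluxDB client (records need Java 16+).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class StateDurationSketch {
    record Reading(Instant time, double glucose) {}

    // Mirrors stateDuration(fn: (r) => r._value > threshold) with unit 1s:
    // -1 when the predicate is false, otherwise seconds since the current run began.
    static long[] stateDurations(List<Reading> readings, double threshold) {
        long[] out = new long[readings.size()];
        Instant runStart = null;
        for (int i = 0; i < readings.size(); i++) {
            Reading r = readings.get(i);
            if (r.glucose() > threshold) {
                if (runStart == null) {
                    runStart = r.time();
                }
                out[i] = Duration.between(runStart, r.time()).getSeconds();
            } else {
                runStart = null;
                out[i] = -1;
            }
        }
        return out;
    }

    // Like filter(stateDuration > minSeconds) |> limit(n: 1), but returns the event start.
    static Optional<Instant> firstSustainedStart(List<Reading> readings, double threshold, long minSeconds) {
        long[] d = stateDurations(readings, threshold);
        for (int i = 0; i < d.length; i++) {
            if (d[i] > minSeconds) {
                return Optional.of(readings.get(i).time().minusSeconds(d[i]));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-06-18T00:00:00Z");
        List<Reading> readings = new ArrayList<>();
        // Hypothetical data: one reading per minute for 6 hours, high for minutes 60..189.
        for (int m = 0; m < 360; m++) {
            double g = (m >= 60 && m < 190) ? 15.0 : 8.0;
            readings.add(new Reading(t0.plusSeconds(60L * m), g));
        }
        System.out.println(firstSustainedStart(readings, 13.9, 120 * 60));
    }
}
```

One design difference from the reduce() version: that query counts points, which is why it needs interpolate.linear to guarantee a point every minute, while stateDuration measures elapsed time between rows, so a gap in the data inside a high run still counts toward the two hours.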