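I'm using InfluxDB 2.7.6 and have created a measurement named cgm_glucose_history. It has a tag named device_sn, a field named glucose, and a field named device_time that records the exact time each glucose reading was generated. In this measurement, each device_sn produces one glucose value per minute.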
I save the data with the Java client:
List<Point> pointList = new ArrayList<>();
for (CgmGlucose cgmGlucose : list) {
    // Align the timestamp down to the whole minute: 00:00, 00:01, 00:02, 00:03...
    // device_time is in milliseconds, so one minute is 60,000 ms (% 1000L would only align to the second).
    long time = cgmGlucose.getDeviceTime() - cgmGlucose.getDeviceTime() % 60_000L;
    Point point = Point.measurement(MEASUREMENT_CGM_GLUCOSE_HISTORY)
            .time(time, WritePrecision.MS)
            .addTag("patient_code", cgmGlucose.getPatientCode())
            .addTag("device_sn", cgmGlucose.getDeviceSn())
            .addField("glucose", cgmGlucose.getGlucose())
            .addField("device_time", cgmGlucose.getDeviceTime());
    pointList.add(point);
    // Flush in batches of 1,000 points
    if (pointList.size() >= 1000) {
        writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
        pointList.clear();
    }
}
// Flush the remaining points
if (!pointList.isEmpty()) {
    writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
    pointList.clear();
}
writeApi.close();
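Because device_time is written with WritePrecision.MS, aligning a point to its minute means truncating by 60,000 ms; truncating by 1,000 ms only reaches the second. The sketch below isolates that arithmetic in a small standalone class. The class and method names are illustrative only, and `Math.floorDiv` is used so that timestamps before 1970 also round down; the `java.time` variant is shown for comparison.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class MinuteAlignment {
    static final long MINUTE_MS = 60_000L;

    /** Truncates an epoch-millisecond timestamp down to the start of its minute. */
    static long alignToMinute(long epochMs) {
        // floorDiv rounds toward negative infinity, so negative timestamps also round down.
        return Math.floorDiv(epochMs, MINUTE_MS) * MINUTE_MS;
    }

    /** Same result using java.time instead of manual arithmetic. */
    static long alignToMinuteWithInstant(long epochMs) {
        return Instant.ofEpochMilli(epochMs).truncatedTo(ChronoUnit.MINUTES).toEpochMilli();
    }

    public static void main(String[] args) {
        long deviceTime = 1_718_680_000_123L;
        System.out.println(alignToMinute(deviceTime));            // 1718679960000
        System.out.println(alignToMinuteWithInstant(deviceTime)); // 1718679960000
        System.out.println(deviceTime - deviceTime % 1000L);      // 1718680000000, only second-aligned
    }
}
```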
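Now I need to find the time at which a specific device_sn first has a sustained hyperglycemia event (if there is one). Sustained hyperglycemia is defined as glucose values above 13.9 that last for more than two hours. I used window() and reduce(), with the following code: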
import "interpolate"
from(bucket: "cdm_dm")
|> range(start: 1718679994)
|> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
|> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
|> filter(fn: (r) => r["_field"] == "glucose" or r["_field"] == "device_time")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> interpolate.linear(every: 1m)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> window(every: 1m, period: 122m)
// The core logic is to count the points where the glucose value is greater than 13.9 within two hours.
|> reduce(fn: (r, accumulator) => ({
        count: if r.glucose > 13.9 then accumulator.count + 1 else 0,
        event_start_time: if r.glucose > 13.9 then r.device_time else 0.0,
        glucose: if (r.glucose > 13.9 and accumulator.count == 121) then r.glucose
            else if (r.glucose > 13.9 and accumulator.count < 121) then accumulator.glucose
            else 0.0
    }), identity: {count: 0, event_start_time: 0.0, glucose: 0.0})
|> duplicate(column: "_start", as: "_time")
|> window(every: inf)
|> filter(fn: (r) => r["count"] == 122)
|> limit(n:1)
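One plausible contributor to the slowness is the overlapping window: with window(every: 1m, period: 122m), each pivoted row falls into about period / every = 122 windows, and reduce() then runs once per window table. The sketch below counts that fan-out by enumerating window starts. It assumes Flux's default alignment (window starts at multiples of `every`, offset 0) and works in whole minutes; it is an estimate of the work involved, not a profiler measurement, and interpolate.linear() and pivot() add their own cost over the whole range.

```java
public class WindowFanOut {
    /**
     * Counts how many windows of window(every, period) contain a row at time t.
     * Window starts are assumed to be multiples of `every`; a window [s, s + period)
     * contains t when s <= t < s + period. All values are in minutes.
     */
    static long windowsContaining(long t, long every, long period) {
        long count = 0;
        long firstStart = Math.floorDiv(t - period, every) * every; // earliest start worth checking
        for (long s = firstStart; s <= t; s += every) {
            if (t < s + period) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long perRow = windowsContaining(500, 1, 122); // window(every: 1m, period: 122m)
        long rowsPerDay = 24L * 60;                   // one pivoted row per minute for one device
        System.out.println(perRow);                   // 122
        System.out.println(rowsPerDay * perRow);      // 175680 row visits for one day of data
    }
}
```

In other words, a single day of one-minute data is processed roughly 122 times over, which is why a single pass over the series (as stateDuration() does) tends to scale much better.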
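In practice, however, the query is extremely slow: even with very little test data, it takes more than 30 seconds to return a result. Is there something wrong with the logic of my code? Is there a better way to meet this requirement in InfluxDB?
I have no further ideas on how to optimize this code or the data structure.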
I found a function that meets this requirement, stateDuration():
from(bucket: "cdm_dm")
|> range(start: 1718679994)
|> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history_microtechmd2")
|> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
|> filter(fn: (r) => r["_field"] == "glucose")
|> stateDuration(fn: (r) => r._value > 13.9)
|> filter(fn:(r) => r["stateDuration"] > 120*60)
|> limit(n:1)
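For reference, stateDuration() adds a stateDuration column that, for each row where the predicate is true, holds the time elapsed since the first row of the current consecutive run (in `unit`, default 1s), and -1 for rows where the predicate is false. So `stateDuration > 120*60` keeps rows where glucose has stayed above 13.9 for more than two hours, the first such row's `_time` is when that threshold was crossed, and the event's start is that `_time` minus `stateDuration` seconds. Note that it only looks at consecutive rows, so a gap of missing minutes inside a run is not treated as a break. The sketch below mirrors that logic in plain Java as a single pass, which may help when checking query results against raw data. The `Reading` and `Event` records and all names are hypothetical, and records require Java 16+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SustainedHyperglycemia {
    /** Hypothetical reading type: epoch milliseconds plus a glucose value. */
    record Reading(long timeMs, double glucose) {}

    /** When the above-threshold run began and when it first lasted longer than MIN_DURATION_MS. */
    record Event(long startMs, long detectedMs) {}

    static final double THRESHOLD = 13.9;
    static final long MIN_DURATION_MS = 2L * 60 * 60 * 1000; // two hours

    /**
     * Single pass over readings sorted by time. Like stateDuration(fn: (r) => r._value > 13.9),
     * the duration is measured from the first reading of the current consecutive run above the
     * threshold and resets as soon as a reading is at or below it.
     */
    static Optional<Event> firstSustainedEvent(List<Reading> readings) {
        boolean inRun = false;
        long runStart = 0;
        for (Reading r : readings) {
            if (r.glucose() > THRESHOLD) {
                if (!inRun) {
                    inRun = true;
                    runStart = r.timeMs();
                }
                // Strictly more than two hours, matching stateDuration > 120*60
                if (r.timeMs() - runStart > MIN_DURATION_MS) {
                    return Optional.of(new Event(runStart, r.timeMs()));
                }
            } else {
                inRun = false;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Synthetic data: one reading per minute, above 13.9 from minute 10 through minute 140.
        List<Reading> readings = new ArrayList<>();
        for (int m = 0; m <= 200; m++) {
            readings.add(new Reading(m * 60_000L, (m >= 10 && m <= 140) ? 15.0 : 10.0));
        }
        System.out.println(firstSustainedEvent(readings));
        // Optional[Event[startMs=600000, detectedMs=7860000]]
    }
}
```

Here the run starts at minute 10 and is first detected at minute 131, the first reading more than 120 minutes after the run began, which corresponds to the row that `filter(fn: (r) => r["stateDuration"] > 120*60) |> limit(n:1)` would return.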