Stream Analytics - Output device messages only when they change

Question · Votes: 0 · Answers: 1

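I'm currently setting up a job that forwards Event Hub data (JSON payloads) from multiple IoT devices to a database.

The job currently forwards to a SQL table, which works fine (that's the easy part). However, I only want Stream Analytics to forward a message if its payload has changed since the last sample received from that particular device.

Doing this in the database would obviously be inefficient; it should be handled in the stream-processing logic before anything is written to SQL.

The JSON payload (example below) contains a timestamp that must be ignored in this comparison, because the point is to determine whether any of the 6 channel values (INT) have changed since the previous sample.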

  {
    "pid": 51387408,
    "device_id": "WRL11111111",
    "channels": {
      "timestamp": 1713019679,
      "chd1_value": 0,
      "chd2_value": 0,
      "chd3_value": 0,
      "chd4_value": 0,
      "chd5_value": 0,
      "chd6_value": 0
    }
  }
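To pin down the rule being asked for, here is a minimal plain-Python sketch of it (not Stream Analytics code): keep the last six channel values seen per `device_id`, ignore `timestamp`, and forward a payload only when that tuple differs. The helper `make`, the second device ID `WRL22222222` and `pid` 1 are hypothetical test data. Note the state lives in memory and grows with the number of devices; in Stream Analytics the equivalent "previous value per device" is what `LAG ... OVER (PARTITION BY ...)` provides.

```python
import json
from typing import Dict, Iterable, Iterator, Tuple

# The six channel fields from the sample payload; "timestamp" is deliberately excluded.
CHANNEL_KEYS = tuple(f"chd{i}_value" for i in range(1, 7))


def changed_payloads(messages: Iterable[str]) -> Iterator[dict]:
    """Yield only payloads whose six channel values differ from the previous
    payload of the same device. The first payload of each device is always yielded."""
    last_seen: Dict[str, Tuple[int, ...]] = {}
    for raw in messages:
        payload = json.loads(raw)
        device = payload["device_id"]
        values = tuple(payload["channels"][k] for k in CHANNEL_KEYS)
        if last_seen.get(device) != values:
            last_seen[device] = values
            yield payload


def make(device: str, ts: int, *values: int) -> str:
    """Hypothetical helper that builds a test message in the question's shape."""
    channels = {"timestamp": ts}
    channels.update(dict(zip(CHANNEL_KEYS, values)))
    return json.dumps({"pid": 1, "device_id": device, "channels": channels})


stream = [
    make("WRL11111111", 1, 0, 0, 0, 0, 0, 0),  # first sample: forwarded
    make("WRL11111111", 2, 0, 0, 0, 0, 0, 0),  # only the timestamp changed: dropped
    make("WRL22222222", 3, 0, 0, 0, 0, 0, 0),  # another device's first sample: forwarded
    make("WRL11111111", 4, 0, 1, 0, 0, 0, 0),  # chd2_value changed: forwarded
]
print([(p["device_id"], p["channels"]["timestamp"]) for p in changed_payloads(stream)])
# [('WRL11111111', 1), ('WRL22222222', 3), ('WRL11111111', 4)]
```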

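I've tried several iterations of a Stream Analytics query that uses the LAG function for this comparison, but the examples I found online don't focus on per-device comparisons.

Queries such as the following fail with errors like: "LAG" does not allow an "Order By" clause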

WITH PreviousEvent AS (
    SELECT 
        device_id,
        device_data,
        LAG(device_data) OVER (PARTITION BY device_id ORDER BY EventEnqueuedUtcTime) AS previous_data
    FROM 
        device_stream
)
SELECT 
    device_id,
    device_data,
    previous_data,
    CASE WHEN device_data != previous_data THEN 'Value changed' ELSE 'Value unchanged' END AS change_status
FROM 
    PreviousEvent

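I'm sure this is a common requirement for Stream Analytics queries, but I was surprised not to find a concrete example that clearly shows how to do it. Any advice or guidance is appreciated.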

azure-iot-hub azure-stream-analytics azure-sql-managed-instance
1 Answer
0 votes
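  • Stream Analytics does not support an ORDER BY clause inside LAG (see the SO question "ORDER BY not supported in LAG"). LAG instead looks back in event-time order, i.e. by the column given in TIMESTAMP BY; PARTITION BY restricts the lookback to the same device, and a LIMIT DURATION clause is required to bound how far back it looks.

  • Using LAG in Azure Stream Analytics

  • Using the CollectTop function in Azure Stream Analytics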
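As a rough mental model of `LAG(value) OVER (PARTITION BY key LIMIT DURATION(...))` (a plain-Python sketch, not Stream Analytics' implementation), each event gets the value of the previous event with the same key, provided that event is no older than the duration; otherwise it gets NULL (`None` here). The sketch assumes events already arrive sorted by their TIMESTAMP BY time, which is why no ORDER BY is needed, and it assumes the duration boundary is inclusive. It ignores late and out-of-order event handling. Device names and values are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple

Event = Tuple[str, float, Any]  # (partition key, event time in seconds, value)


def lag(events: List[Event], limit_duration: float) -> List[Optional[Any]]:
    """Model of LAG over PARTITION BY key with a LIMIT DURATION lookback.

    For each event, return the value of the previous event with the same key,
    or None if there is no such event within `limit_duration` seconds.
    """
    last: Dict[str, Tuple[float, Any]] = {}
    out: List[Optional[Any]] = []
    previous_time = float("-inf")
    for key, t, value in events:
        # The model only makes sense for input already ordered by event time.
        if t < previous_time:
            raise ValueError("events must be sorted by event time")
        previous_time = t
        prev = last.get(key)
        if prev is not None and t - prev[0] <= limit_duration:
            out.append(prev[1])
        else:
            out.append(None)
        last[key] = (t, value)
    return out


events = [
    ("dev-a", 0, 21.5),
    ("dev-b", 10, 19.0),
    ("dev-a", 60, 21.5),
    ("dev-a", 5000, 22.0),  # more than an hour after dev-a's previous event
]
print(lag(events, limit_duration=3600))
# [None, None, 21.5, None]
```

The `None` for the first event of each device matters: in SQL, comparing a value with NULL is neither true nor false, so a plain `current <> previous` check routes first events to the ELSE branch unless NULL is tested explicitly.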

WITH PreviousEvent AS (
    SELECT
        IoTHub.ConnectionDeviceId AS device_id,
        CurrentTemperature AS current_temperature,
        -- LAG takes no ORDER BY: it looks back in event-time order (TIMESTAMP BY),
        -- restricted to the same device by PARTITION BY
        LAG(CurrentTemperature) OVER (
            PARTITION BY IoTHub.ConnectionDeviceId
            LIMIT DURATION(hour, 1)
        ) AS previous_temperature,
        EventEnqueuedUtcTime
    FROM
        [certificate122ravi89]
    TIMESTAMP BY EventEnqueuedUtcTime
)
SELECT
    pe.device_id AS DeviceId,
    pe.current_temperature AS CurrentTemperature,
    pe.previous_temperature AS PreviousTemperature,
    -- Determine whether the temperature changed; LAG returns NULL when there is
    -- no earlier event from this device within the last hour
    CASE
        WHEN pe.previous_temperature IS NULL THEN 'First event'
        WHEN pe.current_temperature <> pe.previous_temperature THEN 'Value changed'
        ELSE 'Value unchanged'
    END AS change_status
FROM
    PreviousEvent pe


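Here are the steps to send data from Azure IoT Hub to SQL Server using a Stream Analytics job:

  • Create a Stream Analytics job.
  • Create an Azure IoT hub and a device.

I used the following sample code to send test data to IoT Hub; replace it with the data you want to send.

The code below uses the Azure IoT Python SDK to send data from a device to IoT Hub. Each message contains an energy type (randomly chosen from "Gas", "Water" or "Electricity") and a value (a random float between 0 and 100).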

from azure.iot.device import IoTHubDeviceClient, Message
import json
import random
import time

# Connection string for the device
CONNECTION_STRING = ""

def send_data(device_client):
    # Sample data
    data = {
        "energyType": random.choice(["Gas", "Water", "Electricity"]),
        "value": random.uniform(0, 100)
    }

    # Convert data to JSON and create a message
    message = Message(json.dumps(data))

    # Send the message to IoT Hub
    print("Sending message:", message)
    device_client.send_message(message)
    print("Message sent successfully")

if __name__ == "__main__":
    # Create a device client (fill in CONNECTION_STRING first)
    client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)

    try:
        while True:
            # Send data periodically
            send_data(client)
            time.sleep(5)  # Adjust the interval as needed

    except KeyboardInterrupt:
        print("IoTHubClient sample stopped")

    finally:
        # Close the connection to IoT Hub cleanly
        client.shutdown()
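To exercise the question's scenario instead of the energy sample, the device needs to send payloads shaped like the JSON in the question. A minimal sketch of a serialiser for that shape follows; the function name `build_channel_payload` is hypothetical and not part of the Azure IoT SDK.

```python
import json
import time
from typing import Optional, Sequence


def build_channel_payload(pid: int, device_id: str, values: Sequence[int],
                          timestamp: Optional[int] = None) -> str:
    """Serialise a message shaped like the payload in the question."""
    if len(values) != 6:
        raise ValueError("expected exactly six channel values")
    # Default to the current Unix time in seconds, as in the sample payload.
    ts = int(time.time()) if timestamp is None else timestamp
    channels = {"timestamp": ts}
    for i, v in enumerate(values, start=1):
        channels[f"chd{i}_value"] = int(v)
    return json.dumps({"pid": pid, "device_id": device_id, "channels": channels})


if __name__ == "__main__":
    print(build_channel_payload(51387408, "WRL11111111", [0, 0, 0, 0, 0, 0],
                                timestamp=1713019679))
```

Inside `send_data`, the result can be wrapped the same way as before, e.g. `message = Message(build_channel_payload(...))`. Sending the same channel values several times in a row, with only the timestamp advancing, is a simple way to check that the job drops unchanged samples.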

Create a SQL server, a SQL database and a table for the data above.

CREATE TABLE energy_data (
EventId INT PRIMARY KEY IDENTITY(1,1), -- Unique ID for each event
EnergyType VARCHAR(20), -- Type of energy (e.g., Gas, Water, Electricity)
Value FLOAT, -- Value of the energy type
EventProcessedUtcTime DATETIME, -- Time when the event was processed in UTC
PartitionId INT, -- Partition ID for the event
EventEnqueuedUtcTime DATETIME, -- Time when the event was enqueued in UTC
ConnectionDeviceId VARCHAR(50), -- Connection device ID
ConnectionDeviceGenerationId VARCHAR(50), -- Connection device generation ID
EnqueuedTime DATETIME -- Time when IoT Hub received the message (IoTHub.EnqueuedTime), in UTC
);

In the Stream Analytics job, select the SQL database as the output and the IoT hub as the input, and set the partition key to 0.

SELECT
energyType AS EnergyType,
value AS Value,
EventProcessedUtcTime,
PartitionId,
EventEnqueuedUtcTime,
IoTHub.ConnectionDeviceId AS ConnectionDeviceId,
IoTHub.ConnectionDeviceGenerationId AS ConnectionDeviceGenerationId,
IoTHub.EnqueuedTime AS EnqueuedTime
INTO [sql]
FROM [IoTHub];
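To make the column mapping explicit, here is a plain-Python model of what the query above does to each event (a sketch, not how Stream Analytics executes it): payload fields are renamed, and the fields of the `IoTHub` metadata record are flattened into top-level columns. The output names match the `energy_data` columns except `EventId`, which the `IDENTITY` column generates in SQL. As far as I know, the SQL output matches fields to table columns by name, so these aliases need to match the table. All values in `sample_event` are hypothetical.

```python
from typing import Any, Dict


def to_energy_row(event: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror the SELECT ... INTO [sql] projection for a single input event."""
    iot = event.get("IoTHub", {})  # metadata record attached by the IoT Hub input
    return {
        "EnergyType": event.get("energyType"),
        "Value": event.get("value"),
        "EventProcessedUtcTime": event.get("EventProcessedUtcTime"),
        "PartitionId": event.get("PartitionId"),
        "EventEnqueuedUtcTime": event.get("EventEnqueuedUtcTime"),
        "ConnectionDeviceId": iot.get("ConnectionDeviceId"),
        "ConnectionDeviceGenerationId": iot.get("ConnectionDeviceGenerationId"),
        "EnqueuedTime": iot.get("EnqueuedTime"),
    }


# Hypothetical event, for illustration only.
sample_event = {
    "energyType": "Gas",
    "value": 42.0,
    "EventProcessedUtcTime": "2024-04-13T14:48:00.0000000Z",
    "PartitionId": 0,
    "EventEnqueuedUtcTime": "2024-04-13T14:47:59.0000000Z",
    "IoTHub": {
        "ConnectionDeviceId": "device-1",
        "ConnectionDeviceGenerationId": "638486000000000000",
        "EnqueuedTime": "2024-04-13T14:47:59.0000000Z",
    },
}
print(to_energy_row(sample_event)["ConnectionDeviceId"])  # device-1
```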

Start the job, then open the SQL database and run the following query to view the data.

SELECT * FROM [dbo].[energy_data]

