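I'm currently setting up a job that forwards IoT device Event Hub data (JSON payloads) from multiple devices to a database.
The job currently forwards to a SQL table, which works fine (the easy part). However, I only want Stream Analytics to forward a payload if its data has changed since the last sample received from that particular device.
Doing this in the database is clearly inefficient; it should be handled in the data-processing logic before anything is written to SQL.
The JSON payload (example below) contains a timestamp value that must be ignored in this comparison, because the goal is to determine whether any of the 6 channel values (INT) have changed since the last sample.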
{
  "pid": 51387408,
  "device_id": "WRL11111111",
  "channels": {
    "timestamp": 1713019679,
    "chd1_value": 0,
    "chd2_value": 0,
    "chd3_value": 0,
    "chd4_value": 0,
    "chd5_value": 0,
    "chd6_value": 0
  }
}
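The comparison described above can be pinned down outside Stream Analytics before translating it into a query. This is a minimal Python sketch, assuming the payload shape shown above; `ChangeFilter` and `CHANNEL_KEYS` are hypothetical names, and state is kept in memory purely for illustration. In the real job this logic would have to live in the Stream Analytics query.

```python
import copy
from typing import Dict, Optional, Tuple

# Hypothetical name: the six channel fields from the sample payload.
# "timestamp" is deliberately left out so it never affects the comparison.
CHANNEL_KEYS = tuple(f"chd{i}_value" for i in range(1, 7))


class ChangeFilter:
    """Forward a payload only when its six channel values differ from the
    previous payload seen for the same device_id (timestamp is ignored)."""

    def __init__(self) -> None:
        # Last channel tuple seen per device_id (in memory only).
        self._last: Dict[str, Tuple[int, ...]] = {}

    def should_forward(self, payload: dict) -> bool:
        device_id = payload["device_id"]
        channels = payload["channels"]
        current = tuple(channels[k] for k in CHANNEL_KEYS)
        previous: Optional[Tuple[int, ...]] = self._last.get(device_id)
        self._last[device_id] = current
        # The first sample from a device has nothing to compare with, so forward it.
        return previous is None or previous != current


if __name__ == "__main__":
    f = ChangeFilter()
    first = {"pid": 51387408, "device_id": "WRL11111111",
             "channels": {"timestamp": 1713019679, **{k: 0 for k in CHANNEL_KEYS}}}
    second = copy.deepcopy(first)
    second["channels"]["timestamp"] += 60  # illustrative: only the timestamp moves
    print(f.should_forward(first))   # True: first sample for this device
    print(f.should_forward(second))  # False: channel values unchanged
```

Comparing the tuple of six fields is equivalent to checking each `chdN_value` individually; a query-side version would need one comparison per channel.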
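I've tried several iterations of Stream Analytics queries that use the LAG function for this comparison, but the examples I found online don't focus on per-device comparisons.
A query such as the following throws errors like: 'LAG' does not allow an 'Order By' clause.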
WITH PreviousEvent AS (
SELECT
device_id,
device_data,
LAG(device_data) OVER (PARTITION BY device_id ORDER BY EventEnqueuedUtcTime) AS previous_data
FROM
device_stream
)
SELECT
device_id,
device_data,
previous_data,
CASE WHEN device_data != previous_data THEN 'Value changed' ELSE 'Value unchanged' END AS change_status
FROM
PreviousEvent
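I'm sure this is a common requirement for Stream Analytics queries, but I'm surprised I couldn't find a concrete example that clearly shows how to do it. Any advice or guidance is appreciated.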
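Stream Analytics does not support an ORDER BY clause directly at the query level, and that includes inside LAG; event order is taken from the event timestamp (set with TIMESTAMP BY) instead. Refer to this SO question: ORDER BY is not supported in LAG in Azure Stream Analytics.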
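What LAG is meant to provide here, the previous event from the same device, can be modeled in a few lines. This is a minimal Python sketch, not Stream Analytics code: `lag_by_device` and `LOOKBACK_SECONDS` are hypothetical names, events are assumed to arrive already sorted by event time, and the one-hour look-back is an assumed stand-in for a bound such as `LIMIT DURATION(hour, 1)`. Treating the boundary as inclusive is an arbitrary choice here.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Hypothetical look-back bound, standing in for something like LIMIT DURATION(hour, 1).
LOOKBACK_SECONDS = 3600


def lag_by_device(events: Iterable[Tuple[str, int, Any]],
                  lookback: int = LOOKBACK_SECONDS
                  ) -> List[Tuple[str, Any, Optional[Any]]]:
    """For each (device_id, event_time, value), return (device_id, value, previous_value).

    previous_value is the last value seen from the same device within
    `lookback` seconds, or None. Events must be sorted by event_time.
    """
    last: Dict[str, Tuple[int, Any]] = {}  # device_id -> (event_time, value)
    out = []
    for device_id, ts, value in events:
        prev = last.get(device_id)
        if prev is not None and ts - prev[0] <= lookback:
            previous_value = prev[1]
        else:
            previous_value = None  # no earlier event, or it is too old
        out.append((device_id, value, previous_value))
        last[device_id] = (ts, value)
    return out


if __name__ == "__main__":
    for row in lag_by_device([("A", 0, 1), ("B", 10, 7), ("A", 20, 1), ("A", 5000, 2)]):
        print(row)
```

Partitioning is just the dictionary key: each device only ever sees its own previous event, which is what `PARTITION BY device_id` is intended to express.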
Using the CollectTop function in Azure Stream Analytics:
WITH PreviousEvent AS (
SELECT
IoTHub.ConnectionDeviceId AS device_id,
CurrentTemperature AS current_temperature,
PreviousTemperature AS previous_temperature,
EventEnqueuedUtcTime,
-- CollectTop to get the previous event based on device_id and EventEnqueuedUtcTime
CollectTop(1) OVER (
PARTITION BY IoTHub.ConnectionDeviceId
ORDER BY EventEnqueuedUtcTime ASC
LIMIT DURATION(hour, 1)
) AS previous_event
FROM
[certificate122ravi89]
TIMESTAMP BY EventEnqueuedUtcTime
)
SELECT
pe.device_id AS DeviceId,
pe.current_temperature AS CurrentTemperature,
-- Previous temperature as reported in the input payload
pe.previous_temperature AS PreviousTemperature,
-- Determine whether the temperature changed
CASE
WHEN pe.current_temperature != pe.previous_event.Value THEN 'Value changed'
ELSE 'Value unchanged'
END AS change_status
FROM
PreviousEvent pe
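To reason about what CollectTop returns, here is a rough Python model of it. This is a sketch under assumptions: CollectTop is modeled as returning, for one window, an array of `{rank, value}` records (verify that record shape against the Stream Analytics reference), and `collect_top`, `changed_by_device`, and the `ts`/`temperature` field names are hypothetical. It also shows one way to reach the previous event: rank each device's events newest-first and take the top two, so element 0 is the latest event and element 1 is the one before it.

```python
from collections import defaultdict
from typing import Any, Dict, List


def collect_top(events: List[Dict[str, Any]], n: int, order_key: str,
                descending: bool = False) -> List[Dict[str, Any]]:
    """Rank the events of one window by order_key and return the first n as
    {"rank": r, "value": event} records (ties get distinct ranks here, a simplification)."""
    ordered = sorted(events, key=lambda e: e[order_key], reverse=descending)
    return [{"rank": i + 1, "value": e} for i, e in enumerate(ordered[:n])]


def changed_by_device(window_events: List[Dict[str, Any]],
                      value_key: str = "temperature",
                      time_key: str = "ts") -> Dict[str, str]:
    """Per device, compare the latest event in the window with the one before it."""
    by_device: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for e in window_events:
        by_device[e["device_id"]].append(e)
    result = {}
    for device_id, evs in by_device.items():
        top = collect_top(evs, 2, time_key, descending=True)  # newest first
        if len(top) < 2:
            result[device_id] = "No previous event"
            continue
        latest, previous = top[0]["value"], top[1]["value"]
        result[device_id] = ("Value changed" if latest[value_key] != previous[value_key]
                             else "Value unchanged")
    return result


if __name__ == "__main__":
    window = [
        {"device_id": "d1", "ts": 1, "temperature": 20.0},
        {"device_id": "d1", "ts": 2, "temperature": 21.0},
        {"device_id": "d2", "ts": 1, "temperature": 5.0},
    ]
    print(changed_by_device(window))
```

Because the result is an array, the previous event is an element of it rather than a field directly on it; ranking ascending instead would return the oldest event in the window.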
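Below are the steps for sending data from Azure IoT Hub to SQL Server with a Stream Analytics job.
I used the following sample code to send sample data to IoT Hub; replace it with the data you actually want to send.
The code uses the Azure IoT Python SDK to send data from a device to IoT Hub. Each message contains an energy type (chosen at random from "Gas", "Water", or "Electricity") and a value (a random float between 0 and 100).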
from azure.iot.device import IoTHubDeviceClient, Message
import json
import random
import time

# Connection string for the device (fill in your own device connection string)
CONNECTION_STRING = ""


def send_data(device_client):
    # Sample data
    data = {
        "energyType": random.choice(["Gas", "Water", "Electricity"]),
        "value": random.uniform(0, 100)
    }
    # Convert the data to JSON and wrap it in a message
    message = Message(json.dumps(data))
    # Send the message to IoT Hub
    print("Sending message:", message)
    device_client.send_message(message)
    print("Message sent successfully")


if __name__ == "__main__":
    # Create a device client from the connection string
    client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
    try:
        while True:
            # Send data periodically
            send_data(client)
            time.sleep(5)  # Adjust the interval as needed
    except KeyboardInterrupt:
        print("IoTHubClient sample stopped")
    finally:
        # Close the connection to IoT Hub cleanly
        client.shutdown()
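To exercise the change-detection scenario from the question rather than the energy sample, the message body can use the question's payload shape instead. A minimal sketch: `build_channel_payload` is a hypothetical helper, and the random 0/1 channel values are only there so consecutive payloads occasionally differ. Its return value is a JSON string that could be passed to `Message(...)` in `send_data` in place of `json.dumps(data)`.

```python
import json
import random
import time
from typing import Optional, Sequence


def build_channel_payload(pid: int, device_id: str,
                          values: Optional[Sequence[int]] = None,
                          timestamp: Optional[int] = None) -> str:
    """Return a JSON string shaped like the question's sample payload."""
    if values is None:
        # Random 0/1 readings so consecutive payloads sometimes differ.
        values = [random.randint(0, 1) for _ in range(6)]
    if len(values) != 6:
        raise ValueError("expected exactly 6 channel values")
    if timestamp is None:
        timestamp = int(time.time())  # Unix seconds, like the sample
    channels = {"timestamp": timestamp}
    for i, v in enumerate(values, start=1):
        channels[f"chd{i}_value"] = int(v)
    return json.dumps({"pid": pid, "device_id": device_id, "channels": channels})


if __name__ == "__main__":
    print(build_channel_payload(51387408, "WRL11111111", [0, 0, 0, 0, 0, 0], 1713019679))
```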
Create a SQL server, a SQL database, and a table for the data above.
CREATE TABLE energy_data (
EventId INT PRIMARY KEY IDENTITY(1,1), -- Unique ID for each event
EnergyType VARCHAR(20), -- Type of energy (e.g., Gas, Water, Electricity)
Value FLOAT, -- Value of the energy type
EventProcessedUtcTime DATETIME, -- Time when the event was processed in UTC
PartitionId INT, -- Partition ID for the event
EventEnqueuedUtcTime DATETIME, -- Time when the event was enqueued in UTC
ConnectionDeviceId VARCHAR(50), -- Connection device ID
ConnectionDeviceGenerationId VARCHAR(50), -- Connection device generation ID
EnqueuedTime DATETIME -- Time when the event was enqueued in UTC
);
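To check the column mapping locally before pointing the job at Azure SQL, the table can be approximated in SQLite. This is only a local stand-in, not the SQL Server DDL above: `IDENTITY(1,1)` becomes SQLite's `INTEGER PRIMARY KEY AUTOINCREMENT`, the other column types are mapped loosely, and `create_energy_table` / `insert_row` are hypothetical helpers.

```python
import sqlite3
from typing import Any, Dict


def create_energy_table(conn: sqlite3.Connection) -> None:
    # SQLite approximation of the energy_data table (types mapped loosely).
    conn.execute("""
        CREATE TABLE energy_data (
            EventId INTEGER PRIMARY KEY AUTOINCREMENT,
            EnergyType TEXT,
            Value REAL,
            EventProcessedUtcTime TEXT,
            PartitionId INTEGER,
            EventEnqueuedUtcTime TEXT,
            ConnectionDeviceId TEXT,
            ConnectionDeviceGenerationId TEXT,
            EnqueuedTime TEXT
        )
    """)


def insert_row(conn: sqlite3.Connection, row: Dict[str, Any]) -> None:
    # Column names come from trusted dict keys; values are bound as parameters.
    cols = ", ".join(row)
    placeholders = ", ".join("?" for _ in row)
    conn.execute(f"INSERT INTO energy_data ({cols}) VALUES ({placeholders})",
                 tuple(row.values()))


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_energy_table(conn)
    insert_row(conn, {"EnergyType": "Gas", "Value": 42.5, "ConnectionDeviceId": "dev1"})
    print(conn.execute("SELECT * FROM energy_data").fetchall())
```

EventId is generated by the database, just as the identity column is in SQL Server, so the inserted rows never need to supply it.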
In Stream Analytics, select the SQL database as the output and the IoT Hub as the input, and set the partition key to 0.
SELECT
energyType AS EnergyType,
value AS Value,
EventProcessedUtcTime,
PartitionId,
EventEnqueuedUtcTime,
IoTHub.ConnectionDeviceId AS ConnectionDeviceId,
IoTHub.ConnectionDeviceGenerationId AS ConnectionDeviceGenerationId,
IoTHub.EnqueuedTime AS EnqueuedTime
INTO [sql]
FROM [IoTHub];
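The query mostly renames fields and lifts the nested `IoTHub` metadata record into top-level columns whose names match `energy_data`. A minimal Python model of that per-event mapping, assuming the input event is a dict with an `IoTHub` sub-dict (as the `IoTHub.ConnectionDeviceId` references suggest); `project_event` and the sample values are hypothetical.

```python
from typing import Any, Dict


def project_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Map one input event to the energy_data output columns."""
    iothub = event.get("IoTHub") or {}  # nested IoT Hub metadata record
    return {
        "EnergyType": event.get("energyType"),
        "Value": event.get("value"),
        "EventProcessedUtcTime": event.get("EventProcessedUtcTime"),
        "PartitionId": event.get("PartitionId"),
        "EventEnqueuedUtcTime": event.get("EventEnqueuedUtcTime"),
        "ConnectionDeviceId": iothub.get("ConnectionDeviceId"),
        "ConnectionDeviceGenerationId": iothub.get("ConnectionDeviceGenerationId"),
        "EnqueuedTime": iothub.get("EnqueuedTime"),
    }


if __name__ == "__main__":
    sample = {"energyType": "Gas", "value": 12.5, "PartitionId": 0,
              "IoTHub": {"ConnectionDeviceId": "dev1"}}
    print(project_event(sample))
```

Missing fields come through as None, which corresponds to NULL in the table.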
Start the job, then open the SQL database and run the following query to view the data.
SELECT * FROM [dbo].[energy_data]