我的数据源发出具有以下结构的 IOT 数据 -
io_id,value,timestamp
232,1223,1718191205
321,671,1718191254
54,2313,1718191275
232,432,1718191315
321,983,1718191394
........
我想使用 Flink 在这些数据上实现两件事。
我想监视各个 io_id 值的变化。下面的代码可以很好地实现此目的。
从 pyflink.datastream 导入 StreamExecutionEnvironment 从 pyflink.datastream.functions 导入 KeyedProcessFunction,RuntimeContext 从 pyflink.common.typeinfo 导入类型 从 pyflink.datastream.state 导入 ValueStateDescriptor
类 ValueChangeMonitor(KeyedProcessFunction): def init(自身): self.previous_value_state = 无
def open(self, runtime_context: RuntimeContext):
self.previous_value_state = runtime_context.get_state(
ValueStateDescriptor("previous_value", Types.INT())
)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
io_id, io_value = value
previous_value = self.previous_value_state.value()
if previous_value is not None:
change = abs(io_value - previous_value)
if change > 100:
print(f"Significant change detected for IO {io_id}: {change}")
self.previous_value_state.update(io_value)
定义主函数(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1)
data_stream = env.from_collection([
(232,1223,1718191205)
(321,671,1718191254),
(54,2313,1718191275),
(232,432,1718191315),
(321,983,1718191394)
], type_info=Types.TUPLE([Types.INT(), Types.INT()]))
keyed_stream = data_stream.key_by(lambda x: x[0])
keyed_stream.process(ValueChangeMonitor()).print()
env.execute("IO Value Change Monitor")
if name == 'main': 主要()
我不想监视单个 io_id 值,而是想创建一个虚拟 io,它是 io 传感器的组合。例如 - dummy_io (io_1==234 && io_2==423) 。现在我想监视使用数据中的 io 构建的虚拟 io 值的变化。在这种情况下,当 io_1 的值为 234 并且 io_2 的值为 423 时,dummy_io 的值将为 True 或 1。在其他情况下,它将为 False 或 0。我想在 dummy_io 的值发生变化时生成事件。
我怎样才能实现上述目标? Flink 是适合上述用例的工具吗?