如何以有状态的方式监控 io 传感器的组合

问题描述 投票:0回答:1

我的数据源发出具有以下结构的 IOT 数据 -

io_id,value,timestamp
232,1223,1718191205
321,671,1718191254
54,2313,1718191275
232,432,1718191315
321,983,1718191394
........

我想使用 Flink 在这些数据上实现两件事。

  1. 我想监视各个 io_id 值的变化。下面的代码可以很好地实现此目的。

    从 pyflink.datastream 导入 StreamExecutionEnvironment 从 pyflink.datastream.functions 导入 KeyedProcessFunction,RuntimeContext 从 pyflink.common.typeinfo 导入类型 从 pyflink.datastream.state 导入 ValueStateDescriptor

    类 ValueChangeMonitor(KeyedProcessFunction): def init(自身): self.previous_value_state = 无

     def open(self, runtime_context: RuntimeContext):
         self.previous_value_state = runtime_context.get_state(
             ValueStateDescriptor("previous_value", Types.INT())
         )
    
     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         io_id, io_value = value
         previous_value = self.previous_value_state.value()
    
         if previous_value is not None:
             change = abs(io_value - previous_value)
             if change > 100:
                 print(f"Significant change detected for IO {io_id}: {change}")
    
         self.previous_value_state.update(io_value)
    

    定义主函数(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1)

     data_stream = env.from_collection([
         (232,1223,1718191205)
         (321,671,1718191254),
         (54,2313,1718191275),
         (232,432,1718191315),
         (321,983,1718191394)
     ], type_info=Types.TUPLE([Types.INT(), Types.INT()]))
    
     keyed_stream = data_stream.key_by(lambda x: x[0])
     keyed_stream.process(ValueChangeMonitor()).print()
    
     env.execute("IO Value Change Monitor")
    

    if name == 'main': 主要()

  2. 我不想监视单个 io_id 值,而是想创建一个虚拟 io,它是 io 传感器的组合。例如 - dummy_io (io_1==234 && io_2==423) 。现在我想监视使用数据中的 io 构建的虚拟 io 值的变化。在这种情况下,当 io_1 的值为 234 并且 io_2 的值为 423 时,dummy_io 的值将为 True 或 1。在其他情况下,它将为 False 或 0。我想在 dummy_io 的值发生变化时生成事件。

我怎样才能实现上述目标? Flink 是适合上述用例的工具吗?

python apache-flink data-processing real-time-data pyflink
1个回答
0
投票

Flink 是一个很好的流处理工具。它是否是正确的工具取决于它的运行位置以及您/您的团队运行/管理它的熟练程度。

要回答这个问题,您需要创建一个继承自

KeyedProcessFunction
(link) 的类,它将初始化虚拟 IO 状态,然后将更新逻辑放入重写
process_element
的类方法中。

由于您使用的是 Python,因此您应该检查一些 Python 流处理器(Bytewax 和 Quix)。它们将满足您的需求并提供更易于使用、运行和管理的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.