redis 在消费者组内流式有序处理

问题描述 投票:0回答:2

我正在使用 python (aioredis) 和 redis 流。

我有一个生产者 - 许多(分组)消费者场景,并希望确保消费者以有序的方式处理发送到流的(批量)消息,这意味着:当第一条消息完成时,处理下一条消息流等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。

我还想依赖第二个、第三个等消费者组中的有序处理 - 所有这些都依赖于发送到一个流的相同消息。意思:

message 1 ... n -> stream1 
ordered processing within group 1 ... n  
whereas consumer 1 ... n per group 1 ... n

当我还想确保每个组的潜在订单检查逻辑不会出现太多过载时,完成此操作的好方法是什么?

python redis stream aioredis
2个回答
1
投票

让我回到老派的同步处理,如果你想顺序处理流消息并不容易,原因是失败/重试。

考虑您想要处理每条消息最多一次,流消息执行是关键部分,消费者组成员作为线程/进程。

要同步此操作,您需要某种锁定机制,因为消费者组可能运行在不同的机器上。您可以使用全局锁定机制来防止多个消费者消费来自同一流的消息。

可以使用Redis锁(RedLock)来获取/释放锁。

伪代码

Procedure SequentialProcessor

Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime 


BEGIN
    redLock = RedLock()
    WHILE True DO
     IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
       message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
       TRY
         processMessage( message )
       FINALLY
          redLock.releaseLock( StreamName#ConsumerGroup )
     ENDIF
    END
END

 

0
投票

使用消费者组顺序处理条目是没有意义的,它们是专门为并行处理而设计的。

您应该做的是创建多个流。首先,您将事件放入第一个流中,它由消费者处理,而消费者又在另一个流中创建事件(可能从原始事件中复制一些相关数据),然后下一个消费者将处理该事件,依此类推。这样您将拥有很大的灵活性和吞吐量,因为您可以在每个阶段并行使用多个消费者。

© www.soinside.com 2019 - 2024. All rights reserved.