我正在使用 python (aioredis) 和 redis 流。
我有一个生产者 - 许多(分组)消费者场景,并希望确保消费者以有序的方式处理发送到流的(批量)消息,这意味着:当第一条消息完成时,处理下一条消息流等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。
我还想依赖第二个、第三个等消费者组中的有序处理 - 所有这些都依赖于发送到一个流的相同消息。意思:
message 1 ... n -> stream1
ordered processing within group 1 ... n
whereas consumer 1 ... n per group 1 ... n
当我还想确保每个组的潜在订单检查逻辑不会出现太多过载时,完成此操作的好方法是什么?
让我回到老派的同步处理,如果你想顺序处理流消息并不容易,原因是失败/重试。
考虑您想要处理每条消息最多一次,流消息执行是关键部分,消费者组成员作为线程/进程。
要同步此操作,您需要某种锁定机制,因为消费者组可能运行在不同的机器上。您可以使用全局锁定机制来防止多个消费者消费来自同一流的消息。
可以使用Redis锁(RedLock)来获取/释放锁。
伪代码
Procedure SequentialProcessor
Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime
BEGIN
redLock = RedLock()
WHILE True DO
IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
TRY
processMessage( message )
FINALLY
redLock.releaseLock( StreamName#ConsumerGroup )
ENDIF
END
END
使用消费者组顺序处理条目是没有意义的,它们是专门为并行处理而设计的。
您应该做的是创建多个流。首先,您将事件放入第一个流中,它由消费者处理,而消费者又在另一个流中创建事件(可能从原始事件中复制一些相关数据),然后下一个消费者将处理该事件,依此类推。这样您将拥有很大的灵活性和吞吐量,因为您可以在每个阶段并行使用多个消费者。