TL;DR:如何模仿rabbitMQ的调度功能保持消费者:
我有一个 SQS 队列,在创建时具有默认属性。消费者处理一条消息的平均时间为1~2s。但有一些消息需要在 4 小时窗口之间处理两次。这些消息称为 B,其他消息称为 A。
假设我的队列包含以下消息:
A1, A2, B1, A3, B2
(5 条消息,最多 10 秒来消耗它们)在这些表的开头:
time | what should happen
---------|-------------------
now | consumer connected to queue
now+10s | all As were consumed successfully and deleted from queue
Bs had their unsuccessful first try and now they are waiting for their retry in 4h
between | nothing happens since no new messages arrived and old ones are waiting
now+4h4s | Bs successfully consumed during second retry and due that, deleted from queue
我有一个
Spring
应用程序,当我找到类型 B
消息时,我可以在其中引发异常。由于简单性和可扩展性,我希望有一个单线程消费消息,花费 1~2 秒来消费每条消息。
这样,我就无法挂起消息处理正如这个答案所建议的。我也不需要 SQS'
Delivery delay
,因为它仅推迟到达队列的消息而不重试。如果可能的话,我想继续使用长轮询@JmsListener
并避免在我的内存应用程序上保留任何状态。我想避免这种情况如果可能的话
我会编写一个小型 AWS Lambda 函数,每分钟调用一次。该函数将获取一条消息(从 FIFO 类型的 SQS 队列中获取)并检查其添加时间。如果添加时间 >= 4 小时,它会将其从传入队列中删除,并将其添加到延迟 4 小时的队列中,您的应用程序可以侦听该队列。如果它移动了一条消息,请继续执行此操作,直到下一条消息不是 4 小时前的消息。增加/减少 lambda 的频率,以将“紧”程度的粒度增加到 4 小时,但会增加运行 lambda 的频率。
以下是使用 SQS 的 AWS Lambda 函数示例的快速链接:https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-example.html
您可以将消息 B 发送到 Step Functions 状态机,并置于等待状态以等待 4 小时,然后再将其发送到队列。状态机将为您保留状态,您可以从 Step Functions 直接向 SQS 发送消息,因此您无需编写任何代码。
由于我将
JmsListener
与 setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
一起使用,我决定在可重新处理消息的使用者末尾运行此命令:
myAmazonSqsInstance.sendMessage(
new SendMessageRequest()
.withQueueUrl("queueName")
.withMessageBody(myMessageWithText)
.withDelaySeconds(900) // 900s = 15min
);
这样该消息将被成功消费,但队列中将产生一条具有相同正文的新消息。这条消息将在15分钟内被消耗,并且由于我的业务逻辑,再次失败。将会有 16 次失败(16*15min=4h),直到它最终被消耗而不产生新消息。
虽然这不是我所要求的,并且与其他答案类似(只是技术堆栈不同),但我决定将其写在这里,以提供一个可用的java解决方案
最简单、最可靠的方法是,有一个可以处理的时间戳属性,并使用延迟窗口将其隐藏 30 分钟。当消息尚未准备好消费时(即时间戳小于当前时间),如果时间未到,则再次将其保留回SQS,否则进行处理。