如何平衡多个消息队列

问题描述 投票:0回答:2

我有一项可能需要长时间运行(数小时)的任务。该任务由多个从消息队列(在我的例子中是 AWS SQS)读取的工作线程(在我的例子中是 AWS ECS 实例)执行。我有多个用户将消息添加到队列中。问题是,如果 Bob 向队列添加 5000 条消息,足以让工作人员忙碌 3 天,那么 Alice 出现并想要处理 5 个任务,Alice 将需要等待 3 天才能开始 Alice 的任何任务。

我想在 Alice 提交任务后立即以相同的速率向 Alice 和 Bob 的工作人员发送消息。

我在另一个上下文中解决了这个问题,为每个用户(甚至用户提交的每个批次)创建多个队列(子队列),并在消费者请求下一条消息时在所有子队列之间交替。

至少在我的世界里,这似乎是一个常见问题,我想知道是否有人知道解决它的既定方法。

我没有看到 ActiveMQ 的任何解决方案。我对 Kafka 进行了一些研究,它具有在主题中循环分区的能力,这可能会起作用。现在,我正在使用 Redis 实现一些东西。

algorithm message-queue
2个回答
0
投票

我建议使用 Cadence Workflow 而不是队列,因为它支持开箱即用的长时间运行操作和状态管理。

在您的情况下,我将为每个用户创建一个工作流实例。每个新任务都将通过信号 API 发送到用户工作流程。然后工作流实例会将收到的任务排队并一一执行。

以下是实施概要:

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后是通过信号方法将该任务排队到工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Cadence 提供了许多其他优势。

  • 以无限的过期间隔构建指数重试
  • 故障处理。例如,它允许执行一项任务,如果在配置的时间间隔内两次更新都无法成功,则通知另一个服务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖关系。例如,在出现不可恢复的故障时实现调用链或补偿逻辑 (SAGA)
  • 提供对当前更新状态的完整可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且您需要额外的数据库来跟踪总体进度。使用 Cadence,每个事件都会被记录。
  • 能够在飞行中取消更新。

请参阅 介绍 Cadence 编程模型的演示文稿


0
投票

SQS 消息支持一个名为

MessageGroupId
的属性,它允许我们为队列中的消息分配一个组。组内的消息是严格排序的,但不同组的消息则不然。您可以为来自 Bob 和 Alice 的消息分配唯一的组 ID,以便可以乱序处理来自 Bob 和 Alice 的消息。

来源:SQS MessageGroupId

© www.soinside.com 2019 - 2024. All rights reserved.