我有一项可能需要长时间运行(数小时)的任务。该任务由多个从消息队列(在我的例子中是 AWS SQS)读取的工作线程(在我的例子中是 AWS ECS 实例)执行。我有多个用户将消息添加到队列中。问题是,如果 Bob 向队列添加 5000 条消息,足以让工作人员忙碌 3 天,那么 Alice 出现并想要处理 5 个任务,Alice 将需要等待 3 天才能开始 Alice 的任何任务。
我想在 Alice 提交任务后立即以相同的速率向 Alice 和 Bob 的工作人员发送消息。
我在另一个上下文中解决了这个问题,为每个用户(甚至用户提交的每个批次)创建多个队列(子队列),并在消费者请求下一条消息时在所有子队列之间交替。
至少在我的世界里,这似乎是一个常见问题,我想知道是否有人知道解决它的既定方法。
我没有看到 ActiveMQ 的任何解决方案。我对 Kafka 进行了一些研究,它具有在主题中循环分区的能力,这可能会起作用。现在,我正在使用 Redis 实现一些东西。
我建议使用 Cadence Workflow 而不是队列,因为它支持开箱即用的长时间运行操作和状态管理。
在您的情况下,我将为每个用户创建一个工作流实例。每个新任务都将通过信号 API 发送到用户工作流程。然后工作流实例会将收到的任务排队并一一执行。
以下是实施概要:
public interface SerializedExecutionWorkflow {
@WorkflowMethod
void execute();
@SignalMethod
void addTask(Task t);
}
public interface TaskProcessorActivity {
@ActivityMethod
void process(Task poll);
}
public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {
private final Queue<Task> taskQueue = new ArrayDeque<>();
private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);
@Override
public void execute() {
while(!taskQueue.isEmpty()) {
processor.process(taskQueue.poll());
}
}
@Override
public void addTask(Task t) {
taskQueue.add(t);
}
}
然后是通过信号方法将该任务排队到工作流的代码:
private void addTask(WorkflowClient cadenceClient, Task task) {
// Set workflowId to userId
WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
// Use workflow interface stub to start/signal workflow instance
SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
BatchRequest request = cadenceClient.newSignalWithStartRequest();
request.add(workflow::execute);
request.add(workflow::addTask, task);
cadenceClient.signalWithStart(request);
}
与使用队列进行任务处理相比,Cadence 提供了许多其他优势。
请参阅 介绍 Cadence 编程模型的演示文稿。
SQS 消息支持一个名为
MessageGroupId
的属性,它允许我们为队列中的消息分配一个组。组内的消息是严格排序的,但不同组的消息则不然。您可以为来自 Bob 和 Alice 的消息分配唯一的组 ID,以便可以乱序处理来自 Bob 和 Alice 的消息。