问题:
我正在开发一个系统,其中有一个生产者启动多个任务,每个任务都可能需要大量时间才能完成。以下是系统架构的概述:
const OrbitSubs = require("./Batchs/OrbitSubs");
const recive = async () => {
let conn = await amqp.connect("amqp://localhost");
let channel = await conn.createChannel();
var queue = "runSubs";
channel.assertQueue(queue, {
durable: true,
});
channel.consume(
queue,
async function (msg) {
try {
let jsonObj = JSON.parse(msg.content.toString());
let { domain, id } = jsonObj;
let status = await OrbitSubs(domain, id);
// Send acknowledgment only after job processing is complete
channel.ack(msg);
} catch (error) {
console.error("Error processing message:", error);
channel.reject(msg, false); // Requeue the message
}
},
{
noAck: false, // Use manual acknowledgment
}
);
};
recive();
+--------------------------------+
| Producer |
+--------------------------------+
|
| Calls Task-A
|
+--------------------------------+
| RabbitMQ |
+--------------------------------+
|
| Publishes Task-A to Queue
|
+--------------------------------+
| AWS Batch |
+--------------------------------+
|
| Executes Task-A
|
+--------------------------------+
| RabbitMQ |
+--------------------------------+
|
| Publishes Subtasks (e.g., A-findB, A-findC, A-findE)
|
+--------------------------------+
| AWS Batch |
+--------------------------------+
|
| Executes Subtasks
|
+--------------------------------+
| RabbitMQ |
+--------------------------------+
|
| Publishes Subtask Results (e.g., A-findB1, A-findC1, A-findE1)
|
+--------------------------------+
| Consumer |
+--------------------------------+
|
| Receives Subtask Results
|
+--------------------------------+
我面临的主要挑战是:
a) 维护连接: 由于任务 A 可能需要很长时间才能完成,因此我在完成之前就失去了与 RabbitMQ 的连接。我目前每 5 秒检查一次 AWS Batch 任务的状态,但我需要在整个过程中保持连接处于活动状态。即使在持续数小时的长时间运行任务期间,如何确保我的连接保持活动状态?
b) 处理子任务: 一旦任务 A 完成,我需要按顺序处理子任务(例如,A-findB1、A-findC1、A-findE1)。是否可以从单个生产者接收多个确认,或者我应该在任务 A 完成后将每个子任务单独发送到队列?
替代方法:如果我处理子任务的方法不是最佳的,我愿意接受替代想法。如何在 RabbitMQ 和 AWS Batch 中高效管理长时间运行的任务及其关联的子任务?
我感谢任何有关如何应对这些挑战并提高系统架构的效率和可靠性的见解或建议。预先感谢您的协助!
编辑 任务视觉示例
Task-A
/ | \
/ | \
/ | \
Subtask-B Subtask-C Subtask-E
/ | \ | |
/ | \ | |
/ | \ | |
Sub-B1 Sub-B2 Sub-B3 Sub-E1
/ \ / \
/ \ / \
Sub-B1-1 Sub-B1-2 Sub-B2-1 Sub-B2-2
| | | |
| | | |
Sub-B1-1-1 Sub-B1-1-2 Sub-B2-1-1 Sub-B2-1-2
|
|
Sub-C1
|
|
Sub-C1-1
不要使用队列。您的用例是使用像 temporal.io 这样的系统简单地实现为编排的。它支持开箱即用的长时间运行任务。