我有三个节点集群,两个队列绑定到一个扇出交换机。我的要求是所有发送到exchange的消息都必须保存到这两个队列中,所有消息不能丢失并且必须得到处理。
当其中一个节点离线时,该节点上的队列将丢失交换机收到的消息。我也许可以使用仲裁队列,但这只能允许一个节点离线。如果两个节点离线,也会出现同样的问题。有什么解决办法吗?
您可以只使用一个直接队列。比你的 3 个节点将一条一条地处理消息,这样会更快。您还可以使用这些标头为有错误的消息创建一个接收队列:
async function initBaseQueues(
ch: amqplib.Channel,
hash: Record<string, string>,
) {
const queues = Object.keys(hash);
await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE, 'direct');
for (let i = 0; i < queues.length; i++) {
const queue = queues[i];
const routingKey = hash[queue];
await ch.assertQueue(queue, {
autoDelete: false,
durable: true,
arguments: {
[ERabbitQueueArguments.deadLetterExchange]:
FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
[ERabbitQueueArguments.deadLetterRoutingKey]: routingKey + RETRY_CONST,
},
});
await ch.bindQueue(queue, FACTORY_GATEWAY_EXCHANGE, routingKey);
Logger.log(`Queue ${queue} initialized`);
}
}
async function initRetryQueues(
ch: amqplib.Channel,
hash: Record<string, string>,
) {
const queues = Object.keys(hash);
await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE + RETRY_CONST, 'direct');
for (let i = 0; i < queues.length; i++) {
const queue = queues[i];
const routingKey = hash[queue];
await ch.assertQueue(queue + RETRY_CONST, {
autoDelete: false,
durable: true,
arguments: {
[ERabbitQueueArguments.deadLetterExchange]: FACTORY_GATEWAY_EXCHANGE,
[ERabbitQueueArguments.deadLetterRoutingKey]: routingKey,
[ERabbitQueueArguments.messageTTL]: REQUEUE_DEALAY_CONST,
},
});
await ch.bindQueue(
queue + RETRY_CONST,
FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
routingKey + RETRY_CONST,
);
Logger.log(`Queue ${queue + RETRY_CONST} initialized`);
}
}
export default async function initRabbitQueues() {
const conn = await amqplib.connect(
process.env.RABBIT_HOST ?? 'amqp://localhost:5672',
);
const ch1 = await conn.createChannel();
const hash = queueNames();
await initBaseQueues(ch1, hash);
await initRetryQueues(ch1, hash);
await conn.close();
}
标题所在:
export enum ERabbitQueueArguments {
deadLetterExchange = 'x-dead-letter-exchange',
deadLetterRoutingKey = 'x-dead-letter-routing-key',
messageTTL = 'x-message-ttl',
}