我正在尝试创建一个redis队列消费者。每条消息都会调用一个 API,因此我想确保消息之间存在超时。消费者循环直到我用于队列的集合为空并且每次迭代都会弹出一条消息。
redisClient.ts:
这是我尝试实现 setTimeout 的地方。回调函数位于下一个代码块中。
export default class Redis {
redis: RedisClientType;
constructor(private url: string) {
this.redis = createClient({ url: this.url });
this.redis.connect();
}
public async consumeMessages(
queue: string,
callback: (message: string) => Promise<void>
) {
while (await this.redis.sCard(queue)) {
let message = await this.redis.sPop(queue);
console.log("[x] Received %s", message);
await callback(message[0]);
setTimeout(() => {
console.log("[x] Ack %s", message);
}, 3000);
}
}
}
从以下用例中调用
export class ConsumerUseCAse {
constructor(
private queueProvider: QueueProvider,
private dataProcessor: DataProcessor,
) {}
async execute(): Promise<Array<string>> {
await this.queueProvider.consumeMessages(
queueName,
async (id: string) => {
let APIResponse = await getAPIResponse(id);
await this.dataProcessor.execute(APIResponse)
}
);
return consumed
}
}
正如有人评论的那样,混合回调和承诺并不是一个好主意。你有两个选择。
import { promisify } from "util"; // <-- Use this node.js util
export class ConsumerUseCAse {
constructor(
private queueProvider: QueueProvider,
private dataProcessor: DataProcessor,
) {}
async execute(): Promise<Array<string>> {
// Convert to promise API
const consumeMessagesAsync = promisify(this.queueProvider.consumeMessages);
// consumeMessages is a method not a function
// if the above fails you may need to bind "this" properly
// const consumeMessagesAsync = promisify(
// this.queueProvider.bind(this.queueProvider).consumeMessages
// );
// Use without callbacks
const id = await consumeMessagesAsync(
queueName
);
let APIResponse = await getAPIResponse(id);
await this.dataProcessor.execute(APIResponse);
return consumed
}
}
export class ConsumerUseCAse {
constructor(
private queueProvider: QueueProvider,
private dataProcessor: DataProcessor,
) {}
async execute(): Promise<Array<string>> {
await this.queueProvider.consumeMessages(
queueName,
(id: string) => { // <-- Remove Async from callback
(async () => { // Use IIFE
// async/await code goes here
let APIResponse = await getAPIResponse(id);
await this.dataProcessor.execute(APIResponse)
})();
}
);
return consumed
}
}