setTimeout does not work in an async while loop

Question (votes: 0, answers: 1)

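I am trying to create a Redis queue consumer. Each message triggers an API call, so I want to make sure there is a delay between messages. The consumer loops until the set I use as the queue is empty, popping one message per iteration.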

redisClient.ts:

This is where I am trying to use setTimeout. The callback function is in the next code block.

export default class Redis  {
  redis: RedisClientType;

  constructor(private url: string) {
    this.redis = createClient({ url: this.url });
    this.redis.connect();
  }

  public async consumeMessages(
    queue: string,
    callback: (message: string) => Promise<void>
  ) {
    while (await this.redis.sCard(queue)) {
      let message = await this.redis.sPop(queue);
      console.log("[x] Received %s", message);
   
      await callback(message[0]);
      setTimeout(() => {
        console.log("[x] Ack %s", message);
      }, 3000);
    }
  }
}

It is called from the following use case:

export class ConsumerUseCAse {

  constructor(
    private queueProvider: QueueProvider,
    private dataProcessor: DataProcessor,
  ) {}
  async execute(): Promise<Array<string>> {

    await this.queueProvider.consumeMessages(
      queueName,
      async (id: string) => {
        let APIResponse = await getAPIResponse(id);
        await this.dataProcessor.execute(APIResponse)
      }
    );
    return consumed
  }
}
typescript async-await while-loop settimeout
1 Answer
Votes: 0

As a commenter pointed out, mixing callbacks and promises is not a good idea. You have two options.

A) Use util.promisify (recommended)

import { promisify } from "util"; // <-- Use this node.js util

export class ConsumerUseCAse {

  constructor(
    private queueProvider: QueueProvider,
    private dataProcessor: DataProcessor,
  ) {}
  async execute(): Promise<Array<string>> {
    // Convert to promise API.
    // Note: util.promisify expects the callback to be the last argument
    // and to follow the error-first (err, value) convention.
    const consumeMessagesAsync = promisify(this.queueProvider.consumeMessages);
    // consumeMessages is a method, not a standalone function.
    // If the above fails, you may need to bind "this" properly:
    // const consumeMessagesAsync = promisify(
    //    this.queueProvider.consumeMessages.bind(this.queueProvider)
    // );
    // Use without callbacks
    const id = await consumeMessagesAsync(
      queueName
    );
    let APIResponse = await getAPIResponse(id);
    await this.dataProcessor.execute(APIResponse);
    return consumed
  }
}
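
For reference, here is a minimal sketch of the shape util.promisify works with: a function whose last argument is an error-first callback that is called exactly once. Note that popMessage and its in-memory queue are hypothetical stand-ins introduced for illustration; they are not part of the question's code or of the redis client.

```typescript
import { promisify } from "util";

// Hypothetical error-first API (an assumption for illustration): the
// callback is the last argument and is called once with (err, value).
function popMessage(
  queue: string[],
  callback: (err: Error | null, message: string | null) => void
): void {
  const message = queue.shift();
  if (message === undefined) {
    callback(new Error("queue is empty"), null);
  } else {
    callback(null, message);
  }
}

// promisify wraps the function so that it returns a promise instead:
// a non-null `err` rejects the promise, otherwise it resolves with the value.
const popMessageAsync = promisify(popMessage);
```

It could then be used as `const id = await popMessageAsync(queue);` inside an async function. A callback that receives the message as its first argument, as in the question, does not match this convention, because promisify would treat that first argument as the error.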

B) Use an immediately invoked function expression (IIFE)

export class ConsumerUseCAse {

  constructor(
    private queueProvider: QueueProvider,
    private dataProcessor: DataProcessor,
  ) {}
  async execute(): Promise<Array<string>> {

    await this.queueProvider.consumeMessages(
      queueName,
      (id: string) => { // <-- Remove async from the callback
        (async () => { // Use an IIFE; note that its promise is not awaited
          // async/await code goes here
          let APIResponse = await getAPIResponse(id);
          await this.dataProcessor.execute(APIResponse)
        })();
      }
    );
    return consumed
  }
}
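
Independently of either option, the delay in the question's loop only takes effect if the loop awaits it: setTimeout schedules its callback and returns immediately, so the while loop moves on to the next iteration without waiting. What follows is a minimal sketch, assuming an in-memory array in place of the Redis set. The consumeMessages function here is a simplified, hypothetical version of the question's method, and sleep is a helper introduced for illustration.

```typescript
// Promise-based delay: resolves after `ms` milliseconds.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for the Redis-backed method: `queue` is a plain
// array of message ids rather than a Redis set.
async function consumeMessages(
  queue: string[],
  callback: (message: string) => Promise<void>,
  delayMs: number
): Promise<string[]> {
  const consumed: string[] = [];
  while (queue.length > 0) {
    const message = queue.shift() as string;
    console.log("[x] Received %s", message);
    await callback(message);
    // Awaiting the delay suspends this iteration, so the next message
    // is not popped until `delayMs` has elapsed.
    await sleep(delayMs);
    console.log("[x] Ack %s", message);
    consumed.push(message);
  }
  return consumed;
}
```

Called as `await consumeMessages(ids, handler, 3000)`, this would process roughly one message every three seconds, plus however long each callback takes.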