如何在进行异步 API 调用的云函数中强制执行同步 Pub/Sub 消息处理?

问题描述 投票:0回答:1

我已将云功能(第二代)配置为每当新消息发布到主题时由 Pub/Sub(推送订阅)触发。可以同时发布数百条消息,但我想同步而不是同时处理它们。 尽管将该函数的配置设置为仅允许一个最大实例和每个实例一个最大并发请求,但仍会同时处理多条消息。此行为似乎源于我的代码中的异步 API 调用。一旦一条消息发起异步 API 调用,另一条消息处理就会开始,从而导致并发处理。

如何同步处理消息,以便在上一个消息处理(包括异步 API 调用)完全完成之前不会开始下一个消息处理?

云函数的代码如下所示。在打印 2 之前,下一条消息处理开始,因为 getBearerToken 是一个异步函数。

functions.cloudEvent('processPayload', startProcess);

function startProcess(cloudEvent) {
  console.log(1);
  getBearerToken(function (bearerToken) {
    console.log(2);
    // and so on
  });
}
node.js google-cloud-platform google-cloud-functions google-cloud-pubsub
1个回答
0
投票

如果一个函数的结果依赖于另一个异步函数的结果,那么它本身实际上就变成了异步函数。但是 Pub/Sub 框架同步调用该函数(没有

await
),然后调用下一个函数,而无需等待异步部分完成。以下示例证明了这一点。

function asynch(message) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      resolve(message);
    }, Math.random(1) * 500 + 500);
  });
}

async function process(message) {
  console.log(await asynch(message));
}

/* Pub/Sub Framework calls function synchronously */
for (var i = 1; i <= 100; i++) process(i);

为了强制同步执行,您必须维护一个队列,使用

Promise::then
方法将这些异步函数写入其中。

function asynch(message) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      resolve(message);
    }, Math.random(1) * 500 + 500);
  });
}

var queue = Promise.resolve();
async function process(message) {
  queue = queue
    .then(() => asynch(message))
    .then(console.log);
}

/* Pub/Sub Framework calls function synchronously */
for (var i = 1; i <= 100; i++) process(i);

但是只有当您有能力保留像以下示例中的

queue
这样的全局变量时,这才有效。我怀疑这样的 Pub/Sub 框架是否会出现这种情况。或者,您可以将消息队列的状态保存在数据库或其他持久性中(您可能也没有)。

这表明: 如果没有大量额外的努力(可能还需要成本),就不可能使用同步消息处理机制来进行“一个接一个消息”的异步操作。

© www.soinside.com 2019 - 2024. All rights reserved.