Pub/Sub StreamingPull 无法使用节点客户端工作

问题描述 投票:0回答:1

我正在尝试使用 Pub/Sub 中的 StreamingPull。有消息发布,但我没有看到任何回复。对于以下代码

    const {v1 } = require('@google-cloud/pubsub');
    const request = {
        subscription: 'projects/<projectId>/subscriptions/Temp'',
        stream_ack_deadline_seconds: 600
    };

    console.log('Pulling Messages...');
    const stream = await subClient.streamingPull({});
    stream.on('data', response => {
        console.log(response);
    });
    stream.on('error', err => {
        console.log(err);
    });
    stream.on('end', () => {
        console.log("end");
    });
    stream.write(request);
    stream.end();

我看到代码默默完成,没有记录响应。我的请求中是否缺少任何属性。根据 StreamingPullRequest 的文档,没有其他内容是强制性的。唯一的使用示例位于测试文件中。

node.js google-cloud-platform google-cloud-pubsub google-api-nodejs-client node-streams
1个回答
0
投票

如果您想提高吞吐量并同意基于流的方法,请使用 GCP Pub Sub NodeJs 包支持的 Streaming pull API。这是使用 pub sub 满足低延迟/高吞吐量要求的推荐方法。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
© www.soinside.com 2019 - 2024. All rights reserved.