I am trying to use StreamingPull in Pub/Sub. Messages are being published, but I don't see any response. For the following code:
const {v1} = require('@google-cloud/pubsub');
// Low-level subscriber client (this declaration was missing from the snippet)
const subClient = new v1.SubscriberClient();
const request = {
  subscription: 'projects/<projectId>/subscriptions/Temp',
  stream_ack_deadline_seconds: 600
};
console.log('Pulling Messages...');
const stream = await subClient.streamingPull({});
stream.on('data', response => {
  console.log(response);
});
stream.on('error', err => {
  console.log(err);
});
stream.on('end', () => {
  console.log("end");
});
stream.write(request);
stream.end();
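The code completes silently without logging any response. Am I missing a property in my request? According to the StreamingPullRequest documentation, nothing else is mandatory. The only usage example I could find is in a test file.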
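If you want higher throughput and are fine with a stream-based approach, use the streaming pull API that the GCP Pub/Sub Node.js package supports through its high-level client. This is the recommended way to meet low-latency/high-throughput requirements with Pub/Sub.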
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
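If you still want to call the low-level `streamingPull` from the question directly, two things in that snippet are likely to matter. This is my reading of it, not something I have verified against your project. First, the Node.js client normally expects camelCase field names, so `stream_ack_deadline_seconds` would be `streamAckDeadlineSeconds`. Second, calling `stream.end()` right after the first write half-closes the stream before any message can arrive. Below is a minimal sketch under those assumptions. `openStreamingPull`, its parameters, and the returned `stop` function are hypothetical names; `subClient` is assumed to be a `v1.SubscriberClient` and is passed in so the function can also be tried with a stub.

```javascript
// Sketch only. With the real library, the client would be created like this:
//   const {v1} = require('@google-cloud/pubsub');
//   const subClient = new v1.SubscriberClient();

/**
 * Opens a StreamingPull stream, acks every received message on the same
 * stream, and half-closes the stream only after `durationMs` (or when the
 * returned stop() function is called). All names here are illustrative.
 */
function openStreamingPull(subClient, subscriptionName, onMessage, durationMs) {
  const stream = subClient.streamingPull();
  let closed = false;
  let timer = null;

  function stop() {
    if (closed) return;
    closed = true;
    clearTimeout(timer);
    stream.end(); // half-close only once we are done listening
  }

  stream.on('data', response => {
    const received = response.receivedMessages || [];
    if (received.length === 0) return;
    for (const {message} of received) onMessage(message);
    // Acks travel over the same stream; `subscription` is set only on the
    // first request and is left out of later ones.
    if (!closed) stream.write({ackIds: received.map(r => r.ackId)});
  });
  stream.on('error', err => console.error('StreamingPull error:', err));
  stream.on('end', () => console.log('Stream ended.'));

  // First request: names the subscription, using camelCase field names.
  stream.write({
    subscription: subscriptionName,
    streamAckDeadlineSeconds: 600,
  });

  // Keep the stream open instead of ending it right after the first write.
  timer = setTimeout(stop, durationMs);
  return stop;
}
```

With the real client this would be called as `openStreamingPull(subClient, 'projects/<projectId>/subscriptions/Temp', msg => console.log(msg), 60000)`. The high-level `subscription.on('message', ...)` API shown above handles acks, deadline extension, and reconnects for you, so this low-level route only makes sense if you need to manage the stream yourself.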