这里是 REPO,其中包含此代码的示例。 https://codesandbox.io/p/sandbox/ts-sandbox-forked-rl523r
我正在尝试找到发布消息和在 HTTP 云函数/服务器中接收消息的正确接口
这是我发布消息的代码
import { Message, PubSub, Subscription, Topic } from '@google-cloud/pubsub'
import { MessageOptions } from '@google-cloud/pubsub/build/src/topic' // I don't like this import. I expect to import this type from '@google-cloud/pubsub'
export const instance = new PubSub()
export const psaWorkerTopic = instance.topic(
MAIN.PSA_WORKER_PUBSUB_TOPIC_NAME,
{
batching: PUBSUB_MAX_BATCHING,
messageOrdering: true,
}
)
export const publishPsaQueueMessage = async (
data: PubSubPsaQueue,
orderingKey?: string
) => {
try {
const dataBuffer = Buffer.from(JSON.stringify(data))
const message: MessageOptions = {
data: dataBuffer,
orderingKey,
}
const messageId = await psaWorkerTopic.publishMessage(message)
logger.log(
`PubSub::PSA::PUBLISHED::msgId: ${messageId}::batchId: ${data.batchId}::orderKey: ${orderingKey}`,
{
partnerId: data.partnerId,
orderingKey,
}
)
return messageId
} catch (e) {
logger.error('PubSub::PSA::Could not publish message: ', e)
if (orderingKey) psaWorkerTopic.resumePublishing(orderingKey)
throw e
}
}
这是接收消息的代码
export const handleMessage = async (messageBody: {
message: {
data: string
messageId: string
publishTime: string
attributes?: Record<string, string>
orderingKey?: string
}
subscription: string
}): Promise<void> => {
const { message } = messageBody
const startTime = Date.now()
const payloadDataStr = Buffer.from(message.data, 'base64').toString('utf8')
if (!isValidJSONString(payloadDataStr)) {
throw new Error(`Invalid Json: ${payloadDataStr}`)
}
const payloadData: PubSubPsaQueue = JSON.parse(payloadDataStr)
logger.debug(
`PubSub::PSA::START::msgId: ${message.messageId}::batchId: ${payloadData.batchId}::orderKey: ${message.orderingKey}::time: ${message.publishTime}::Received payload`,
payloadDataStr
)
}
export const psaSubscriptionListener = () => {
psaWorkerSubscription.on('message', handleMessage)
psaWorkerSubscription.on('error', (err) => {
logger.error('PubSub::PSA::Unexpected error with GCP subscription:', err)
})
}
现在我在这行看到 ts 错误
psaWorkerSubscription.on('message', handleMessage)
在handleMessage函数上
TS2769: No overload matches this call.
The last overload gave the following error.
Argument of type
(messageBody: { message: { data: string; messageId: string; publishTime: string; attributes?: Record<string, string>; orderingKey?: string; }; subscription: string; }) => Promise<void>
is not assignable to parameter of type void
subscription.d.ts(68, 5): The last overload is declared here.
看起来您正在将推送订阅(通常在触发云函数时使用)和拉订阅与尝试处理这两种类型的代码混为一谈。
psaSubscriptionListener
函数中的代码正在使用 Pub/Sub 客户端库启动流式拉取连接以尝试接收消息,该消息将通过 handleMessage
函数进行处理。您的 main.post
方法正在等待请求传入,并尝试直接调用 handleMessage
。如果您尝试处理单个订阅上的所有消息,则应该只执行这两件事之一。
对于使用客户端库的基于拉取的订阅,
handleMessage
不正确。它应该被定义为:
export const handleMessage = async (message: Message): Promise<void> => {
const startTime = Date.now();
const payloadDataStr = message.data.toString("utf8");
const payloadData: Record<string, unknown> = JSON.parse(payloadDataStr);
console.debug(
`PubSub::PSA::START::msgId: ${message.id}::batchId: ${payloadData.batchId}::orderKey: ${message.orderingKey}::time: ${message.publishTime}::Received payload`,
payloadDataStr
);
};