使用 @google-cloud/pubsub 和打字稿

问题描述 投票:0回答:1

这里是 REPO,其中包含此代码的示例。 https://codesandbox.io/p/sandbox/ts-sandbox-forked-rl523r

我正在尝试找到发布消息和在 HTTP 云函数/服务器中接收消息的正确接口

这是我发布消息的代码

import { Message, PubSub, Subscription, Topic } from '@google-cloud/pubsub'
import { MessageOptions } from '@google-cloud/pubsub/build/src/topic' // I don't like this import. I expect to import this type from '@google-cloud/pubsub'

export const instance = new PubSub()
export const psaWorkerTopic = instance.topic(
  MAIN.PSA_WORKER_PUBSUB_TOPIC_NAME,
  {
    batching: PUBSUB_MAX_BATCHING,
    messageOrdering: true,
  }
)

export const publishPsaQueueMessage = async (
  data: PubSubPsaQueue,
  orderingKey?: string
) => {
  try {
    const dataBuffer = Buffer.from(JSON.stringify(data))
    const message: MessageOptions = {
      data: dataBuffer,
      orderingKey,
    }

    const messageId = await psaWorkerTopic.publishMessage(message)
    logger.log(
      `PubSub::PSA::PUBLISHED::msgId: ${messageId}::batchId: ${data.batchId}::orderKey: ${orderingKey}`,
      {
        partnerId: data.partnerId,
        orderingKey,
      }
    )

    return messageId
  } catch (e) {
    logger.error('PubSub::PSA::Could not publish message: ', e)
    if (orderingKey) psaWorkerTopic.resumePublishing(orderingKey)
    throw e
  }
}

这是接收消息的代码

export const handleMessage = async (messageBody: {
  message: {
    data: string
    messageId: string
    publishTime: string
    attributes?: Record<string, string>
    orderingKey?: string
  }
  subscription: string
}): Promise<void> => {
  const { message } = messageBody

  const startTime = Date.now()
  const payloadDataStr = Buffer.from(message.data, 'base64').toString('utf8')
  if (!isValidJSONString(payloadDataStr)) {
    throw new Error(`Invalid Json: ${payloadDataStr}`)
  }

  const payloadData: PubSubPsaQueue = JSON.parse(payloadDataStr)

  logger.debug(
    `PubSub::PSA::START::msgId: ${message.messageId}::batchId: ${payloadData.batchId}::orderKey: ${message.orderingKey}::time: ${message.publishTime}::Received payload`,
    payloadDataStr
  )
}

export const psaSubscriptionListener = () => {
  psaWorkerSubscription.on('message', handleMessage)
  psaWorkerSubscription.on('error', (err) => {
    logger.error('PubSub::PSA::Unexpected error with GCP subscription:', err)
  })
}

现在我在这行看到 ts 错误

psaWorkerSubscription.on('message', handleMessage)
在handleMessage函数上

TS2769: No overload matches this call.
The last overload gave the following error.
Argument of type
(messageBody: {     message: {         data: string;         messageId: string;         publishTime: string;         attributes?: Record<string, string>;         orderingKey?: string;     };     subscription: string; }) => Promise<void>
is not assignable to parameter of type void
subscription.d.ts(68, 5): The last overload is declared here.

这是我在测试服务器上收到的真实数据的示例 rawData

typescript google-cloud-platform google-cloud-pubsub
1个回答
0
投票

看起来您正在将推送订阅(通常在触发云函数时使用)和拉订阅与尝试处理这两种类型的代码混为一谈。

psaSubscriptionListener
函数中的代码正在使用 Pub/Sub 客户端库启动流式拉取连接以尝试接收消息,该消息将通过
handleMessage
函数进行处理。您的
main.post
方法正在等待请求传入,并尝试直接调用
handleMessage
。如果您尝试处理单个订阅上的所有消息,则应该只执行这两件事之一。

对于使用客户端库的基于拉取的订阅,

handleMessage
不正确。它应该被定义为:

export const handleMessage = async (message: Message): Promise<void> => {

  const startTime = Date.now();
  const payloadDataStr = message.data.toString("utf8");

  const payloadData: Record<string, unknown> = JSON.parse(payloadDataStr);

  console.debug(
    `PubSub::PSA::START::msgId: ${message.id}::batchId: ${payloadData.batchId}::orderKey: ${message.orderingKey}::time: ${message.publishTime}::Received payload`,
    payloadDataStr
  );
};
© www.soinside.com 2019 - 2024. All rights reserved.