云函数遇到错误后,Pub sub 不会重试

问题描述 投票:0回答:1

我有2个云函数,一个云函数发布消息,另一个云函数(transform.js)从发布的消息中触发。现在在转换云函数中,我尝试将其插入到一个大查询表中,但在某些情况下它会抛出错误,例如表不存在或架构不匹配,在这种情况下,发布者应该在之后重新发送数据有一段时间。

问题是发布者没有重新发送数据,我什至更改了重试策略以在指数退避延迟后重试

enter image description here

改造云功能

const admin = require("firebase-admin");
//const serviceAccount = require("./path-firebase-adminsdk-uszgp-35748eb111.json");
const { PubSub, Schema } = require("@google-cloud/pubsub");
const { BigQuery } = require("@google-cloud/bigquery");
const functions = require('firebase-functions');


const pubsub = new PubSub({
  projectId: "abc"
})

let transformedObject = {}
let initialFirestoreDatatype={}
let fieldsToTransform = []
var rootCollection=''
var docData;
var documentId;
const db = admin.firestore()

module.exports.transform = functions.pubsub.topic('posts-fs-to-bigquery-1').onPublish( async (message, context) => {
  // Handle the incoming message

  //console.log('Raw message data:', message.data.toString());
try{
 const decodedData = Buffer.from(message.data, 'base64').toString('utf-8');
 //console.log('decoded data:', decodedData);
  const data = JSON.parse(decodedData);
  console.log('Received message:', data);
  rootCollection = data['rootCollection'];

  docData = reconstructData(data['docData'])
  console.log("data after processing",docData);

  documentId=data['documentId'];

  if(data['postId'].includes('/')){
  
    
      data['postId']=`${rootCollection}/`+data['postId'];
    

  let parts = data['postId'].split("/");
  let collection = parts.slice(0, -1).join("/");
  let document = parts.slice(-1)[0];
  const subcollection = parts.slice(-2, -1)[0];

    await fstoBigQuery(collection,document, subcollection, true)
  
  }else{
  
    await fstoBigQuery(data['rootCollection'],data['postId'], '', false)
  
  }
}catch(e){
throw new Error(e)
}



});

async function fstoBigQuery(collection,postId, subcoll ,isSubCollection) {
//this is where error will occur and I want pub to republish the message
try{
const [table] = await bigquery.dataset("firestore_collections").table(`${rootCollection}`).get();
          
}catch(e){
throw new Error(e)
}
}

node.js firebase google-bigquery google-cloud-pubsub publish-subscribe
1个回答
0
投票

我猜你混淆了一些概念。

当您使用异步发布/订阅系统时,发布者通常不需要控制订阅者端的交付和成功处理(如果您愿意,您可以深入了解至少一次和最多一次)。 发布者(通常)进行发布 - 就是这样。

在订阅者方面,有一些机制 - 确认和重试。 如果发生某些异常,则不应自动确认消息已处理。

接下来您可以做的是配置重试策略。 请点击链接

© www.soinside.com 2019 - 2024. All rights reserved.