我有2个云函数,一个云函数发布消息,另一个云函数(transform.js)从发布的消息中触发。现在在转换云函数中,我尝试将其插入到一个大查询表中,但在某些情况下它会抛出错误,例如表不存在或架构不匹配,在这种情况下,发布者应该在之后重新发送数据有一段时间。
问题是发布者没有重新发送数据,我什至更改了重试策略以在指数退避延迟后重试
改造云功能
const admin = require("firebase-admin");
//const serviceAccount = require("./path-firebase-adminsdk-uszgp-35748eb111.json");
const { PubSub, Schema } = require("@google-cloud/pubsub");
const { BigQuery } = require("@google-cloud/bigquery");
const functions = require('firebase-functions');
const pubsub = new PubSub({
projectId: "abc"
})
let transformedObject = {}
let initialFirestoreDatatype={}
let fieldsToTransform = []
var rootCollection=''
var docData;
var documentId;
const db = admin.firestore()
module.exports.transform = functions.pubsub.topic('posts-fs-to-bigquery-1').onPublish( async (message, context) => {
// Handle the incoming message
//console.log('Raw message data:', message.data.toString());
try{
const decodedData = Buffer.from(message.data, 'base64').toString('utf-8');
//console.log('decoded data:', decodedData);
const data = JSON.parse(decodedData);
console.log('Received message:', data);
rootCollection = data['rootCollection'];
docData = reconstructData(data['docData'])
console.log("data after processing",docData);
documentId=data['documentId'];
if(data['postId'].includes('/')){
data['postId']=`${rootCollection}/`+data['postId'];
let parts = data['postId'].split("/");
let collection = parts.slice(0, -1).join("/");
let document = parts.slice(-1)[0];
const subcollection = parts.slice(-2, -1)[0];
await fstoBigQuery(collection,document, subcollection, true)
}else{
await fstoBigQuery(data['rootCollection'],data['postId'], '', false)
}
}catch(e){
throw new Error(e)
}
});
async function fstoBigQuery(collection,postId, subcoll ,isSubCollection) {
//this is where error will occur and I want pub to republish the message
try{
const [table] = await bigquery.dataset("firestore_collections").table(`${rootCollection}`).get();
}catch(e){
throw new Error(e)
}
}
我猜你混淆了一些概念。
当您使用异步发布/订阅系统时,发布者通常不需要控制订阅者端的交付和成功处理(如果您愿意,您可以深入了解至少一次和最多一次)。 发布者(通常)进行发布 - 就是这样。
在订阅者方面,有一些机制 - 确认和重试。 如果发生某些异常,则不应自动确认消息已处理。
接下来您可以做的是配置重试策略。 请点击链接