同时使用异步任务和批量更新

问题描述 投票:0回答:1

我有一个连接到 Firestore 的 React Native 应用程序。我还使用 Cloud Functions 向每个用户发送通知。下面的代码可以工作,但我最近遇到了超时错误(即使有 540 秒超时和 512MB 内存):

/**
 * Sends one recommended-event push notification to every user that has an
 * FCM token, processing the users strictly one after another.
 *
 * Flow:
 *  1. Load every `Events` document whose `Start` is at or after "now".
 *  2. Load every `Users` document.
 *  3. For each user: pick an event, send it through FCM, then persist the
 *     updated `sentNotifications` list on the user document.
 *
 * Relies on `admin` (Firebase Admin SDK) and on the helpers
 * `findEventForUser`, `findBangerEvent`, `createNotificationPayload` and
 * `updateSentNotifications`, which are defined outside this snippet.
 *
 * @returns {Promise<void>} Resolves once every user has been processed.
 *     Per-user failures are logged and do not reject the returned promise.
 */
async function sendNotifications() {
    console.log("Sending notifications for recommended events...");

    // Fetch all events once
    const eventsRef = admin.firestore().collection("Events");
    const eventsSnapshot = await eventsRef
        .where('Start', '>=', new Date())
        .get();

    if (eventsSnapshot.empty) {
        console.log("No upcoming events found.");
        return;
    }
    const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));

    // Fetch all users
    const usersRef = admin.firestore().collection("Users");
    const usersSnapshot = await usersRef.get();

    for (const userDoc of usersSnapshot.docs) {
        try {
            const userData = userDoc.data();
            const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;

            if (!fcmToken) continue; // Skip users without FCM token
            const userPreferredTags = preferences ? preferences : [];
            let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);

            // The reset flag is scoped to this user. Declaring it outside the
            // loop would make one user's reset wipe the history of every user
            // processed afterwards.
            let reset = false;

            // Fallback logic: No matching events, or user has no preferences
            if (!eventToSend) {
                eventToSend = findBangerEvent(allEvents, sentNotifications);
            }
            if (!eventToSend && sentNotifications.length > 0) {
                // Nothing new to suggest: resend the most recent entry and
                // start this user's history over.
                console.log(`No new events to suggest, resetting`);
                eventToSend = sentNotifications[sentNotifications.length - 1];
                reset = true;
            }

            if (!eventToSend) {
                console.log(`No events to send for user ${userDoc.id}. Skipping.`);
                continue;
            }

            const notificationPayload = createNotificationPayload(
                eventToSend,
                fcmToken,
                language
            );
            // Awaiting inside the loop serializes the work: the next user is
            // not started until this send and the following update finish.
            await admin.messaging().send(notificationPayload);
            console.log(`Successfully sent message to user ${userDoc.id}, ${notificationPayload.notification.title}`);

            const updatedNotifications = updateSentNotifications(eventToSend, reset ? [] : sentNotifications);
            await userDoc.ref.update({ sentNotifications: updatedNotifications });

        } catch (error) {
            // Best effort: a failure for one user must not stop the others.
            console.error(`Error processing user ${userDoc.id}:`, error);
        }
    }

    console.log("Notifications sent successfully.");
}

因此,我改用异步函数,以便能够并发地处理各个用户。不过,如果我理解正确的话,对 Firestore 中 userDoc 的更新进行批处理,并使用 sendEach() 批量发送 FCM 消息,也是一种好的做法。

所以我使用以下代码在 Firebase 模拟器上进行了尝试:

/**
 * Second attempt: processes all users through concurrently started async
 * callbacks, queues their `sentNotifications` updates on a Firestore write
 * batch, and sends the FCM messages afterwards with `sendEach()` in chunks
 * of 100.
 *
 * Relies on `admin` and on the helpers `findEventForUser`, `findBangerEvent`,
 * `createNotificationPayload` and `updateSentNotifications`, which are
 * defined outside this snippet.
 *
 * NOTE(review): `batch` and `batchUserCount` are shared by every callback
 * passed to `map` below; see the comments at the commit site for how that
 * interacts with `await`.
 *
 * @returns {Promise<void>}
 */
async function sendNotifications() {
    console.log("Sending notifications for recommended events...");

    // Fetch all events once
    const eventsRef = admin.firestore().collection("Events");
    const eventsSnapshot = await eventsRef
        .where('Start', '>=', new Date())
        .get();

    if (eventsSnapshot.empty) {
        console.log("No upcoming events found.");
        return;
    }
    const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));

    // Fetch all users
    const usersRef = admin.firestore().collection("Users");
    const usersSnapshot = await usersRef.get();

    // As written this keeps every user: the callback always returns true and
    // `userData` is read but never used.
    const usersToProcess = usersSnapshot.docs.filter(userDoc => {
        const userData = userDoc.data();
        return true; // Include all users with an FCM token (set to true in emulator)
    });

    console.log(`Processing ${usersToProcess.length} users...`);

    // Shared mutable state: every async callback below pushes into
    // `notifications` and writes to the same `batch` / `batchUserCount`.
    const notifications = [];
    let batch = admin.firestore().batch();
    let batchUserCount = 0; // Track the number of users in the current batch

    // `map` invokes the callbacks one after another; each one runs
    // synchronously until it reaches its first `await`, then `map` moves on
    // to the next user while that `await` is still pending.
    const userPromises = usersToProcess.map(async (userDoc) => {
        const userData = userDoc.data();
        const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;

        const userPreferredTags = preferences || [];
        let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);

        // Fallback logic: No matching events
        if (!eventToSend) {
            eventToSend = findBangerEvent(allEvents, sentNotifications) ||
                sentNotifications[sentNotifications.length - 1];
        }

        if (!eventToSend) {
            console.log(`No events to send for user ${userDoc.id}. Skipping.`);
            return;
        }

        // Users without a token get a placeholder token string instead of
        // being skipped.
        const notificationPayload = createNotificationPayload(eventToSend, fcmToken ? fcmToken : "ezeazea", language);
        notifications.push(notificationPayload);

        const updatedNotifications = updateSentNotifications(eventToSend, sentNotifications);
        // Character count of the serialized update, logged as a rough size
        // estimate.
        const dataSize = JSON.stringify({ sentNotifications: updatedNotifications }).length;
        console.log(`Estimated size of update: ${dataSize} bytes`);

        batch.update(userDoc.ref, { sentNotifications: updatedNotifications });
        batchUserCount++;

        // If the batch has 100 operations, commit the batch and start a new one
        if (batchUserCount === 100) {
            console.log("Committing Firestore batch...");
            // NOTE(review): `batch` is only replaced AFTER this await. While
            // the commit is pending, the callbacks for the remaining users
            // keep running and call `batch.update` on the batch that is
            // already being committed, and `batchUserCount` keeps growing
            // past 100. This looks like the cause of the reported
            // "WriteBatch already committed" error - confirm against the
            // Firestore WriteBatch documentation.
            await batch.commit(); // Commit the batch
            batch = admin.firestore().batch(); // Create a new batch
            batchUserCount = 0; // Reset the batch user count
        }
    });

    await Promise.all(userPromises);

    // Commit remaining updates if any users were left in the batch
    if (batchUserCount > 0) {
        console.log("Committing remaining Firestore batch...");
        await batch.commit();
    }

    // Note the ordering: the `sentNotifications` writes above are committed
    // before any message is actually sent below.
    // Send notifications in bulk (in batches of 100)
    console.log("Sending notifications in bulk...");
    while (notifications.length) {
        // `splice` removes the chunk from `notifications`, so the loop ends
        // once the array is drained.
        const batchNotifications = notifications.splice(0, 100); // Firebase max batch size for FCM
        try {
            await admin.messaging().sendEach(batchNotifications);
        } catch (error) {
            console.error("Error sending notifications:", error);
            // Handle the error as necessary
        }
    }

    console.log("Notifications sent successfully.");
}

但是,对用户的异步处理似乎与批量写入发生了冲突,因为在第二次调用 commit 时我收到了以下错误:

⚠  functions: Error: Cannot modify a WriteBatch that has been committed.

javascript node.js firebase google-cloud-firestore google-cloud-functions
1个回答
0
投票

第一段代码的问题是您在 for 循环中使用了 await,这意味着对 Firestore 的调用和 FCM 发送调用都是串行执行的。因此,直到上一个调用完成后,下一个调用才会开始。

在某些情况下,这种串行流程是必要的,例如,当下一个加载/发送操作取决于上一个操作的结果时。但在你的场景中似乎并非如此。因此,最好先尽快发起所有的加载/发送调用,然后再统一等待它们全部完成。

代码应该是这样的:

/**
 * Sends one recommended-event push notification to every user that has an
 * FCM token, starting all sends without waiting for the previous one.
 *
 * Each user's "send, then update `sentNotifications`" chain is started
 * inside the loop and collected in `promises`; the function waits for all
 * of them together at the end.
 *
 * Relies on `admin` (Firebase Admin SDK) and on the helpers
 * `findEventForUser`, `findBangerEvent`, `createNotificationPayload` and
 * `updateSentNotifications`, which are defined outside this snippet.
 *
 * @returns {Promise<void>} Resolves once every started chain has settled.
 *     Per-user send/update failures are logged and do not reject.
 */
async function sendNotifications() {
    console.log("Sending notifications for recommended events...");

    // Fetch all events once
    const eventsRef = admin.firestore().collection("Events");
    const eventsSnapshot = await eventsRef
        .where('Start', '>=', new Date())
        .get();

    if (eventsSnapshot.empty) {
        console.log("No upcoming events found.");
        return;
    }
    const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));

    // Fetch all users
    const usersRef = admin.firestore().collection("Users");
    const usersSnapshot = await usersRef.get();

    const promises = []; // 👈 This is where we'll track all pending load/send calls

    for (const userDoc of usersSnapshot.docs) {
        const userData = userDoc.data();
        const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;

        if (!fcmToken) continue; // Skip users without FCM token
        const userPreferredTags = preferences ? preferences : [];
        let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);

        // The reset flag must be per user: the `.then` callback below runs
        // after the loop has moved on, so a flag shared across iterations
        // would apply one user's reset to every user.
        let reset = false;

        // Fallback logic: No matching events, or user has no preferences
        if (!eventToSend) {
            eventToSend = findBangerEvent(allEvents, sentNotifications);
        }
        if (!eventToSend && sentNotifications.length > 0) {
            console.log(`No new events to suggest, resetting`);
            eventToSend = sentNotifications[sentNotifications.length - 1];
            reset = true;
        }

        if (!eventToSend) {
            console.log(`No events to send for user ${userDoc.id}. Skipping.`);
            continue;
        }

        const notificationPayload = createNotificationPayload(
            eventToSend,
            fcmToken,
            language
        );
        const promise = admin.messaging().send(notificationPayload).then(() => { // 👈 Remove await here
            console.log(`Successfully sent message to user ${userDoc.id}, ${notificationPayload.notification.title}`);

            const updatedNotifications = updateSentNotifications(eventToSend, reset ? [] : sentNotifications);
            return userDoc.ref.update({ sentNotifications: updatedNotifications }); // 👈 Remove await here
        }).catch((error) => {
            // Handling the rejection here keeps one failing user from
            // rejecting the Promise.all below.
            console.error(`Error processing user ${userDoc.id}:`, error);
        });

        promises.push(promise); // 👈 Add async operation to the list

    }

    await Promise.all(promises); // 👈 Wait for all async operations in one go

    console.log("Notifications sent successfully.");
}

我在上面的代码中标记了我所做的更改。本质上,此代码为所有异步操作创建了一个 Promise 列表,然后统一等待该列表,直到所有操作都完成。这使得各项操作可以并行执行而不是顺序执行,通常会带来巨大的吞吐量提升。

请注意,每个系统能够处理/允许的待处理异步操作数量可能仍然存在上限。如果遇到这种限制,最好在代码中引入一个批处理机制(这里指的不是 Firebase 的批处理 API),将同时挂起的异步操作数量限制在某个最大值以内。但我通常在确实需要之前不会引入这样的批处理机制,而且上述并行执行异步操作的方案很可能已经为您留出了相当大的余地。

© www.soinside.com 2019 - 2024. All rights reserved.