SSE update problem for the first client

Problem description

I'm building a test application that (for now) uses Next.js and Redis. My SSE endpoint works (somewhat) correctly: when a client connects, it fetches a snapshot and sends it back, and when an update is pushed (via the mock route) it should send an update event with the payload (it does send it to the second client).
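
A client consumes the endpoint roughly like the minimal sketch below (for illustration only; the actual frontend code lives in the linked repo):

// Minimal consumer sketch (illustration only; the real frontend is in the repo).
// The server pushes each event as a single `data:` line, so the default
// `message` handler receives both the initial SNAPSHOT and later UPDATEs.
const source = new EventSource('/api/services/sse');

source.onmessage = (msg: MessageEvent<string>) => {
    const event = JSON.parse(msg.data) as { eventType: unknown; payload: unknown };
    // The first message after connecting is the SNAPSHOT (Service[]),
    // subsequent ones are UPDATE events carrying a single Service.
    console.log('received', event.eventType, event.payload);
};

source.onerror = err => console.error('SSE connection error:', err);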

mock/route.ts

import { NextRequest } from 'next/server';
import Service from '@/models/Service';
import { randomUUID } from 'node:crypto';
import redis from '@/lib/redis';
import { sendUpdateToClients } from '@/lib/sse';
import EventType from '@/models/EventType';

const topic = 'services';

export async function POST(req: NextRequest) {
    console.log('<- [POST] on /api/services/mock');
    console.log('Adding a mock service');

    const u = randomUUID().toString();
    const service: Service = {
        id: u,
        name: `${u}.service`,
        active: true
    };

    try {
        const cachedData = await redis.get(topic);
        let current: Service[] = [];

        if (!cachedData) {
            console.log('No cached data, initializing empty array.');
        } else {
            current = JSON.parse(cachedData);
            console.log('Parsed cached data:', current);
        }

        current.push(service);
        await redis.set(topic, JSON.stringify(current));
        console.log('Data saved successfully.');

        sendUpdateToClients(topic, {
            eventType: EventType.UPDATE,
            payload: service
        });
        console.log('Clients updated with new service:', service);
    } catch (error) {
        console.error('Error handling POST /api/services/mock:', error);
    }

    return new Response(null, {
        status: 204
    });
}

sse/route.ts

import type { NextRequest } from 'next/server';
import { addClient, removeClient } from '@/lib/sse';
import redis from '@/lib/redis';
import EventType from '@/models/EventType';
import Event from '@/models/Event';
import Service from '@/models/Service';

export const dynamic = 'force-dynamic';
const topic = 'services';

export async function GET(req: NextRequest) {
    console.log('<- [GET] on /api/services/sse');

    const encoder = new TextEncoder();
    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();

    function handleAbort() {
        removeClient(topic, writer);
        writer.close().then(() => console.log('writer closed'));
    }

    addClient(topic, writer);
    const cachedData = await redis.get(topic);

    let services: Service[];
    if (cachedData) {
        console.log('Cached data found');
        services = JSON.parse(cachedData);
    } else {
        console.log('Cached data not found');
        services = [];
    }

    const payload: Event<Service[]> = {
        eventType: EventType.SNAPSHOT,
        payload: services
    };

    writer
        .write(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
        .then(() => console.log('Snapshot sent to client'))
        .catch(err => console.error('Error sending snapshot to client:', err));

    req.signal.addEventListener('abort', handleAbort);

    return new Response(readable, {
        headers: {
            'Content-Type': 'text/event-stream; charset=utf-8',
            Connection: 'keep-alive',
            'Cache-Control': 'no-cache, no-transform',
            'Content-Encoding': 'none'
        }
    });
}

lib/sse

import type Event from '@/models/Event';

interface ClientsMap {
    [key: string]: WritableStreamDefaultWriter<any>[];
}

const clients: ClientsMap = {};

export const addClient = (
    topic: string,
    writer: WritableStreamDefaultWriter<any>
) => {
    if (!clients[topic]) {
        clients[topic] = [];
    }
    clients[topic].push(writer);

    console.log(
        `Client added to topic ${topic}. Total clients for ${topic}: ${clients[topic].length}`
    );
    console.log(JSON.stringify(clients));
};

export const removeClient = (
    topic: string,
    writer: WritableStreamDefaultWriter<any>
) => {
    if (clients[topic]) {
        const index = clients[topic].indexOf(writer);
        if (index !== -1) {
            clients[topic].splice(index, 1);
        }

        if (clients[topic].length === 0) {
            delete clients[topic];
        }
    }

    console.log(
        `Client removed from topic ${topic}. Remaining clients for ${topic}: ${clients[topic] ? clients[topic].length : 0}`
    );
};

export const sendUpdateToClients = (topic: string, payload: Event<any>) => {
    console.log(JSON.stringify(clients));
    // todo: why does it think there are no clients on the first connect,
    // or when there is no data (meaning it should return []),
    // even though at least one client is connected via SSE?

    // is it somehow closing the connection? it isn't closing the response either (the Postman request is still running)
    // repro:
    // 1. open 1st SSE connection, then open a 2nd one
    // 2. send mock POST -> nothing received on either
    // 3. close the 2nd SSE connection
    // 4. send mock POST -> update received on the 2nd, but not on the 1st
    // 5. close the 1st SSE connection
    // 6. send mock POST -> received on both
    if (!clients[topic]) {
        console.log(`No clients for topic ${topic}`);
        return;
    }

    console.log(
        `Sending update to ${clients[topic].length} clients on topic ${topic}`
    );

    clients[topic].forEach(client => {
        client
            .write(
                new TextEncoder().encode(`data: ${JSON.stringify(payload)}\n\n`)
            )
            .then(() => console.log('Update sent to client'))
            .catch(err =>
                console.error('Error sending update to client:', err)
            );
    });
};

It does do this, but only if at least one client is already present. In the Next server logs I can see:

✓ Ready in 1120ms
docker-svm-web-1    |  ✓ Compiled /api/services/sse in 172ms (96 modules)
docker-svm-web-1    | <- [GET] on /api/services/sse
docker-svm-web-1    | Client added to topic services. Total clients for services: 1
docker-svm-web-1    | {"services":[{}]}
docker-svm-redis-1  | 1:M 19 May 2024 14:05:06.244 * DB saved on disk
docker-svm-web-1    | Redis data flushed: OK
docker-svm-web-1    | Cached data not found
docker-svm-web-1    | Snapshot sent to client
docker-svm-web-1    |  ✓ Compiled /api/services/mock in 69ms (99 modules)
docker-svm-web-1    | <- [POST] on /api/services/mock
docker-svm-web-1    | Adding a mock service
docker-svm-redis-1  | 1:M 19 May 2024 14:05:19.834 * DB saved on disk
docker-svm-web-1    | Redis data flushed: OK
docker-svm-web-1    | No cached data, initializing empty array.
docker-svm-web-1    | Data saved successfully.
docker-svm-web-1    | {}
docker-svm-web-1    | No clients for topic services
docker-svm-web-1    | Clients updated with new service: {
docker-svm-web-1    |   id: '7f90d7db-0a5b-4d2e-ab72-ffb18587c125',
docker-svm-web-1    |   name: '7f90d7db-0a5b-4d2e-ab72-ffb18587c125.service',
docker-svm-web-1    |   active: true
docker-svm-web-1    | }
docker-svm-web-1    |  POST /api/services/mock 204 in 113ms

Even though a client was added, there are "no clients". No client was closed; Postman/curl behave as expected.

The line right after "Data saved successfully." is the clients dump.

Full repo: https://github.com/kamillcodes/docker-svm

Do you know why this happens? It's definitely something simple that I'm not seeing.

I've tried adding lots of logging and hard-coding some dummy clients, trying to figure out why the first client isn't registered properly.

javascript typescript next.js redis server-sent-events
1 Answer

As a temporary solution I moved all the logic into a single route, so the clients are maintained as they should be, although keeping the logic separated would have been appreciated.
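
Roughly, the workaround collapses both handlers into one route.ts so they share the same module instance, and therefore the same clients list. A condensed sketch of the idea (shape assumed, not the exact file):

// Both GET (SSE) and POST (mock) live in the same module, so they share one `clients` array.
import type { NextRequest } from 'next/server';
import { randomUUID } from 'node:crypto';
import redis from '@/lib/redis';
import Service from '@/models/Service';
import EventType from '@/models/EventType';

export const dynamic = 'force-dynamic';

const topic = 'services';
const encoder = new TextEncoder();
const clients: WritableStreamDefaultWriter<any>[] = [];

const frame = (event: object) =>
    encoder.encode(`data: ${JSON.stringify(event)}\n\n`);

export async function GET(req: NextRequest) {
    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();
    clients.push(writer);

    // Send the snapshot immediately, same as the separate SSE route did.
    const cached = await redis.get(topic);
    const services: Service[] = cached ? JSON.parse(cached) : [];
    writer
        .write(frame({ eventType: EventType.SNAPSHOT, payload: services }))
        .catch(err => console.error('Error sending snapshot:', err));

    req.signal.addEventListener('abort', () => {
        const i = clients.indexOf(writer);
        if (i !== -1) clients.splice(i, 1);
        writer.close().catch(() => {});
    });

    return new Response(readable, {
        headers: {
            'Content-Type': 'text/event-stream; charset=utf-8',
            'Cache-Control': 'no-cache, no-transform',
            Connection: 'keep-alive'
        }
    });
}

export async function POST() {
    const u = randomUUID();
    const service: Service = { id: u, name: `${u}.service`, active: true };

    const cached = await redis.get(topic);
    const current: Service[] = cached ? JSON.parse(cached) : [];
    current.push(service);
    await redis.set(topic, JSON.stringify(current));

    // This loop now sees the same `clients` array the GET handler filled.
    for (const writer of clients) {
        writer
            .write(frame({ eventType: EventType.UPDATE, payload: service }))
            .catch(err => console.error('Error sending update:', err));
    }

    return new Response(null, { status: 204 });
}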
