我正在开发一个使用 Nextjs/redis 的测试应用程序(目前)。 我的 sse 端点工作(有点)正确。当客户端连接时,它会检索快照并将其发回,当推送更新(使用模拟路由)时,它应该发送带有有效负载的更新事件(它将其发送给第二个用户)
模拟/route.ts
import { NextRequest } from 'next/server';
import Service from '@/models/Service';
import { randomUUID } from 'node:crypto';
import redis from '@/lib/redis';
import { sendUpdateToClients } from '@/lib/sse';
import EventType from '@/models/EventType';
const topic = 'services';
export async function POST(req: NextRequest) {
console.log('<- [POST] on /api/services/mock');
console.log('Adding a mock service');
const u = randomUUID().toString();
const service: Service = {
id: u,
name: `${u}.service`,
active: true
};
try {
const cachedData = await redis.get(topic);
let current: Service[] = [];
if (!cachedData) {
console.log('No cached data, initializing empty array.');
} else {
current = JSON.parse(cachedData);
console.log('Parsed cached data:', current);
}
current.push(service);
await redis.set(topic, JSON.stringify(current));
console.log('Data saved successfully.');
sendUpdateToClients(topic, {
eventType: EventType.UPDATE,
payload: service
});
console.log('Clients updated with new service:', service);
} catch (error) {
console.error('Error handling POST /api/services/mock:', error);
}
return new Response(null, {
status: 204
});
}
sse/路线
import type { NextRequest } from 'next/server';
import { addClient, removeClient } from '@/lib/sse';
import redis from '@/lib/redis';
import EventType from '@/models/EventType';
import Event from '@/models/Event';
import Service from '@/models/Service';
export const dynamic = 'force-dynamic';
const topic = 'services';
export async function GET(req: NextRequest) {
console.log('<- [GET] on /api/services/sse');
const encoder = new TextEncoder();
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
function handleAbort() {
removeClient(topic, writer);
writer.close().then(() => console.log('writer closed'));
}
addClient(topic, writer);
const cachedData = await redis.get(topic);
let services: Service[];
if (cachedData) {
console.log('Cached data found');
services = JSON.parse(cachedData);
} else {
console.log('Cached data not found');
services = [];
}
const payload: Event<Service[]> = {
eventType: EventType.SNAPSHOT,
payload: services
};
writer
.write(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
.then(() => console.log('Snapshot sent to client'))
.catch(err => console.error('Error sending snapshot to client:', err));
req.signal.addEventListener('abort', handleAbort);
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
Connection: 'keep-alive',
'Cache-Control': 'no-cache, no-transform',
'Content-Encoding': 'none'
}
});
}
lib/sse
import type Event from '@/models/Event';
interface ClientsMap {
[key: string]: WritableStreamDefaultWriter<any>[];
}
const clients: ClientsMap = {};
export const addClient = (
topic: string,
writer: WritableStreamDefaultWriter<any>
) => {
if (!clients[topic]) {
clients[topic] = [];
}
clients[topic].push(writer);
console.log(
`Client added to topic ${topic}. Total clients for ${topic}: ${clients[topic].length}`
);
console.log(JSON.stringify(clients));
};
export const removeClient = (
topic: string,
writer: WritableStreamDefaultWriter<any>
) => {
if (clients[topic]) {
const index = clients[topic].indexOf(writer);
if (index !== -1) {
clients[topic].splice(index, 1);
}
if (clients[topic].length === 0) {
delete clients[topic];
}
}
console.log(
`Client removed from topic ${topic}. Remaining clients for ${topic}: ${clients[topic] ? clients[topic].length : 0}`
);
};
export const sendUpdateToClients = (topic: string, payload: Event<any>) => {
console.log(JSON.stringify(clients));
// todo: why does it think theres no clients on first connect
// or when theres no data? meaning should return []
// even tho there is at least one? (connected on sse)
//is it somehow closing connection or something? but it's not closing the status (postman req is still running)
// run:
// 1. open 1 sse , open 2nd one
// 2. send mock post -> nothing received on either
// 3. close 2nd sse
// 4. send mock post -> received update on 2nd, but not on first
// 5. close 1st sse
// 6. send mock post -> received on both
if (!clients[topic]) {
console.log(`No clients for topic ${topic}`);
return;
}
console.log(
`Sending update to ${clients[topic].length} clients on topic ${topic}`
);
clients[topic].forEach(client => {
client
.write(
new TextEncoder().encode(`data: ${JSON.stringify(payload)}\n\n`)
)
.then(() => console.log('Update sent to client'))
.catch(err =>
console.error('Error sending update to client:', err)
);
});
};
它会这样做,但前提是至少有一个客户在场。 在下一个服务器日志中我可以看到
✓ Ready in 1120ms
docker-svm-web-1 | ✓ Compiled /api/services/sse in 172ms (96 modules)
docker-svm-web-1 | <- [GET] on /api/services/sse
docker-svm-web-1 | Client added to topic services. Total clients for services: 1
docker-svm-web-1 | {"services":[{}]}
docker-svm-redis-1 | 1:M 19 May 2024 14:05:06.244 * DB saved on disk
docker-svm-web-1 | Redis data flushed: OK
docker-svm-web-1 | Cached data not found
docker-svm-web-1 | Snapshot sent to client
docker-svm-web-1 | ✓ Compiled /api/services/mock in 69ms (99 modules)
docker-svm-web-1 | <- [POST] on /api/services/mock
docker-svm-web-1 | Adding a mock service
docker-svm-redis-1 | 1:M 19 May 2024 14:05:19.834 * DB saved on disk
docker-svm-web-1 | Redis data flushed: OK
docker-svm-web-1 | No cached data, initializing empty array.
docker-svm-web-1 | Data saved successfully.
docker-svm-web-1 | {}
docker-svm-web-1 | No clients for topic services
docker-svm-web-1 | Clients updated with new service: {
docker-svm-web-1 | id: '7f90d7db-0a5b-4d2e-ab72-ffb18587c125',
docker-svm-web-1 | name: '7f90d7db-0a5b-4d2e-ab72-ffb18587c125.service',
docker-svm-web-1 | active: true
docker-svm-web-1 | }
docker-svm-web-1 | POST /api/services/mock 204 in 113ms
即使添加了客户端,也没有客户端。 没有客户端被关闭/邮递员/curl 按预期运行
“数据保存成功”之后的以下行是客户端转储
完整仓库:https://github.com/kamillcodes/docker-svm
你知道为什么会发生这种情况吗?绝对是我没有看到的简单东西
尝试添加大量日志,对一些虚拟客户端进行硬编码,试图找出第一个客户端未正确填充的原因。
作为一种临时解决方案,我将所有逻辑移至一条路由,这样客户端就可以按应有的方式进行维护,尽管逻辑分离将受到赞赏。