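I have just started experimenting with KafkaJS. My use case: I want to retrieve all messages within a given time range.
So I built a server action that creates a Kafka consumer, collects every message up to the latest offset, and filters out those outside the desired time range. Very pragmatic, but it is only a starting point.
What I don't understand is why the consumer does not disconnect after reaching the latest offset. It consumes the messages correctly, but then hangs indefinitely at await consumer.disconnect() (marked with a comment in the code), without any error or timeout.
Any ideas what the problem is?
Context (package.json is included below as well):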
import { Kafka, EachMessagePayload, KafkaConfig } from 'kafkajs';
import { DateTime } from 'luxon';

export interface MessageEnvelop {
  topic: string;
  key: string | null;
  value: string | null;
  timestamp: DateTime;
}

const kafkaConfig = {
  clientId: `my-client`,
  brokers: [process.env.BOOTSTRAP_SERVERS!!],
  ssl: {
    // @ts-ignore
    'ssl.endpoint.identification.algorithm': 'https'
  },
  sasl: {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME!!,
    password: process.env.KAFKA_PASSWORD!!
  }
} satisfies KafkaConfig;

const client = new Kafka(kafkaConfig);
const adminClient = client.admin();

async function getLatestOffsets(topic: string) {
  await adminClient.connect();
  const offsets = await adminClient.fetchTopicOffsets(topic);
  await adminClient.disconnect();
  return offsets;
}

async function resetOffsets(consumerSuffix: string, topic: string) {
  console.log(`Resetting offsets for topic ${topic}`);
  await adminClient.connect();
  await adminClient.resetOffsets({ groupId: 'my-consumer-group', topic, earliest: true })
  await adminClient.disconnect();
}

async function getMessages(consumerSuffix: string, topic: string, fromTime: DateTime, toTime: DateTime, timestampExtractor: (message: string) => string): Promise<MessageEnvelop[]> {
  await resetOffsets(consumerSuffix, topic);
  const consumer = client.consumer({ groupId: 'my-consumer-group' });
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });
  const latestOffsets = await getLatestOffsets(topic);
  const messages: MessageEnvelop[] = [];
  const partitionOffsets: { [partition: number]: number } = {};
  latestOffsets.forEach(({ partition, offset }) => {
    partitionOffsets[partition] = parseInt(offset, 10);
  });
  console.log("Partition offsets: ", partitionOffsets);
  return new Promise(async (resolve, reject) => {
    consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        console.log(`Received message from topic ${topic} on partition ${partition} with offset ${message.offset}`);
        const messageTimestamp = DateTime.fromISO(timestampExtractor(message.value!!.toString()));
        if (messageTimestamp >= fromTime && messageTimestamp <= toTime) {
          messages.push({
            topic: topic,
            key: message.key?.toString() || null,
            value: message.value?.toString() || null,
            timestamp: messageTimestamp
          });
        }
        if (message.offset === (partitionOffsets[partition]-1).toString()) {
          partitionOffsets[partition] = -1;
        }
        console.log(`Partition ${partition} has offset ${message.offset} and latest offset is ${partitionOffsets[partition]}`);
        console.log(partitionOffsets);
        if (Object.values(partitionOffsets).every(offset => offset === -1)) {
          console.log(`consumed ${messages.length} messages. Disconnecting consumer ...`);
          await consumer.disconnect(); // <---- STUCKS HERE
          console.log("resolving promise ...");
          resolve(messages);
        }
      }
    })
    .catch((error) => {
      console.error("Error while consuming messages: ", error);
      reject(error);
    });
  });
}
package.json
{
  "name": "zip-debugger",
  "version": "0.1.0",
  "private": true,
  "scripts": {
    "dev": "next dev",
    "build": "next build",
    "start": "next start",
    "lint": "next lint"
  },
  "dependencies": {
    "@emotion/cache": "^11.13.1",
    "@emotion/react": "^11.13.3",
    "@emotion/styled": "^11.13.0",
    "@mui/material-nextjs": "^5.16.6",
    "luxon": "^3.5.0",
    "next": "14.2.6",
    "react": "^18",
    "react-dom": "^18",
    "uuid": "^10.0.0"
  },
  "devDependencies": {
    "@types/luxon": "^3.4.2",
    "@types/node": "^20",
    "@types/react": "^18",
    "@types/react-dom": "^18",
    "@types/uuid": "^10.0.0",
    "eslint": "^8",
    "eslint-config-next": "14.2.6",
    "kafkajs": "^2.2.4",
    "postcss": "^8",
    "tailwindcss": "^3.4.1",
    "typescript": "^5"
  }
}
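Calling .disconnect() waits for your consumer to stop.
Because you are awaiting .disconnect() inside the eachMessage function, you are effectively deadlocking yourself: the disconnect waits for the handler to finish, while the handler waits for the disconnect.
You need to let your eachMessage function return (resolve) so that .disconnect() can resolve as well.
The simplest solution in your case is to not await consumer.disconnect(). (But make sure the promise is handled some other way.)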
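
To make the deadlock concrete without needing a broker, here is a minimal sketch in plain TypeScript. FakeConsumer, collect, timeout and demo are hypothetical names invented for this illustration, not part of the KafkaJS API; the fake only assumes the behaviour described above, namely that disconnect() resolves once the handler currently running has returned. The second branch of collect shows one possible way to "handle the promise in another way": start the disconnect without awaiting it, and resolve the outer promise when it completes.

```typescript
// Hypothetical stand-in for a consumer (NOT the KafkaJS API). Assumption:
// disconnect() resolves only after the in-flight handler has returned.
type Handler = (offset: number) => Promise<void>;

class FakeConsumer {
  private running: Promise<void> = Promise.resolve();
  private stopped = false;

  // Feed offsets 0..count-1 to the handler, one at a time.
  run(count: number, eachMessage: Handler): void {
    // Start in a microtask so `running` is assigned before any handler runs.
    this.running = Promise.resolve().then(async () => {
      for (let offset = 0; offset < count && !this.stopped; offset++) {
        await eachMessage(offset);
      }
    });
  }

  // Stop the loop, then wait for the handler that is still running.
  async disconnect(): Promise<void> {
    this.stopped = true;
    await this.running;
  }
}

// Collect `count` offsets, then disconnect from inside the handler.
function collect(count: number, awaitDisconnect: boolean): Promise<number[]> {
  const consumer = new FakeConsumer();
  const seen: number[] = [];
  return new Promise<number[]>((resolve, reject) => {
    consumer.run(count, async (offset) => {
      seen.push(offset);
      if (offset !== count - 1) return;
      if (awaitDisconnect) {
        // Deadlock: disconnect() waits for this handler, which waits for it.
        await consumer.disconnect();
        resolve(seen);
      } else {
        // Fix: do not await; let the handler return and resolve afterwards.
        consumer.disconnect().then(() => resolve(seen), reject);
      }
    });
  });
}

// Resolves with a marker value after `ms` milliseconds.
function timeout(ms: number): Promise<"timed out"> {
  return new Promise<"timed out">((resolve) =>
    setTimeout(() => resolve("timed out"), ms)
  );
}

// Run both variants, guarding each with a timeout so the hang is observable.
async function demo() {
  const deadlocked = await Promise.race([collect(3, true), timeout(100)]);
  const fixed = await Promise.race([collect(3, false), timeout(100)]);
  return { deadlocked, fixed };
}

demo().then((result) => console.log(result));
```

In this sketch the awaited variant only ever yields the timeout marker, while the non-awaited variant returns all three offsets. Applied to the code in the question, the equivalent change would be to replace the awaited call with consumer.disconnect().then(() => resolve(messages), reject), so that eachMessage can return before the disconnect completes.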