Kafkajs `consumer.disconnect()` 未解决

问题描述 投票:0回答:1

我目前开始尝试使用 KafkaJS,用例如下: 我想检索给定时间范围内的所有消息。

因此,我构建了一个服务器操作,该操作创建一个 kafka 消费者,聚合迄今为止的所有消息并过滤所有超出所需时间范围的消息。非常务实,但只是一个起点。

我不明白的是它在达到最新偏移量后没有断开连接的原因。 它正确地消耗了消息,毕竟它无限地停留在

await consumer.disconnect()
(已注释),没有任何错误或超时。 有什么想法这里有什么问题吗?

上下文(

package.json
也在下面):

  • nodejs:22.6.0
  • Nextjs:14.2.6
  • 卢克森:^3.5.0
  • Kafkajs:^2.2.4
import { Kafka, EachMessagePayload, KafkaConfig } from 'kafkajs';
import { DateTime } from 'luxon';

export interface MessageEnvelop {
    topic: string;
    key: string | null;
    value: string | null;
    timestamp: DateTime;
}

const kafkaConfig = {
    clientId: `my-client`,
    brokers: [process.env.BOOTSTRAP_SERVERS!!],
    ssl: {
        // @ts-ignore
        'ssl.endpoint.identification.algorithm': 'https'
    },
    sasl: {
        mechanism: 'plain',
        username: process.env.KAFKA_USERNAME!!,
        password: process.env.KAFKA_PASSWORD!!
    }
} satisfies KafkaConfig;

const client = new Kafka(kafkaConfig);
const adminClient = client.admin();

async function getLatestOffsets(topic: string) {
    await adminClient.connect();
    const offsets = await adminClient.fetchTopicOffsets(topic);
    await adminClient.disconnect();
    return offsets;
}

async function resetOffsets(consumerSuffix: string, topic: string) {
    console.log(`Resetting offsets for topic ${topic}`);

    await adminClient.connect();
    await adminClient.resetOffsets({ groupId: 'my-consumer-group', topic, earliest: true })
    await adminClient.disconnect();
}

async function getMessages(consumerSuffix: string, topic: string, fromTime: DateTime, toTime: DateTime, timestampExtractor: (message: string) => string): Promise<MessageEnvelop[]> {
    await resetOffsets(consumerSuffix, topic);

    const consumer = client.consumer({ groupId: 'my-consumer-group' });

    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: true });

    const latestOffsets = await getLatestOffsets(topic);
    const messages: MessageEnvelop[] = [];
    const partitionOffsets: { [partition: number]: number } = {};

    latestOffsets.forEach(({ partition, offset }) => {
        partitionOffsets[partition] = parseInt(offset, 10);
    });
    console.log("Partition offsets: ", partitionOffsets);

    return new Promise(async (resolve, reject) => {
       consumer.run({
            eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
                console.log(`Received message from topic ${topic} on partition ${partition} with offset ${message.offset}`);
                const messageTimestamp = DateTime.fromISO(timestampExtractor(message.value!!.toString()));

                if (messageTimestamp >= fromTime && messageTimestamp <= toTime) {
                    messages.push({
                        topic: topic,
                        key: message.key?.toString() || null,
                        value: message.value?.toString() || null,
                        timestamp: messageTimestamp
                    });
                }

                if (message.offset === (partitionOffsets[partition]-1).toString()) {
                    partitionOffsets[partition] = -1;
                }

                console.log(`Partition ${partition} has offset ${message.offset} and latest offset is ${partitionOffsets[partition]}`);
                console.log(partitionOffsets);
                if (Object.values(partitionOffsets).every(offset => offset === -1)) {
                    console.log(`consumed ${messages.length} messages. Disconnecting consumer ...`);
                    await consumer.disconnect(); // <---- STUCKS HERE
                    console.log("resolving promise ...");
                    resolve(messages);
                }
            }
        })
            .catch((error) => {
                console.error("Error while consuming messages: ", error);
                reject(error);
            });
    });
}

package.json

{
  "name": "zip-debugger",
  "version": "0.1.0",
  "private": true,
  "scripts": {
    "dev": "next dev",
    "build": "next build",
    "start": "next start",
    "lint": "next lint"
  },
  "dependencies": {
    "@emotion/cache": "^11.13.1",
    "@emotion/react": "^11.13.3",
    "@emotion/styled": "^11.13.0",
    "@mui/material-nextjs": "^5.16.6",
    "luxon": "^3.5.0",
    "next": "14.2.6",
    "react": "^18",
    "react-dom": "^18",
    "uuid": "^10.0.0"
  },
  "devDependencies": {
    "@types/luxon": "^3.4.2",
    "@types/node": "^20",
    "@types/react": "^18",
    "@types/react-dom": "^18",
    "@types/uuid": "^10.0.0",
    "eslint": "^8",
    "eslint-config-next": "14.2.6",
    "kafkajs": "^2.2.4",
    "postcss": "^8",
    "tailwindcss": "^3.4.1",
    "typescript": "^5"
  }
}
node.js next.js apache-kafka nextjs14 kafkajs
1个回答
0
投票

拨打

.disconnect()
将等待您的消费者停止。

由于您在

.disconnect()
函数中等待
eachMessage
,因此您实际上永远锁定了自己。

你需要让你的

eachMessage
函数返回(解析)然后
.disconnect()
函数也可以被解析。

最简单的解决方案是在您的情况下不要等待

consumer.disconnect()
。 (但请确保以另一种方式处理承诺)

© www.soinside.com 2019 - 2024. All rights reserved.