我想从Kafka获取数据,该方法成功获取记录但无法传递给变量。这是我的代码
public void subscribeFromKafka() throws Exception {
List<String> result = new ArrayList<>();
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
System.out.println("Receiving from Kafka . . .");
consumer.subscribe(Arrays.asList(KAFKA_TOPIC_URL));
// Set a timeout for the loop, or you can use another mechanism to break out of the loop.
long endTimeMillis = System.currentTimeMillis() + TIMEOUT_MILLIS;
while (System.currentTimeMillis() < endTimeMillis) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received data: " + record.value());
result.add(record.value());
}
}
System.out.println(result + " this is the result from Kafka.");
} catch (Exception e) {
throw new Exception("Failed to subscribe from Kafka", e);
}
}
如何解决?
我尝试将来自kafka消费者的记录添加到字符串的数组列表中,但记录无法添加到数组列表中
看起来您的代码正确订阅了 Kafka 主题、接收记录并将值添加到 result 列表中。如果您遇到无法在 try 块之外访问结果的问题,可能是由于 result 变量的范围所致。
在您的代码中,您已在 subscribeFromKafka 方法中声明了 result 列表。这意味着result列表只能在该方法的范围内访问。如果您想在方法之外使用 result 列表,您应该将其声明为类级别或实例变量。