我在通过消费者从 kafka 主题获取 200 万条记录时遇到问题。 我的要求是获取记录并将这些记录存储到映射中,然后将该映射返回给其他方法,以便它们可以处理最终输出并将其写入文件。
运行方法:
public Map<String, SaleRecord> run() throws NullPointerException {
kafkaConfigurations();
Map<String, SaleRecord> recordMap = new HashMap<>();
LOG.info("Started consuming kafka messages....");
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
int i = 0;
// int recordLimit = 1000; // Set the limit to the desired number of records
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
LOG.debug(records.count() + "^^^^^^^size of records");
/* if (records.isEmpty()) {
LOG.info("No more records to consume. Exiting...");
break;
}*/
for (ConsumerRecord<String, String> record : records) {
JsonNode jsonData = null;
try {
jsonData = mapper.readTree(record.value());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
// Validate JSON data
if (!validateJsonData(jsonData)) {
continue; // Skip invalid records
}
SaleRecord priceRecords = mapper.convertValue(jsonData, SaleRecord.class);
if (priceRecords != null && isValidSaleRecord(priceRecords)) {
recordMap.put(priceRecords.getInvoiceId(), priceRecords);
i++;
}
}
LOG.info("Done consuming kafka messages after {} iterations", i);
return recordMap;
}
}
那么,如果我必须在更短的时间内处理 200 万条记录,那么 while 循环的终止条件是什么?
我尝试处理 100k 条记录,其终端条件为 100000。 但我每天必须处理 200 万个,所以它需要时间并且进入无限循环。
我对kafka没有太多专业知识。
如果在运行之前主题中已经有 2M 记录可用,您可以启动多个消费者,每个消费者作为一个线程运行,以在更短的时间内消费记录。
您可以通过迭代所有分区来获取当前偏移量和最新偏移量之间的差值来检查某个主题上可用的消息数量,只有当该主题中有 2M 条可用记录时才开始消费。这样,您可以将偏移量作为终端条件并关闭命中这些偏移量。