我正在开发一个应用程序,需要将记录发送到同一 Kafka 集群中的不同主题。我已经探索了两种方法来实现这一目标,但我不确定它们对性能的影响。
我目前在应用程序中使用单个 Kafka 模板,在发送记录时动态指定主题。这是我的代码的简化版本:
class ProducerService {
@Autowired
private KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;
public void send(String topic, GenericRecord key, GenericRecord value) {
ListenableFuture<SendResult<GenericRecord, GenericRecord>> future = kafkaTemplate.send(topic, key, value);
}
}
这种方法允许我通过动态传递主题来为多个主题重用相同的 Kafka 模板。但是,我担心它对性能的影响。
@Configuration
public class KafkaConfig {
// Define topics
@Value("${kafka.topic.first}")
private String firstTopic;
@Value("${kafka.topic.second}")
private String secondTopic;
@Bean(name = "firstKafkaTemplate")
public KafkaTemplate<GenericRecord, GenericRecord> firstKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
kafkaTemplate.setDefaultTopic(firstTopic);
return kafkaTemplate;
}
// Second Kafka template bean definition follows similarly
}
class ProducerService {
@Autowired
@Qualifier("firstKafkaTemplate")
private KafkaTemplate<GenericRecord, GenericRecord> firstTopicTemplate;
// Second Kafka template injection follows similarly
public void send(String topic, GenericRecord key, GenericRecord value) {
ListenableFuture<SendResult<GenericRecord, GenericRecord>> future;
if ("first".equalsIgnoreCase(topic)) {
future = firstTopicTemplate.sendDefault(key, value);
} else if ("second".equalsIgnoreCase(topic)) {
future = secondTopicTemplate.sendDefault(key, value);
} else {
throw new RuntimeException("Topic is not configured");
}
}
}
通过此设置,每个主题都有自己专用的 Kafka 模板。与动态主题方法相比,这种方法会产生更好的性能吗?
我了解Kafka内部执行批处理并通过单独的线程将批次发送到Kafka。考虑到这一点,哪种方法在性能方面会更有效?或者两种方法之间的性能差异可以忽略不计?
我根据吞吐量回答我自己的问题。当我处理记录时,我遇到了超时问题。
spring.kafka.producer.properties.[linger.ms]=100
spring.kafka.producer.properties.[batch.size]=100000
spring.kafka.producer.properties.[request.timeout.ms]=30000
spring.kafka.producer.properties.[delivery.timeout.ms]=200000
为此,当您创建生产者工厂时,您必须将以下 setProducerPerThread 启用为 True。
我添加了一个TaskExecutor来控制生产者的数量,因为生产者的数量=线程数
@Configuration
Public class Conf{
@Bean("kafkaTaskExecutor")
public TaskExecutor getKafkaAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(15);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("Kafka-Async-");
return executor;
}
@Bean
public KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
if (producerFactory instanceof DefaultKafkaProducerFactory<GenericRecord, GenericRecord> defaultFactory) {
defaultFactory.setProducerPerThread(true);
}
return new KafkaTemplate<>(producerFactory);
}
}
不要更改您的 Kafka 代码。就让它一样吧。我们将创建一个新层以使其正常工作。
class AsyncProducer{
@Autowired
private KafkaProducer producer;
@Value("${topic.name}")
private String topic;
@Autowired
@Qualifier("kafkaTaskExecutor")
private TaskExecutor taskExecutor;
public void sendAsync(GenericRecord key, GenericRecord value){
CompletableFuture.completeFuture(value).thenAcceptAsync( val-> producer.send(topic,key,value), taskExecutor);
}
}
通过上述设置,最初将有 5 个生产者开始发送记录,当负载较高时,将增加到 15 个生产者