当我使用
kafka-topics.bat
创建主题时,我这样做:
kafka-topics.bat --bootstrap-server %host%:%port% --create --topic %%t --partitions %partitions% --replication-factor %replication_factor% --config max.message.bytes=%max_message_bytes% --config min.insync.replicas=%min_insync_replicas% --config tention.ms=%retention_ms% --command-config client.properties
我正在尝试使用 2.3 中引入的 Spring Kafka TopicBuilder
转换上述内容。但我不知道如何转换
command-config
选项。可以吗?
其余部分根据文档很简单:
@Bean
public NewTopic topic(){
return TopicBuilder.name("topic-name")
.partitions(x)
.replicas(x)
.config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "xxx")
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "xxx")
.config(TopicConfig.RETENTION_MS_CONFIG, "xxx")
.build();
}
终于找到了!如果它对某人有帮助,这是解决方案:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=username " +
"password=password;");
return new KafkaAdmin(configs);
}
就我而言,我还必须再次将安全属性传递给配置类。 在运行时,application.properties 中的以下条目将被忽略:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<cluster-api-key>' password='<cluster-api-secret>';
所以我必须在配置类中明确添加它们:
@Configuration
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String salsJaasConfig;
@Value("${spring.kafka.properties.security.protocol}")
private String securityProtocol;
@Bean
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
configProps.put("**sasl.mechanism**", saslMechanism);
configProps.put("**sasl.jaas.config**", salsJaasConfig);
configProps.put("**security.protocol**", securityProtocol);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
如果属性没有“spring.kafka”前缀,Eclipse 会抱怨。这是一个错误。