Spring Kafka NewTopic TopicBuilder,--command-config 选项?

问题描述 投票:0回答:2

当我使用

kafka-topics.bat
创建主题时,我这样做:

kafka-topics.bat --bootstrap-server %host%:%port% --create --topic %%t --partitions %partitions% --replication-factor %replication_factor% --config max.message.bytes=%max_message_bytes% --config min.insync.replicas=%min_insync_replicas% --config tention.ms=%retention_ms% --command-config client.properties

我正在尝试使用 2.3 中引入的 Spring Kafka TopicBuilder

转换
上述内容。但我不知道如何转换
command-config
选项。可以吗?

其余部分根据文档很简单:

@Bean
public NewTopic topic(){
    return TopicBuilder.name("topic-name")
        .partitions(x)
        .replicas(x)
        .config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "xxx")
        .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "xxx")
        .config(TopicConfig.RETENTION_MS_CONFIG, "xxx")
        .build();
}
spring-kafka
2个回答
1
投票

终于找到了!如果它对某人有帮助,这是解决方案:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + 
                                    "username=username " + 
                                    "password=password;");
    return new KafkaAdmin(configs);
}

干杯

0
投票

就我而言,我还必须再次将安全属性传递给配置类。 在运行时,application.properties 中的以下条目将被忽略:

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.security.protocol=SASL_SSL
 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<cluster-api-key>' password='<cluster-api-secret>';

所以我必须在配置类中明确添加它们:

@Configuration
public class KafkaProducerConfiguration {

  @Value("${spring.kafka.bootstrap-servers}") 
  private String bootstrapAddress;

  @Value("${spring.kafka.properties.sasl.mechanism}") 
    private String saslMechanism;
  
  
  @Value("${spring.kafka.properties.sasl.jaas.config}") 
    private String salsJaasConfig;

  @Value("${spring.kafka.properties.security.protocol}") 
    private String securityProtocol;

  @Bean
  public ProducerFactory<Integer, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
      configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
      configProps.put("**sasl.mechanism**", saslMechanism);
      configProps.put("**sasl.jaas.config**", salsJaasConfig);
      configProps.put("**security.protocol**", securityProtocol);
      return new DefaultKafkaProducerFactory<>(configProps);
  }
  
  @Bean
  public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
  

}

如果属性没有“spring.kafka”前缀,Eclipse 会抱怨。这是一个错误。

© www.soinside.com 2019 - 2024. All rights reserved.