In my project I need to connect to two different Kafka brokers and consume all events from two topics, one topic on each broker.
My application.yaml looks something like this:
spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two # I changed this binder between kafka-one and kafka-two manually for tests, the orderProcessedListener-in-1 binding doesn't have the exclusive producer
        orderProcessedListener-in-0: # CONSUME FROM KAFKA ONE
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedListener-in-1: # CONSUMER FROM KAFKA TWO
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-1:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
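But when I run the application it doesn't work: the consumer binding for Kafka One, orderProcessedListener-in-0, works fine, but the other consumer binding for Kafka Two, orderProcessedListener-in-1, does not.
I am using:
Both of my Kafka clusters run fine in my development environment in Docker containers, one exposed on port 9092 and the other on port 9093.
How can we fix this?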
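A simple way to solve this is to use two beans that both point to a single entry-point method. An imperative Consumer only gets one input binding, -in-0, so orderProcessedListener-in-1 is never bound; giving each broker its own bean gives each one its own -in-0 binding.
Example: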
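import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class OrderProcessedListener {

    // Single entry point shared by both bindings.
    public void consume(final Message<Order> message) {
        // your business logic to process message
    }

    // Bound to kafka-one via orderProcessedFromKafkaOneListener-in-0.
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaOneListener() {
        return this::consume;
    }

    // Bound to kafka-two via orderProcessedFromKafkaTwoListener-in-0.
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaTwoListener() {
        return this::consume;
    }
}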
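If it helps to see the delegation on its own, here is a minimal Spring-free sketch of the same idea: two Consumer instances that both forward to one method. The class name SharedHandlerSketch, the simplified Order type, and the seen list are hypothetical stand-ins for illustration only; in the real application the two consumers would be the @Bean methods above and Spring Cloud Stream would call them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SharedHandlerSketch {

    // Hypothetical, simplified stand-in for the real Order payload.
    static final class Order {
        final String id;

        Order(String id) {
            this.id = id;
        }
    }

    static final class OrderProcessedListener {
        // Records handled ids so the shared entry point is observable.
        final List<String> seen = new ArrayList<>();

        // Single entry point holding the business logic.
        void consume(Order order) {
            seen.add(order.id);
        }

        // Plays the role of the consumer bound to kafka-one.
        Consumer<Order> fromKafkaOne() {
            return this::consume;
        }

        // Plays the role of the consumer bound to kafka-two.
        Consumer<Order> fromKafkaTwo() {
            return this::consume;
        }
    }

    public static void main(String[] args) {
        OrderProcessedListener listener = new OrderProcessedListener();
        listener.fromKafkaOne().accept(new Order("a-1"));
        listener.fromKafkaTwo().accept(new Order("b-1"));
        System.out.println(listener.seen); // prints [a-1, b-1]
    }
}
```

Both consumers are separate objects, which is what lets the framework bind each one to a different binder, while the processing logic still lives in one place.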
With the two orderProcessed beans in place, the properties configuration should be:
spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedFromKafkaOneListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedFromKafkaTwoListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaOneListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaTwoListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
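That said, the best solution in general is to use reactive functions, which Spring Cloud Stream allows to declare more than one input binding (-in-0, -in-1) on a single function.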