您好!我希望你做得很好!
我在 kubernetes 环境中工作,有 3 个使用 Spring Boot 和 Kotlin 的 Pod,并且我们使用 Apache Kafka 和 Azure 事件中心。上周,我们的服务开始抛出以下异常:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
我的团队都不是kafka专家,所以我读了很多相关内容,这似乎是事务性Kafka的问题,就像这里提到的:Medium文章 和 Spring Kafka 文档。
我对 Spring Kafka 不太了解(通常我只写了消费者和生产者,它可以工作哈哈),但根据文档,事务性生产者仅在 @Transactional 范围内创建,并且在 application.yaml 文件中配置时(使用
transaction-id-prefix
)。
问题是:我们没有在配置文件中配置任何 transaction-id-prefix (当服务启动时,它会记录生产者配置,我们有
transaction.id = null
来改进这一点)并且生产者不会在 @ 内部调用事务性注释方法,它在 @TransactionalEventListener 内部调用(我不知道这是否会创建一个新事务)。
我正在使用 spring-boot 3.2.4 和 spring-kafka 3.2.3。
生产者在事件侦听器内激活,当我们将实体持久化到数据库中时会触发该事件侦听器,事件侦听器代码如下:
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = [AbstractEvent::class])
fun publish(event: StaticQRCodeCreatedEvent): Unit {
val entity = event.aggregate
producer.produce(buildMessage(entity))
}
这是我在 application.yaml 文件中的生产者配置:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 5
acks: all
properties:
linger.ms: 50
max.block.ms: 120000
我将日志级别更改为 TRACE 和 DEBUG 以了解发生了什么,但是当调用生产者时,spring-kafka 会记录“实例化一个幂等生产者”。并调试生产者的属性,
transaction.id
为空,证明生产者不是事务性的。制作人。
我在日志中看到事务在第一步开始(在将事件发送到事件侦听器之前保存实体),并且在执行 @TransactionalEventListener 方法之前完成。
有人知道如何解决这个问题吗?我只是在发生此异常时重新部署我们的 Pod 来修复此问题,但可能有更智能的解决方案。
您是否尝试过在应用程序中记录Kafka客户端的配置?
也许
enable.idempotence
是 true
,在这种情况下,它应该生成一个带有过期时间的 producerId
: