我在不使用 Spring Boot 和 Spring Kafka 的事务性生产者的情况下遇到了 InvalidPidMappingException

问题描述 投票:0回答:1

GitHub 中的存储库

您好!我希望你做得很好!

我在 kubernetes 环境中工作,有 3 个使用 Spring Boot 和 Kotlin 的 Pod,并且我们使用 Apache Kafka 和 Azure 事件中心。上周,我们的服务开始抛出以下异常:

org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
我的团队都不是kafka专家,所以我读了很多相关内容,这似乎是事务性Kafka的问题,就像这里提到的:Medium文章Spring Kafka 文档

我对 Spring Kafka 不太了解(通常我只写了消费者和生产者,它可以工作哈哈),但根据文档,事务性生产者仅在 @Transactional 范围内创建,并且在 application.yaml 文件中配置时(使用

transaction-id-prefix
)。

问题是:我们没有在配置文件中配置任何 transaction-id-prefix (当服务启动时,它会记录生产者配置,我们有

transaction.id = null
来改进这一点)并且生产者不会在 @ 内部调用事务性注释方法,它在 @TransactionalEventListener 内部调用(我不知道这是否会创建一个新事务)。

我正在使用 spring-boot 3.2.4 和 spring-kafka 3.2.3。

生产者在事件侦听器内激活,当我们将实体持久化到数据库中时会触发该事件侦听器,事件侦听器代码如下:

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = [AbstractEvent::class])
    fun publish(event: StaticQRCodeCreatedEvent): Unit {
        val entity = event.aggregate
        producer.produce(buildMessage(entity))
    }

这是我在 application.yaml 文件中的生产者配置:

      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 5
        acks: all
        properties:
          linger.ms: 50
          max.block.ms: 120000

我将日志级别更改为 TRACE 和 DEBUG 以了解发生了什么,但是当调用生产者时,spring-kafka 会记录“实例化一个幂等生产者”。并调试生产者的属性,

transaction.id
为空,证明生产者不是事务性的。制作人。

我在日志中看到事务在第一步开始(在将事件发送到事件侦听器之前保存实体),并且在执行 @TransactionalEventListener 方法之前完成。

有人知道如何解决这个问题吗?我只是在发生此异常时重新部署我们的 Pod 来修复此问题,但可能有更智能的解决方案。

spring-boot apache-kafka spring-kafka azure-eventhub
1个回答
0
投票

您是否尝试过在应用程序中记录Kafka客户端的配置?

也许

enable.idempotence
true
,在这种情况下,它应该生成一个带有过期时间的
producerId

© www.soinside.com 2019 - 2024. All rights reserved.