在将Spring Boot从2.1.11版本升级到2.2.5之后,kafka客户端会在提交jpa事务之前向代理生成消息。如果不使用链接的kafka事务管理器,则事务运行正常。 2.1.x和2.2.x之间的事务中是否引入了向后兼容性更改?有人可以提供涵盖JPA和Kafka的任何工作交易经理吗?
我只为JPA使用了以下事务管理器
@Bean
@Primary
public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
final JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(emf);
return txManager;
}
我在kafka交易中使用了以下属性:
spring.cloud.stream.kafka.default.consumer.configuration.isolation.level:read_committedspring.cloud.stream.kafka.binder.transaction.transaction-id前缀:xyz-0spring.kafka.producer.transaction-id-prefix:xyz-0
我们可以使用jpa和kafka创建链接的交易。我们可以使用BinderFactory来获取kafka活页夹并创建活页夹kafka事务管理器。
@Bean(name = "chainedTransactionManager")
@Primary
public PlatformTransactionManager chainedTransactionManager(JpaTransactionManager jpaTM,
BinderFactory binders) {
Binder<MessageChannel,?,?> binder = binders.getBinder("kafka", MessageChannel.class);
if (binder instanceof KafkaMessageChannelBinder) {
ProducerFactory<byte[], byte[]> pf =
((KafkaMessageChannelBinder) binder).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
ktm.setTransactionSynchronization(
AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<Object, Object>(jpaTM, ktm);
} else {
return jpaTM;
}
}```