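I am trying to use a single transaction manager for both Rabbit and Kafka. I first receive a message in a Rabbit callback and then send it to a Kafka topic. However, I always get an exception indicating that Rabbit cannot complete the transaction correctly: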
2019-11-14 16:02:46.572 ERROR 15640 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Could not configure the channel to receive publisher confirms
java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:582)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$600(CachingConnectionFactory.java:99)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1053)
at com.sun.proxy.$Proxy124.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164)
at org.springframework.amqp.rabbit.transaction.RabbitTransactionManager.doCommit(RabbitTransactionManager.java:187)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:532)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.listener.ExecutionCallbackListener$$EnhancerBySpringCGLIB$$9b575a95.receiveCallback(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1445)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1368)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1355)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1334)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode, class-id=85, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
... 37 common frames omitted
This is the Rabbit listener method where the problem occurs:
@RabbitListener(queues = ["\${queue}"])
@Transactional("chainedTransactionManager")
fun receiveCallback(message: Message<List<CallbackMessage>>) {
traceMessage(message)
val callbacks = message.payload
callbacks.forEach { callback ->
kafkaService.sendAfterCallBack(Object())
}
}
And the method in KafkaService:
@Transactional("chainedTransactionManager")
fun sendAfterCallBack(payload: Any) { // `object` is a reserved keyword in Kotlin, so the parameter is renamed
convertAndSend(kafkaServiceProperties.topics.name, payload)
}
Here is the TransactionManager configuration:
@Configuration
class TransactionManagerConfiguration {
@Bean
fun chainedTransactionManager(
rabbitTransactionManager: RabbitTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
): ChainedTransactionManager {
return ChainedTransactionManager(kafkaTransactionManager, rabbitTransactionManager)
}
}
Rabbit configuration:
@Configuration
@EnableRabbit
@Import(RabbitAutoCreationConfiguration::class)
class RabbitConfiguration(
private val integrationProperties: IntegrationProperties,
private var clientProperties: RabbitClientProperties,
private val jacksonObjectMapper: ObjectMapper
) : RabbitListenerConfigurer {
@Bean
fun rabbitListenerContainerFactory(connectionFactory: ConnectionFactory): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(connectionFactory)
factory.setErrorHandler { t -> throw AmqpRejectAndDontRequeueException(t) }
return factory
}
@Bean
fun messageConverter(): MessageConverter {
val messageConverter = MappingJackson2MessageConverter()
messageConverter.objectMapper = jacksonObjectMapper
return messageConverter
}
@Bean
fun messageHandlerFactory(): MessageHandlerMethodFactory {
val factory = DefaultMessageHandlerMethodFactory()
factory.setMessageConverter(messageConverter())
return factory
}
@Bean
@ConditionalOnBean(CachingConnectionFactory::class)
fun rabbitConnectionFactoryCustomizer(factory: CachingConnectionFactory): SmartInitializingSingleton {
return SmartInitializingSingleton {
factory.rabbitConnectionFactory.clientProperties.apply {
clientProperties.copyright?.let { put("copyright", it) }
put("os", System.getProperty("os.name"))
put("host", InetAddress.getLocalHost().hostName)
clientProperties.platform?.let { put("platform", it) }
clientProperties.product?.let { put("product", it) }
clientProperties.service?.let { put("service", it) }
}
}
}
override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar?) {
registrar!!.messageHandlerMethodFactory = messageHandlerFactory()
}
@Bean
fun rabbitTemplate(
connectionFactory: ConnectionFactory,
jsonObjectMapper: ObjectMapper
): RabbitTemplate {
val rabbitTemplate = RabbitTemplate(connectionFactory)
val retryTemplate = RetryTemplate()
retryTemplate.setRetryPolicy(SimpleRetryPolicy(integrationProperties.callbackRetry))
rabbitTemplate.setRetryTemplate(retryTemplate)
rabbitTemplate.isChannelTransacted = true
return rabbitTemplate
}
@Bean
fun rabbitTransactionManager(connectionFactory: ConnectionFactory): RabbitTransactionManager {
val rtm = RabbitTransactionManager(connectionFactory)
rtm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
return rtm
}
}
Kafka configuration:
@Configuration
@EnableKafka
class KafkaConfiguration(
@Qualifier("kafkaExchangeMessageConverter")
private val messageConverter: MessagingMessageConverter
) {
@Bean
fun kafkaListenerContainerFactory(
configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
consumerFactory: ConsumerFactory<Any, Any>
): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
factory.setMessageConverter(messageConverter)
configurer.configure(factory, consumerFactory)
return factory
}
@Bean
fun adminClient(kafkaAdmin: KafkaAdmin): AdminClient = AdminClient.create(kafkaAdmin.config)
@Bean
fun kafkaTransactionManager(
producerFactory: ProducerFactory<*, *>
): KafkaTransactionManager<*, *> {
val ktm = KafkaTransactionManager(producerFactory)
ktm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
return ktm
}
}
Am I missing something in RabbitConfiguration, or is the problem somewhere else?
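reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode
You cannot use publisher confirms and transactions on the same channel. Turn off publisher confirms.
Also, it is better to inject the chained transaction manager into the listener container than to use @Transactional.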
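A minimal sketch of injecting the chained transaction manager into the listener container, assuming the beans from the question. The class name TransactionalListenerConfiguration is hypothetical. This bean would take the place of the rabbitListenerContainerFactory already defined in RabbitConfiguration, so the error handler set there would need to be carried over. How publisher confirms are switched off depends on where they were enabled. With Spring Boot auto-configuration this is probably a property such as spring.rabbitmq.publisher-confirms (renamed to spring.rabbitmq.publisher-confirm-type in later Boot versions), but that is an assumption, since the question does not show the connection factory setup.

```kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.transaction.ChainedTransactionManager

// Hypothetical configuration class; in the question's project this bean
// would replace the existing rabbitListenerContainerFactory.
@Configuration
class TransactionalListenerConfiguration {

    @Bean
    fun rabbitListenerContainerFactory(
        connectionFactory: ConnectionFactory,
        chainedTransactionManager: ChainedTransactionManager
    ): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        // Consume on a transactional channel so acks are part of the transaction.
        factory.setChannelTransacted(true)
        // The container starts and commits the chained transaction around each
        // listener invocation, so @Transactional on the listener is not needed.
        factory.setTransactionManager(chainedTransactionManager)
        return factory
    }
}
```

With this in place, the @Transactional("chainedTransactionManager") annotation on receiveCallback can be removed, because the container itself wraps each delivery in the transaction. Publisher confirms must still be disabled on the connection factory, since a channel cannot be in both tx and confirm mode.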