Unable to consume queue messages from RabbitMQ in a Spring project

Question (votes: 0, answers: 1)

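I am trying to get my application to consume messages from a RabbitMQ queue, but after a message is added to the queue my application does not consume it, and nothing shows up in the logs.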

This is a Spring project that uses the Axon Framework.

UserEventHandler.java

package com.jasper.ecommerce.query.handler;

import com.jasper.ecommerce.query.model.UserReadModel;
import com.jasper.ecommerce.query.query.FindAllUsersQuery;
import com.jasper.ecommerce.query.repository.UserRepository;
import com.jasper.ecommerce.shared.events.UserCreatedEvent;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class UserEventHandler {
    private final UserRepository userRepository;

    public UserEventHandler(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // Event handler for UserCreatedEvent
    @EventHandler
    public void on(UserCreatedEvent event) {
        System.out.println("UserCreatedEvent received: " + event);
        // Convert event to read model and save it in MongoDB
        UserReadModel user = new UserReadModel(event.getUserId(), event.getName(), event.getEmail());
        userRepository.save(user);  // This inserts the document into MongoDB
    }

    // Query handler to return all users
    @QueryHandler
    public List<UserReadModel> handle(FindAllUsersQuery query) {
        return userRepository.findAll();
    }
}

AxonAMQPConfiguration.java

package com.jasper.ecommerce.query.config;

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.extensions.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AxonAMQPConfiguration {

    @Bean
    public SpringAMQPMessageSource rabbitMQMessageSource(Serializer serializer) {
        return new SpringAMQPMessageSource(
                DefaultAMQPMessageConverter.builder()
                        .serializer(serializer)
                        .build()
        );
    }

    public void configure(EventProcessingConfigurer configurer, SpringAMQPMessageSource rabbitMQMessageSource) {
        configurer.registerSubscribingEventProcessor("UserEventHandler", c -> rabbitMQMessageSource);
    }
}

application.yml

spring:
  application:
    name: ecommerce-cqrs-query
  data:
    mongodb:
      uri: mongodb://localhost:27017/ecommerce-query
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

axon:
  amqp:
    exchange: AxonExchange
  eventhandling:
    processors:
      UserEventHandler:
        mode: subscribing
  axonserver:
    enabled: false
  distributed:
    enabled: false

logging:
  level:
    org.axonframework: DEBUG
    org.springframework.amqp: DEBUG
    com.rabbitmq: DEBUG

server:
  port: 8080

Tags: java, spring, rabbitmq, axon

1 answer (0 votes)

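Note that a processor's default name is the package name of the class that contains the event handler. As a result, classes in the same package are automatically assigned to the same processor.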
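To make the naming rule concrete, here is a small plain-Java illustration. It is not Axon code: `ProcessorNameDemo` and `defaultProcessorName` are hypothetical names that only mimic the "package name of the handler class" rule described above, applied to the handler from the question.

```java
public class ProcessorNameDemo {

    // Hypothetical helper (not an Axon API): returns the package part of a
    // fully qualified class name, mirroring the default naming rule.
    public static String defaultProcessorName(String fullyQualifiedClassName) {
        int lastDot = fullyQualifiedClassName.lastIndexOf('.');
        // A class in the default package has no package name.
        return lastDot < 0 ? "" : fullyQualifiedClassName.substring(0, lastDot);
    }

    public static void main(String[] args) {
        String handler = "com.jasper.ecommerce.query.handler.UserEventHandler";
        // Prints: com.jasper.ecommerce.query.handler
        System.out.println(defaultProcessorName(handler));
    }
}
```

So a processor registered under the name "UserEventHandler" would not match the processor that `UserEventHandler` is assigned to by default.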

If you change the event processor configuration to:

configurer.registerSubscribingEventProcessor("com.jasper.ecommerce.query.handler", c -> rabbitMQMessageSource);

it should start consuming from RabbitMQ.
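For context, the change could sit in the question's `AxonAMQPConfiguration` roughly as follows. This is a sketch under assumptions, not a confirmed fix. The queue name `user-events` is a placeholder that does not appear in the question. The `@RabbitListener` override follows the pattern shown in the Axon AMQP extension documentation for binding the source to a queue. The `@Autowired` annotation is added on the assumption that Spring would otherwise never call `configure`, since the method in the question carries no annotation.

```java
package com.jasper.ecommerce.query.config;

import com.rabbitmq.client.Channel;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.extensions.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AxonAMQPConfiguration {

    @Bean
    public SpringAMQPMessageSource rabbitMQMessageSource(Serializer serializer) {
        return new SpringAMQPMessageSource(
                DefaultAMQPMessageConverter.builder()
                        .serializer(serializer)
                        .build()) {

            // Assumption: "user-events" is a placeholder; use the queue that is
            // actually bound to the exchange the events are published to.
            @RabbitListener(queues = "user-events")
            @Override
            public void onMessage(Message message, Channel channel) {
                super.onMessage(message, channel);
            }
        };
    }

    // @Autowired makes Spring invoke this method with the injected beans.
    @Autowired
    public void configure(EventProcessingConfigurer configurer,
                          SpringAMQPMessageSource rabbitMQMessageSource) {
        // The processor name is the package of UserEventHandler, per the answer.
        configurer.registerSubscribingEventProcessor(
                "com.jasper.ecommerce.query.handler", c -> rabbitMQMessageSource);
    }
}
```

If the handler should keep the processor name "UserEventHandler" instead, annotating the handler class with Axon's `@ProcessingGroup("UserEventHandler")` is another option for making the two names agree.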
