RabbitMQ queue is empty even though messages have been published


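I am trying to build an application with RabbitMQ, but I am stuck and don't know what to do. I followed a tutorial and everything should work, but it doesn't. Whether I send a request from Postman or publish from the RabbitMQ management portal at localhost:15672, I get the "message published" pop-up and I see this text in the console, but when I check the queue, it says it is empty.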


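The log clearly shows that the message reached the consumer, but when I check the queue from the portal, the message is not there.

Here is the whole code.

Configuration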

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

Consumer

package ro.tuc.ds2020.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

Controller (used when I send requests from Postman)

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

Publisher

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

This is application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json
spring spring-boot rabbitmq spring-rabbit
1 Answer

Compared with languages such as Node.js or Python, which are usually easier to configure, Java often takes more effort to set up a working environment.

Requirements

Maven 3.9.9 and JDK 17

> mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
Java version: 17.0.12, vendor: Amazon.com Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
Default locale: en_US, platform encoding: Cp1252
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"

File tree

C:.
│   docker-compose.yml
│   pom.xml
│
├───.idea
│       .gitignore
│       compiler.xml
│       encodings.xml
│       jarRepositories.xml
│       misc.xml
│
└───src
    └───main
        ├───java
        │   └───ro
        │       └───tuc
        │           └───ds2020
        │               │   Ds2020Application.java
        │               │
        │               ├───config
        │               │       RabbitMQConfig.java
        │               │
        │               ├───consumer
        │               │       RabbitMQJsonConsumer.java
        │               │
        │               ├───controllers
        │               │       MessageJsonController.java
        │               │
        │               ├───dtos
        │               │       MeasurementDTO.java
        │               │
        │               └───publisher
        │                       RabbitMQJsonProducer.java
        │
        └───resources
            │   application.properties
            │
            └───static


RabbitMQConfig.java

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

RabbitMQJsonConsumer.java

package ro.tuc.ds2020.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        try {
            String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
            LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to convert message to JSON", e);
        }
    }
}

MessageJsonController.java

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

import java.util.HashMap;
import java.util.Map;

@RequestMapping("/api/v1")
@RestController
public class MessageJsonController {

    private final RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);

        // Create a JSON response body
        Map<String, String> response = new HashMap<>();
        response.put("message", "Json message sent to RabbitMQ");
        response.put("status", "success");

        return ResponseEntity.ok(response);
    }
}

MeasurementDTO.java

package ro.tuc.ds2020.dtos;

import com.fasterxml.jackson.annotation.JsonProperty;

public class MeasurementDTO {

    @JsonProperty("sensorId")
    private String sensorId;

    @JsonProperty("value")
    private double value;

    @JsonProperty("unit")
    private String unit;

    @JsonProperty("timestamp")
    private String timestamp;

    // Getters and Setters
    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MeasurementDTO{" +
                "sensorId='" + sensorId + '\'' +
                ", value=" + value +
                ", unit='" + unit + '\'' +
                ", timestamp='" + timestamp + '\'' +
                '}';
    }
}

RabbitMQJsonProducer.java

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }
}

Ds2020Application.java

package ro.tuc.ds2020;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Ds2020Application {
    public static void main(String[] args) {
        SpringApplication.run(Ds2020Application.class, args);
    }
}

application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ro.tuc</groupId>
    <artifactId>ds2020</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven Plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>        
    </build>
</project>

docker-compose.yml

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # RabbitMQ messaging port
      - "15672:15672" # RabbitMQ management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

Start RabbitMQ

docker compose up

Access the RabbitMQ UI

username: guest
password: guest
http://localhost:15672/#/
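
The queue depth shown in the UI can also be read programmatically. The following is a minimal sketch, assuming the management plugin's HTTP API is reachable on the same port as the UI, that the queue lives in the default virtual host `/` (encoded as `%2F`), and that the `guest`/`guest` credentials from docker-compose.yml apply. The class and method names are hypothetical. The JSON response should contain a `messages` field with the number of queued messages; the counters may lag slightly behind the broker.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: asks the RabbitMQ management HTTP API about one queue.
public class QueueDepthRequestSketch {

    // Builds a GET request for /api/queues/{vhost}/{queue} with HTTP Basic auth.
    // Assumption: the default virtual host "/" is used, URL-encoded as %2F.
    public static HttpRequest buildQueueInfoRequest(String baseUrl, String queueName,
                                                    String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/queues/%2F/" + queueName))
                .header("Authorization", "Basic " + credentials)
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request =
                buildQueueInfoRequest("http://localhost:15672", "queue_json", "guest", "guest");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // The body is JSON; look for the "messages" field.
            System.out.println(response.statusCode());
            System.out.println(response.body());
        } catch (IOException e) {
            System.out.println("Could not reach the management API: " + e);
        }
    }
}
```

Run it while the container from docker-compose.yml is up; if the broker is not running, the sketch only prints the connection error.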

Build the jar

mvn clean install


dir target

Start the Java project

java -jar target/ds2020-1.0.0.jar

Call the REST API from Postman

POST http://localhost:8080/api/v1/publish

Request body

{
    "sensorId": "12345",
    "value": 67.5,
    "unit": "Celsius",
    "timestamp": "2024-11-16T18:30:00Z"
}
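
If Postman is not at hand, the same request can be sent from plain Java. This is a minimal sketch using the JDK's `java.net.http.HttpClient` (Java 11 or later), assuming the application listens on `http://localhost:8080` as in the URL above; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: posts one measurement to the /api/v1/publish endpoint.
public class PublishRequestSketch {

    // Same JSON body as the Postman request.
    static final String BODY =
            "{\n"
            + "    \"sensorId\": \"12345\",\n"
            + "    \"value\": 67.5,\n"
            + "    \"unit\": \"Celsius\",\n"
            + "    \"timestamp\": \"2024-11-16T18:30:00Z\"\n"
            + "}";

    // Builds a POST request with a JSON body for the controller's publish endpoint.
    public static HttpRequest buildPublishRequest(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildPublishRequest("http://localhost:8080", BODY);
        System.out.println(request.method() + " " + request.uri());
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

The `Content-Type: application/json` header matters here, because the controller reads the body through `@RequestBody MeasurementDTO`.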

On the Java side, the consumer's output appears in the Spring log:

2024-11-16 19:11:43.964  INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer      : Received JSON message here ->
{
  "sensorId" : "12345",
  "value" : 67.5,
  "unit" : "Celsius",
  "timestamp" : "2024-11-16T18:30:00Z"
}

You can see a spike in the RabbitMQ UI.

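If you want to inspect the queued messages in the RabbitMQ UI, comment out the listener annotation in RabbitMQJsonConsumer.java.

From

@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

to

//@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

Then rebuild the jar and run it again.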


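  • The @RabbitListener annotation makes the consumer automatically consume messages from the queue as soon as they arrive.
  • Once the consumer has processed a message, it is removed from the queue, which leaves the queue empty.
  • With @RabbitListener commented out, the consumer is disabled and messages stay in the queue for inspection.
  • This behavior ensures that messages are not lost but are processed immediately, unless consumption is explicitly paused.
  • For debugging, disabling the consumer lets you verify the message flow and the queue contents in RabbitMQ.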
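
The effect can be imitated without a broker. The following is a minimal sketch in plain Java, assuming a `LinkedBlockingQueue` as a stand-in for `queue_json` and a background thread as a stand-in for the `@RabbitListener` method. All names are hypothetical and none of this is RabbitMQ or Spring API; it only illustrates why an active consumer leaves nothing behind to inspect.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broker queue with an optional listener.
public class QueueDrainSketch {

    // Publishes the messages and returns how many are still queued afterwards.
    // With the listener enabled, a background thread takes each message as
    // soon as it arrives, so nothing is left in the queue.
    public static int publishAndCountRemaining(boolean listenerEnabled, List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new CopyOnWriteArrayList<>();

        Thread listener = null;
        if (listenerEnabled) {
            listener = new Thread(() -> {
                try {
                    while (true) {
                        consumed.add(queue.take()); // take() removes the message
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted: stop consuming
                }
            });
            listener.setDaemon(true);
            listener.start();
        }

        for (String message : messages) {
            queue.add(message); // "publish"
        }

        if (listener != null) {
            try {
                // Give the listener up to two seconds to consume everything.
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
                while (consumed.size() < messages.size() && System.nanoTime() < deadline) {
                    Thread.sleep(10);
                }
                listener.interrupt();
                listener.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the listener", e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println("listener enabled, remaining = "
                + publishAndCountRemaining(true, messages));
        System.out.println("listener disabled, remaining = "
                + publishAndCountRemaining(false, messages));
    }
}
```

With the listener enabled the queue ends up empty, which mirrors what the management UI shows while the consumer is running; with it disabled, every published message stays queued.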

Good luck!
