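I want to create an integration test for my pack and label functions (composed with a pipe), which are based on Spring Cloud Function. I know the configured input binding waits for messages from the order-accepted.dispatcher-service queue, which is bound to the order-accepted exchange, and that Spring Cloud Stream automatically sends the result to the order-dispatched exchange. So I only need to send a message to the order-accepted.dispatcher-service queue and check the message received from the order-dispatched exchange. But I don't know the routing key. How can I send a message to the order-accepted.dispatcher-service queue? Here are the configuration file and the test class I have so far: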
server:
  port: 9003
spring:
  application:
    name: dispatcher-service
  cloud:
    function:
      definition: pack|label
    stream:
      bindings:
        packlabel-in-0:
          destination: order-accepted
          group: ${spring.application.name}
        packlabel-out-0:
          destination: order-dispatched
  rabbitmq:
    host: localhost
    port: 5672
    username: user
    password: password
    connection-timeout: 5s
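A side note on the names in this configuration: they appear to follow Spring Cloud Stream's documented conventions. The composed function name drops the pipe (pack|label becomes packlabel), bindings are named <function>-in-<index> and <function>-out-<index>, and the Rabbit binder names the queue of a grouped consumer <destination>.<group>. The sketch below only reproduces that string logic in plain Java. BindingNames and its methods are hypothetical helpers for illustration, not part of any Spring API.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper: mirrors the naming conventions as plain string logic.
final class BindingNames {

    // "pack|label" -> "packlabel": the pipe is dropped for composed functions.
    static String composedName(String definition) {
        return Arrays.stream(definition.split("\\|"))
                .map(String::trim)
                .collect(Collectors.joining());
    }

    // Input binding name: <function>-in-<index>.
    static String inputBinding(String definition, int index) {
        return composedName(definition) + "-in-" + index;
    }

    // Output binding name: <function>-out-<index>.
    static String outputBinding(String definition, int index) {
        return composedName(definition) + "-out-" + index;
    }

    // Queue of a consumer that declares a group: <destination>.<group>.
    static String consumerQueue(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(inputBinding("pack|label", 0));   // packlabel-in-0
        System.out.println(outputBinding("pack|label", 0));  // packlabel-out-0
        System.out.println(consumerQueue("order-accepted", "dispatcher-service"));
        // order-accepted.dispatcher-service
    }
}
```

This is why the input binding ends up consuming from order-accepted.dispatcher-service, while the output binding, which has no group, publishes to the order-dispatched exchange.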

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class DispatcherServiceApplicationTests {

    @Container
    static RabbitMQContainer rabbitMQ = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.10-management"));

    @DynamicPropertySource
    static void rabbitMQProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.rabbitmq.host", rabbitMQ::getHost);
        registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort);
        registry.add("spring.rabbitmq.username", rabbitMQ::getAdminUsername);
        registry.add("spring.rabbitmq.password", rabbitMQ::getAdminPassword);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    void contextLoads() {
    }

    @Test
    void packAndLabel() {
        long orderId = 121;

        Queue inputQueue = this.rabbitAdmin.declareQueue();
        assert inputQueue != null;
        Binding inputBinding = new Binding(inputQueue.getName(), Binding.DestinationType.QUEUE, "order-accepted", "#", null);

        Queue outputQueue = this.rabbitAdmin.declareQueue();
        assert outputQueue != null;
        Binding outputBinding = new Binding(outputQueue.getName(), Binding.DestinationType.QUEUE, "order-dispatched", "#", null);

        this.rabbitAdmin.declareBinding(inputBinding);
        this.rabbitAdmin.declareBinding(outputBinding);

        // I think this only sends the message to the order-accepted exchange
        // rather than to the order-accepted.dispatcher-service queue.
        rabbitTemplate.convertAndSend("order-accepted", "#", new OrderAcceptedMessage(orderId));

        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            OrderDispatchedMessage message = rabbitTemplate.receiveAndConvert(outputQueue.getName(),
                    10000, new ParameterizedTypeReference<OrderDispatchedMessage>(){});
            assert message != null;
            assertThat(message.orderId()).isEqualTo(orderId);
            System.out.println("------------------------------------: " + message.orderId());
        });
    }
}

We use Testcontainers ourselves, so you can look at some of the tests we have for the Rabbit binder. Here is one of them: https://github.com/spring-cloud/spring-cloud-stream/blob/8f5e7d75f23f9463ecc4f1e8372685b1c01d97d4/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitBinderModuleTests.java#L99
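
On the routing key itself: assuming the Rabbit binder's defaults are in effect (a topic exchange, with the consumer queue bound using the key #), the routing key used when publishing should not matter, because # matches any routing key. The following is a minimal, simplified sketch of AMQP topic matching in plain Java to illustrate that point. TopicMatch is a hypothetical class written for this example, not a RabbitMQ or Spring API, and a real broker may treat edge cases differently.

```java
// Hypothetical illustration of AMQP topic-exchange matching rules:
// words are separated by '.', '*' matches exactly one word,
// and '#' matches zero or more words.
final class TopicMatch {

    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if all words were consumed too.
            return j == w.length;
        }
        if (p[i].equals("#")) {
            // '#' either consumes nothing (advance the pattern)
            // or consumes one more word (advance the routing key).
            return matches(p, i + 1, w, j)
                    || (j < w.length && matches(p, i, w, j + 1));
        }
        if (j == w.length) {
            // Pattern still expects a word, but none are left.
            return false;
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(w[j]);
        return wordMatches && matches(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "#"));                        // true
        System.out.println(matches("#", "any.routing.key"));          // true
        System.out.println(matches("*.accepted", "order.accepted"));  // true
        System.out.println(matches("*.accepted", "accepted"));        // false
    }
}
```

Under that assumption, publishing to the order-accepted exchange with any routing key should reach the order-accepted.dispatcher-service queue. Another option is to publish through the default exchange, which routes by queue name: rabbitTemplate.convertAndSend("", "order-accepted.dispatcher-service", payload).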