我有两个服务,服务 A 和服务 B,使用 Redis 进行服务间通信。
使用 Spring boot 的服务 A。
使用expressjs的服务B(它看起来像一个消费者,消费消息然后发布消息通知服务A“嘿服务A,我完成了”)
服务 A 应公开两个端点:
/api-publish
和 /api-consume
。当用户向 /api-publish
端点发出请求时,服务 A 应向名为 topic1
的 Redis 主题发布一条消息。
服务 B 应设置为使用来自
topic1
的消息。成功处理来自 topic1
的消息后,服务 B 应将消息发布到另一个名为 topic2
的 Redis 主题。
我想向服务 A 上的
/api-consume
端点发出请求,该服务应该等到服务 B 成功向 topic2
发布消息。只有这样服务 A 才会向用户返回响应。
这是我在服务 A 中的 Redis 配置:
@Configuration
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int redisPort;
@Bean
public RedisConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort));
}
@Bean
public RedisTemplate<?, ?> redisTemplate() {
RedisTemplate<?, ?> template = new RedisTemplate<>();
template.setConnectionFactory(lettuceConnectionFactory());
template.setHashKeySerializer(new StringRedisSerializer());
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
return template;
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Bean
ChannelTopic topic() {
return new ChannelTopic("topic2");
}
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container
= new RedisMessageListenerContainer();
container.setConnectionFactory(lettuceConnectionFactory());
container.addMessageListener(messageListener(), topic());
return container;
}
}
这是从 topic2 获取消息的服务:
@Service
public class RedisMessageSubscriber implements MessageListener {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
@Override
public void onMessage(Message message, byte[] pattern) {
String receivedMessage = new String(message.getBody());
try {
queue.put(receivedMessage);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public String waitForMessage(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
}
这是一个简单的控制器:
@RestController
public class TestController {
@Autowired
private RedisTemplate<String, Object> template;
@Autowired
private RedisMessageSubscriber subscriber;
@GetMapping("/api/v1/public/test")
public String test() {
try {
String message = subscriber.waitForMessage(5, TimeUnit.SECONDS);
if (message != null) {
return "Received message from Service B: " + message;
} else {
return "Timeout waiting for message from Service B";
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Failed to receive message";
}
}
}
我怀疑您有 2 个
RedisMessageSubscriber
实例。
您的控制器注入了 @Service
带注释的一个,但消息到达:
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber()); // <--- this one
}