在 Spring boot 中阻止 GetMapping util 从 Redis 获取消息

问题描述 投票:0回答:1

我有两个服务,服务 A 和服务 B,使用 Redis 进行服务间通信。

使用 Spring boot 的服务 A。

使用expressjs的服务B(它看起来像一个消费者,消费消息然后发布消息通知服务A“嘿服务A,我完成了”)

服务 A 应公开两个端点:

/api-publish
/api-consume
。当用户向
/api-publish
端点发出请求时,服务 A 应向名为
topic1
的 Redis 主题发布一条消息。

服务 B 应设置为使用来自

topic1
的消息。成功处理来自
topic1
的消息后,服务 B 应将消息发布到另一个名为
topic2
的 Redis 主题。

我想向服务 A 上的

/api-consume
端点发出请求,该服务应该等到服务 B 成功向
topic2
发布消息。只有这样服务 A 才会向用户返回响应。

这是我在服务 A 中的 Redis 配置:

@Configuration
public class RedisConfig {

    @Value("${spring.data.redis.host}")
    private String redisHost;

    @Value("${spring.data.redis.port}")
    private int redisPort;

    @Bean
    public RedisConnectionFactory lettuceConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort));
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate() {
        RedisTemplate<?, ?> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("topic2");
    }

    @Bean
    RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container
                = new RedisMessageListenerContainer();
        container.setConnectionFactory(lettuceConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }

}

这是从 topic2 获取消息的服务:

@Service
public class RedisMessageSubscriber implements MessageListener {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String receivedMessage = new String(message.getBody());
        try {
            queue.put(receivedMessage);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public String waitForMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }
}

这是一个简单的控制器:

@RestController
public class TestController {
    @Autowired
    private RedisTemplate<String, Object> template;
    @Autowired
    private RedisMessageSubscriber subscriber;

    @GetMapping("/api/v1/public/test")
    public String test() {
        try {
            String message = subscriber.waitForMessage(5, TimeUnit.SECONDS);
            if (message != null) {
                return "Received message from Service B: " + message;
            } else {
                return "Timeout waiting for message from Service B";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Failed to receive message";
        }
    }
}
java spring spring-boot redis
1个回答
0
投票

我怀疑您有 2 个

RedisMessageSubscriber
实例。 您的控制器注入了
@Service
带注释的一个,但消息到达:

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber()); // <--- this one
}
© www.soinside.com 2019 - 2024. All rights reserved.