EmitterProcessor is deprecated; how do we replace it?


Before asking, I went through many links, such as "Spring WebFlux (Flux): how to publish dynamically", but I still could not solve this.

I have the code below, but the type EmitterProcessor<Message<?>> is deprecated. How can I convert this code to use

Sinks.many().multicast().onBackpressureBuffer()?

Config.java

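import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

@Configuration
public class DynamicDestinationConfig {

    // Deprecated type: this is the bean the question asks how to replace.
    @Bean
    public EmitterProcessor<Message<?>> emitterProcessor() {
        return EmitterProcessor.create();
    }

    // Exposes the processor as the reactive source for the output binding.
    @Bean
    public Supplier<Flux<Message<?>>> supplier() {
        return () -> emitterProcessor();
    }
}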

Controller.java

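import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
// Assumption: the question does not show which MessageBuilder is imported.
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.EmitterProcessor;

@RestController
public class DynamicDestinationController {

    @Autowired
    private EmitterProcessor<Message<?>> emitterProcessor;

    @Autowired
    private ObjectMapper jsonMapper;

    @SuppressWarnings("unchecked")
    @PostMapping("/")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);

        // The "id" field of the JSON body names the destination to send to.
        String destinationName = payload.get("id");

        Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader("spring.cloud.stream.sendto.destination", destinationName)
                .build();

        // Push the message into the (deprecated) processor.
        emitterProcessor.onNext(message);
    }
}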
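
For reference, one possible shape of the conversion asked about above, as a minimal sketch rather than a confirmed answer. It assumes Reactor 3.4 or later (where Sinks is available) and the same Spring Cloud Stream functional setup as the question. The bean name sink, the constructor injection, and the import of org.springframework.messaging.support.MessageBuilder are illustrative assumptions, not taken from the original code. Both classes are declared without public only so that they fit in a single listing; in a real project each would normally sit in its own file.

```java
import java.util.Map;
import java.util.function.Supplier;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; // assumed import
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Configuration
class DynamicDestinationConfig {

    // Takes the place of the EmitterProcessor bean; the name "sink" is illustrative.
    @Bean
    public Sinks.Many<Message<?>> sink() {
        return Sinks.many().multicast().onBackpressureBuffer();
    }

    // The supplier hands out the Flux view of the sink instead of the processor itself.
    @Bean
    public Supplier<Flux<Message<?>>> supplier(Sinks.Many<Message<?>> sink) {
        return sink::asFlux;
    }
}

@RestController
class DynamicDestinationController {

    private final Sinks.Many<Message<?>> sink;
    private final ObjectMapper jsonMapper;

    DynamicDestinationController(Sinks.Many<Message<?>> sink, ObjectMapper jsonMapper) {
        this.sink = sink;
        this.jsonMapper = jsonMapper;
    }

    @SuppressWarnings("unchecked")
    @PostMapping("/")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body,
                              @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);

        Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader("spring.cloud.stream.sendto.destination", payload.get("id"))
                .build();

        // tryEmitNext replaces onNext; it reports failure through its result
        // instead of throwing, so the result is checked explicitly here.
        Sinks.EmitResult result = sink.tryEmitNext(message);
        if (result.isFailure()) {
            throw new IllegalStateException("Could not emit message: " + result);
        }
    }
}
```

One thing to watch: tryEmitNext can return a failure such as FAIL_NON_SERIALIZED when two requests emit at the same moment, so how to react to a failed emit (retry, reject the request, or use emitNext with an EmitFailureHandler such as Sinks.EmitFailureHandler.FAIL_FAST) is a design choice this sketch leaves open.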
spring spring-cloud-stream