在提出问题之前,我浏览了很多链接,例如:Spring WebFlux (Flux): 如何动态发布,但还无法解决。
我有下面的代码,但是 EmitterProcessor
Sinks.many().multicast().onBackpressureBuffer()
?
配置.java
@Configuration
public class DynamicDestinationConfig {
@Bean
public EmitterProcessor<Message<?>> emitterProcessor(){
return EmitterProcessor.create();
}
@Bean
public Supplier<Flux<Message<?>>> supplier() {
return () -> emitterProcessor();
}
}
控制器.java
@RestController
public class DynamicDestinationController {
@Autowired
private EmitterProcessor<Message<?>> emitterProcessor;
@Autowired
private ObjectMapper jsonMapper;
@SuppressWarnings("unchecked")
@PostMapping("/")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
Map<String, String> payload = jsonMapper.readValue(body, Map.class);
String destinationName = payload.get("id");
Message<?> message = MessageBuilder.withPayload(payload)
.setHeader("spring.cloud.stream.sendto.destination", destinationName)
.build();
emitterProcessor.onNext(message);
}
}