我已经开始涉猎Spring Cloud Stream和它的Cloud Function支持。
我在这里上传了一个示例项目来阐明这个问题->。https:/github.comnmarquesantosspring-cloud-function-kafka。
我有一个项目,以反应的方式暴露了一些函数,以kafka作为消息中介。
该函数接收一个flux,并通过Reactive Mongo库保存元素。然后它通过另一个flux返回更新后的资源。
@Service
public class ExampleCloudFunction {
@Autowired
private PlayerRepository playerRepository;
@Bean
public Function<Flux<Player>, Flux<Player>> playerUpdate() {
return flux -> flux.flatMap(player -> playerRepository.save(player)).log("Saved player");
}
@PollableBean
public Supplier<Flux<Player>> playerFeeder() {
return () -> Flux.just(new Player(UUID.randomUUID().toString(), "Ronaldo"));
}
}
playerUpdate函数是发生错误的地方。 playerFeeder只是我创建的一个函数,用来发送数据来重现问题。在现实生活中,这将来自不同的服务。
这里是通过运行我上面提到的示例项目的错误片段。
2020-05-21 22:13:40.842 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}
org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context
2020-05-21 22:13:41.853 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}
org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context
我很难理解我到底做错了什么,在我的搜索中找不到太多信息。
Spring for Apache Kafka目前不支持反应式事务。