我已经使用 Spring Web Flux 和 MongoDB Change Streams 实现了 Reactive Async REST API,它可以正常工作。但我不想将带有 Criteria 的聚合对象传递给
".filter(arg0)" method
,而是想将 MongoDB 聚合 JSON 字符串传递给过滤器方法,例如:
ChangeStreamOptions.builder()
.filter(Document.parse("[{$match: {age: {$gt: 18}}}, {'operationType': 'insert'}]"))
.returnFullDocumentOnUpdate().build();
上面的代码片段不起作用,因为它不正确。
下面是我的工作实现。 还有方法
public Flux<Person> watchAgeGreaterThan(Integer ageParam){...}
每次在 MongoDB 中插入文档且其“age”字段大于“ageParam”时,通过 REST 端点调用
"/api/rest/age/greaterthan/{ageParam}"
以异步返回流。"person_collection"
我还尝试了另一种不同的实现,您可以在下面看到:
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;
@Service
public class Watch {
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<Person> watchAgeGreaterThan(Integer ageParam) {
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(Person.class,
Aggregation.match(
Criteria.where("operationType").is("insert")
.and("fullDocument.age").gt(ageParam)
))
).returnFullDocumentOnUpdate().build();
return reactiveMongoTemplate.changeStream("person_collection", options, Person.class)
.map(ChangeStreamEvent::getBody)
.doOnError(throwable -> log.error("Error on 'person' change stream event :: "
+ throwable.getMessage(), throwable));
}
}
但是这个不行。
这会起作用