我正在研究 REST API,对于收集端点,希望
_embedded
数组异步填充。不幸的是,当also具有_links
元素时,我似乎无法弄清楚如何做到这一点。在响应的顶层放置一个 Publisher
可以正确地异步返回值;在嵌入对象中放置 Publisher
会导致序列化时出错。
这是当前(不工作)的代码。我正在使用 Project Reactor 和 Micronaut。
@Get
HttpResponse<Publisher<Response>> get() {
Response response = new Response();
response.embedded("item", new FluxEmbed(Flux.just(1, 2, 3, 4, 5)));
response.link("index", "example.org");
return HttpResponse.ok(Mono.just(response));
}
@Serdeable.Serializable
class Response extends AbstractResource<Response> {
}
@Serdeable.Serializable
class FluxEmbed extends AbstractResource<FluxEmbed> {
private final Publisher<Integer> publisher;
public FluxEmbed(Publisher<Integer> publisher) {
this.publisher = publisher;
}
public Publisher<Integer> getPublisher() {
return publisher;
}
}
预期的行为是让
_embedded
数组的输出逐渐填充。现在,我可以通过执行阻塞操作来实现这一点,但我更愿意留在异步模型中。
JSON输出;
_embedded
数组应随时间填充。如果我返回一个 Flux 作为 HttpResponse
的主体就可以做到这一点,但这不符合 HAL 标准。
{
"_links": {
"index": [
{
"href": "example.org",
"templated": false
}
]
},
"_embedded": {
"item": [
{
"value": 1
}
]
}
}
你在这里可以做的,就是按照
这里的描述使用
Flux.create
和一座桥
在您的情况下,它将涉及控制器、Flux 桥和生成数据的侦听器。
下面的示例将每 5 秒输出数字 1..10。很抱歉在此示例中使用了 Kotlin,但它应该说明如何执行此操作。如有必要,我可以重写为 Java。
import io.micronaut.context.annotation.Prototype
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.annotation.Scheduled
import jakarta.annotation.PostConstruct
import jakarta.inject.Singleton
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.concurrent.ConcurrentHashMap
@Controller("/flux-test")
class TestController(private val fluxBridge: FluxBridge) {
@Get
fun get(): Flux<Int> = fluxBridge.getFlux()
}
/** anonymous implementation inside FluxBridge */
interface MyEventListener<T> {
fun onDataChunk(chunk: List<T>)
fun processComplete()
}
/** class that produces 1..10 every 5 seconds */
@Singleton
class MyEventProcessor {
/** map threadId -> MyEventListener */
/** NOTE: Not for production use */
private val listeners = ConcurrentHashMap<Long, MyEventListener<Int>>()
fun register(
threadId: Long,
incomingListener: MyEventListener<Int>
) {
listeners[threadId] = incomingListener
}
@Scheduled(cron = "*/5 * * * * *")
fun scheduled() {
listeners.values.forEach {
it.onDataChunk((1..10).toList())
}
}
}
@Prototype /** class is not thread-safe, hence create a new instance for every access */
class FluxBridge(private val myEventProcessor: MyEventProcessor) {
private lateinit var bridge: Flux<Int>
@PostConstruct
fun init() {
/** initialize the bridge that is used by method produceData */
bridge = Flux.create { sink: FluxSink<Int> ->
myEventProcessor.register(
Thread.currentThread().id,
/** anonymous implementation of MyEventListener */
object : MyEventListener<Int> {
override fun onDataChunk(chunk: List<Int>) {
for (s in chunk) {
sink.next(s)
}
}
/** not used in this example */
override fun processComplete() {
sink.complete()
}
})
}
}
/** just return the bridge which is a Flux<Int> */
fun getFlux(): Flux<Int> = bridge
}