我有一个REST服务。当用户发出请求时,服务会向外部服务和数据库发出多个请求,合并结果并回馈响应。像这样的东西:
public class SomeController extends Controller {
public CompletionStage<Result> someEndpoint() {
return someService.someSlowMethod()
.thenApplyAsync(r -> ok(Json.toJson(r)), ec.current());
}
}
@Singleton
public class SomeService {
public CompletionStage<SomeDTO> someSlowMethod() {
return ws.url(someExternalServiceUrl)
.get()
.thenCompose(firstResult -> ws.url(anotherExternalServiceUrl)
.get()
.thenCompose(secondResult -> someRepository.queryDb()
.thenApply(dbResult -> doSomeWork(firstResult, secondResult, dbResult))));
}
}
但是外部服务可以响应很长时间,用户必须等待,并且不能排除可能的超时。因此,我想将SSE返回给用户并在接收到来自外部服务和数据库的响应时发送进度事件,然后返回结果。
但是我不知道该怎么做。据我了解,我需要Akka Streams。我应该从someSlowMethod
返回什么?如何生成事件并使其具有线程安全性?如果您能向正确的方向推动我,我将不胜感激。
我想通了。
public class Module extends AbstractModule implements AkkaGuiceSupport {
@Override
public void configure() {
bindActor(GraphStageManager.class, "graph-manager");
bindActorFactory(SomeWorker.class, SomeWorkerFactory.class);
}
}
public interface SomeWorkerFactory {
public Actor create();
}
public class SomeWorkerGenerator extends AbstractActor {
private final SomeService service;
@Inject
public DocumentGenerator(SomeService service) {
this.service = service;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
GraphStageManager.DoWork.class,
msg -> {
service.someSlowMethod(getSender()).toCompletableFuture().join();
getContext().stop(getSelf());
})
.build();
}
}
@Singleton
public class GraphStageManager extends AbstractActor implements InjectedActorSupport {
private final Map<Long, ActorRef> workers = new HashMap<>();
private DocumentGeneratorFactory workerFactory;
@Inject
public GraphStageManager(DocumentGeneratorFactory workerFactory) {
this.workerFactory = workerFactory;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
DoWork.class,
msg -> {
if (workers.get(msg.getId()) != null) {
// ...
} else {
getContext().watchWith(getSender(), new Terminated(msg.getId()));
ActorRef worker = injectedChild(() -> workerFactory.create(), "some-worker-" + msg.getId());
workers.put(msg.getId(), worker);
worker.tell(msg, getSender());
}
})
.match(
Terminated.class,
msg -> {
ActorRef worker = workers.remove(msg.getId());
if (worker != null)
worker.tell(new Stop(), getSelf());
getContext().unwatch(getSender());
})
.build();
}
public static class DoWork {
// ...
}
public static class Terminated {
// ...
}
}
public class EventStage extends GraphStage<SourceShape<String>> {
public final Outlet<String> out = Outlet.create("EventStage.out");
private final SourceShape<String> shape = SourceShape.of(out);
private final Long id;
private final ActorRef manager;
public EventStage(Long id, ActorRef manager) {
this.id = id;
this.manager = manager;
}
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
private final Deque<String> messages = new LinkedList<>();
private boolean downstreamWaiting = false;
{
setHandler(
out,
new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
if (messages.size() > 0) {
push(out, messages.poll());
} else {
downstreamWaiting = true;
}
}
});
}
@Override
public void preStart() {
ActorRef ref = getStageActor(this::onReceive).ref();
manager.tell(new GraphStageManager.DoWork(id), ref);
}
public BoxedUnit onReceive(Tuple2<ActorRef, Object> msg) {
String event = (String) msg._2();
messages.offer(event);
if (downstreamWaiting) {
downstreamWaiting = false;
push(out, messages.poll());
}
if ("done".equals(event))
completeStage();
return BoxedUnit.UNIT;
}
};
}
}
@Singleton
public class SomeService {
public CompletionStage<SomeDTO> someSlowMethod(ActorRef actor) {
return ws.url(someExternalServiceUrl)
.get()
.thenApply(r -> {
actor.tell("tick", ActorRef.noSender());
return r;
})
.thenCompose(firstResult -> ws.url(anotherExternalServiceUrl)
.get()
.thenApply(r -> {
actor.tell("tick", ActorRef.noSender());
return r;
})
.thenCompose(secondResult -> someRepository.queryDb()
.thenApply(dbResult -> doSomeWork(firstResult, secondResult, dbResult))))
.thenApply(r -> {
actor.tell("done", ActorRef.noSender());
return r;
});
}
}
public class SomeController extends Controller {
private final ActorRef manager;
@Inject
public SomeController(@Named("graph-manager") ActorRef manager) {
this.manager = manager;
}
public Result someEndpoint(Long id) {
Source<EventSource.Event, ?> eventSource = Source.fromGraph(new EventStage(id, manager)).map(EventSource.Event::event);
return ok().chunked(eventSource.via(EventSource.flow())).as(Http.MimeTypes.EVENT_STREAM);
}
}
我希望我做对了。