Getting the result of an asynchronous websocket call from a synchronous call using the concurrency API

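I'm using a websocket client (org.java_websocket.client.WebSocketClient) and it works well. The main problem is that in my code I can only get the result through the onMessage method, which is asynchronous, and I need that result in a synchronous caller.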

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.extensions.IExtension;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.protocols.IProtocol;
import org.java_websocket.protocols.Protocol;

public class EmptyClient extends WebSocketClient {

    private final static Draft_6455 draft = new Draft_6455(Collections.<IExtension>emptyList(), Collections.<IProtocol>singletonList(new Protocol("my-protocol")));
    private String request = null, response = null;

    public EmptyClient(String serverUri, String request) throws URISyntaxException {
        super(new URI(serverUri), draft);
        this.request = request;
        setConnectionLostTimeout(3000);
        connect();
    }

    @Override
    public void onOpen(ServerHandshake handshakedata) {
        send(request);
        System.out.println("new connection opened");
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("closed with exit code " + code + " additional info: " + reason + " remote:" + remote);
    }

    @Override
    public void onMessage(String message) {
        this.response = message;
        close();
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("received ByteBuffer");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("an error occurred:" + ex);
    }

    // Returns immediately; response is still null if onMessage has not fired yet.
    public String getResponse() {
        return response;
    }

    public static void main(String[] args) throws URISyntaxException, ExecutionException, InterruptedException {

        String res = new EmptyClient("ws://123.abc:8888/", "{\"command\":\"create\",\"id\":\"1234\"}").getResponse();
        System.out.println("response : "+res);
    }

}

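At first I thought I could use a Future<T> with an ExecutorService, but the main problem is that the Future itself is asynchronous, and the result of the onMessage method is asynchronous as well.

Here is how I changed my previous code: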

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.extensions.IExtension;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.protocols.IProtocol;
import org.java_websocket.protocols.Protocol;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EmptyClient extends WebSocketClient {

    private final static Draft_6455 draft = new Draft_6455(Collections.<IExtension>emptyList(), Collections.<IProtocol>singletonList(new Protocol("my-protocol")));
    private String request = null, response = null;
    private ExecutorService executor = Executors.newSingleThreadExecutor();


    public EmptyClient(String serverUri, String request) throws URISyntaxException {
        super(new URI(serverUri), draft);
        this.request = request;
        setConnectionLostTimeout(3000);
        connect();
    }

    @Override
    public void onOpen(ServerHandshake handshakedata) {
        send(request);
        System.out.println("new connection opened");
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("closed with exit code " + code + " additional info: " + reason + " remote:" + remote);
        executor.shutdown();
    }

    @Override
    public void onMessage(String message) {
        this.response = message;
        close();
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("received ByteBuffer");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("an error occurred:" + ex);
    }

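    // The submitted task only reads the field; it does not wait for onMessage,
    // so the Future can complete with null before any message has arrived.
    public Future<String> getResponse() {
        return executor.submit(() -> {
            return response;
        });
    }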

    public static void main(String[] args) throws URISyntaxException, ExecutionException, InterruptedException {

        Future<String> res = new EmptyClient("ws://123.abc:8888/", "{\"command\":\"create\",\"id\":\"1234\"}").getResponse();
        System.out.println(res.isDone() + res.get());
    }

}

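I can't use Future as the return type of the onMessage method, because that method belongs to the WebSocketClient class I'm extending. I call the EmptyClient class from a synchronous method and need that result to produce something else. How can I solve this? Maybe other concurrency API classes and/or locks could be used.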
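
One possible direction, as a minimal sketch rather than a definitive answer: keep a CompletableFuture<String> as a field, complete it from the callback, and let the synchronous caller block on get with a timeout. To stay self-contained, the sketch below does not use org.java_websocket at all. CallbackClientSketch, its connect method, and the "echo:" reply are hypothetical stand-ins for EmptyClient and the server; only the java.util.concurrent calls are real.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a callback-driven client such as EmptyClient.
class CallbackClientSketch {

    // Completed at most once, by whichever callback fires first.
    private final CompletableFuture<String> response = new CompletableFuture<>();

    // Plays the role of connect(): returns at once and does its work on another thread.
    void connect(String request) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);            // simulated network delay (assumption)
                onMessage("echo:" + request); // simulated server reply (assumption)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                onError(e);
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Callback invoked on the worker thread when a message arrives.
    void onMessage(String message) {
        response.complete(message);
    }

    // Callback invoked on failure; wakes the waiting caller with an exception.
    void onError(Exception ex) {
        response.completeExceptionally(ex);
    }

    // Blocks the calling thread until a callback completes the future or the timeout expires.
    String awaitResponse(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return response.get(timeout, unit);
    }

    // Synchronous facade: start the asynchronous work, then wait for its result.
    static String call(String request)
            throws InterruptedException, ExecutionException, TimeoutException {
        CallbackClientSketch client = new CallbackClientSketch();
        client.connect(request);
        return client.awaitResponse(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("response : " + call("{\"command\":\"create\",\"id\":\"1234\"}"));
    }
}
```

Applied to the code in the question, the same idea would presumably mean completing the future inside onMessage(String) and completing it exceptionally in onError (and possibly in onClose when no message arrived), so the caller cannot wait forever. The timeout passed to get is a second safeguard against a reply that never comes.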

java asynchronous websocket concurrency synchronous