如何验证servlet处理是否真的在做非阻塞io?

问题描述 投票:0回答:1

我正在尝试使用非阻塞 IO 接收 HTTP 请求,然后使用非阻塞 IO 向另一台服务器发出另一个 HTTP 请求,并返回一些响应。这是我的 servlet 的代码:

package learn;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Response;

import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;

/**
 * Servlet that reads the request body with a {@link ReadListener}, issues an
 * asynchronous HTTP GET to {@code http://localhost:3000} through a JAX-RS
 * client, and writes the upstream status code back with a {@link WriteListener}.
 *
 * <p>Each stage prints the name of the thread it runs on so the threading
 * behaviour can be observed from the server log.
 */
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncProcessing extends HttpServlet {

    private static final long serialVersionUID = -535924906221872329L;

    /** Size of the scratch buffer used for each non-blocking read. */
    private static final int BUFFER_SIZE = 4 * 1024;

    /** Charset name used when the request does not declare a character encoding. */
    private static final String DEFAULT_ENCODING = "UTF-8";

    /**
     * Client shared by all requests; created on first use and closed in
     * {@link #destroy()}. Marked transient because the servlet is Serializable.
     */
    private transient Client httpClient;

    /**
     * Reads the whole request body without blocking.
     *
     * <p>The input stream is deliberately NOT closed here: the listener callbacks
     * run after this method has returned, so closing the stream on return would
     * cut the asynchronous read short.
     *
     * @param req request whose body is read; async mode must already be started
     * @return future completed with the decoded body once all data has been read,
     *         or completed exceptionally on an I/O or decoding error
     */
    public CompletableFuture<String> readRequestAsync(final HttpServletRequest req) {

        final CompletableFuture<String> request = new CompletableFuture<>();
        final String declaredEncoding = req.getCharacterEncoding();
        final String encoding = declaredEncoding != null ? declaredEncoding : DEFAULT_ENCODING;
        // Raw bytes are accumulated and decoded once at the end, so a multi-byte
        // character split across two reads is not corrupted.
        final ByteArrayOutputStream body = new ByteArrayOutputStream();

        final ServletInputStream inputStream;
        try {
            inputStream = req.getInputStream();
        } catch (IOException e) {
            request.completeExceptionally(e);
            return request;
        }

        inputStream.setReadListener(new ReadListener() {
            private final byte[] buffer = new byte[BUFFER_SIZE];

            @Override
            public void onError(Throwable t) {
                request.completeExceptionally(t);
            }

            @Override
            public void onDataAvailable() {
                if (inputStream.isFinished()) {
                    return;
                }
                System.out.println("----------------------------------------");
                System.out.println("onDataAvailable: " + Thread.currentThread().getName());
                try {
                    // Only read while isReady() is true; otherwise read() would block.
                    while (inputStream.isReady() && !inputStream.isFinished()) {
                        int length = inputStream.read(buffer);
                        if (length < 0) {
                            // End of stream: nothing to append.
                            break;
                        }
                        body.write(buffer, 0, length);
                    }
                } catch (IOException ex) {
                    request.completeExceptionally(ex);
                }
            }

            @Override
            public void onAllDataRead() throws IOException {
                try {
                    request.complete(body.toString(encoding));
                } catch (IOException e) {
                    // Unsupported encoding name declared by the request.
                    request.completeExceptionally(e);
                }
            }
        });

        return request;
    }

    /** Builds a JAX-RS client backed by the RESTEasy asynchronous HTTP engine. */
    private Client createAsyncHttpClient() {
        ResteasyClientBuilder restEasyClientBuilder = (ResteasyClientBuilder) ClientBuilder.newBuilder();

        return restEasyClientBuilder.useAsyncHttpEngine().connectTimeout(640, TimeUnit.SECONDS).build();
    }

    /** Returns the shared client, creating it on first use. */
    private synchronized Client sharedHttpClient() {
        if (httpClient == null) {
            httpClient = createAsyncHttpClient();
        }
        return httpClient;
    }

    /** Closes the shared HTTP client when the servlet is taken out of service. */
    @Override
    public void destroy() {
        synchronized (this) {
            if (httpClient != null) {
                httpClient.close();
                httpClient = null;
            }
        }
        super.destroy();
    }

    /**
     * Sends an asynchronous GET to the upstream server.
     *
     * @param httpRequest body of the incoming request (currently not forwarded)
     * @return future completed with the upstream response, or exceptionally if the call fails
     */
    public CompletableFuture<Response> process(String httpRequest) {
        System.out.println("----------------------------------------");
        System.out.println("process: " + Thread.currentThread());

        final CompletableFuture<Response> futureResponse = new CompletableFuture<>();

        // Reuse one client instead of building (and leaking) a new one per request.
        Client client = sharedHttpClient();
        client.target("http://localhost:3000").request().async().get(new InvocationCallback<Response>() {
            @Override
            public void completed(Response response) {
                System.out.println("----------------------------------------");
                System.out.println("completed: " + Thread.currentThread());
                futureResponse.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                System.out.println(throwable);
                futureResponse.completeExceptionally(throwable);
            }
        });

        return futureResponse;
    }

    /**
     * Writes the upstream status code to the servlet response without blocking.
     *
     * <p>The output stream is not closed here, because the write happens later in
     * the listener callback. The upstream response is closed once its status and
     * length have been read.
     *
     * @param httpResponseData upstream response; closed by this method
     * @param resp servlet response to write to
     * @return future completed with the upstream response length as reported by
     *         {@code Response.getLength()}, or exceptionally on a write error
     */
    public CompletableFuture<Integer> outputResponseAsync(Response httpResponseData, HttpServletResponse resp) {
        System.out.println("----------------------------------------");
        System.out.println("outputResponseAsync: " + Thread.currentThread().getName());

        final CompletableFuture<Integer> total = new CompletableFuture<>();

        final ServletOutputStream outputStream;
        try {
            outputStream = resp.getOutputStream();
        } catch (IOException e) {
            System.out.println(e);
            httpResponseData.close();
            total.completeExceptionally(e);
            return total;
        }

        outputStream.setWriteListener(new WriteListener() {
            // onWritePossible may be invoked more than once; write only once.
            private boolean written;

            @Override
            public void onWritePossible() throws IOException {
                System.out.println("----------------------------------------");
                System.out.println("onWritePossible: " + Thread.currentThread().getName());

                if (written || !outputStream.isReady()) {
                    return;
                }
                written = true;

                final int status;
                final int length;
                try {
                    status = httpResponseData.getStatus();
                    length = httpResponseData.getLength();
                } finally {
                    httpResponseData.close();
                }

                outputStream.print(status);
                total.complete(length);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t);
                httpResponseData.close();
                total.completeExceptionally(t);
            }
        });

        return total;
    }

    /**
     * Starts async processing and chains read, upstream call and write.
     *
     * <p>The async context is completed on both success and failure, so a failed
     * stage does not leave the request hanging until the container timeout.
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println("----------------------------------------");
        System.out.println("doGet: " + Thread.currentThread().getName());
        final AsyncContext asyncContext = req.startAsync();
        readRequestAsync(req)
            .thenCompose(this::process)
            .thenCompose(httpResponseData -> outputResponseAsync(httpResponseData, resp))
            .whenComplete((ignored, failure) -> {
                try {
                    if (failure != null) {
                        System.out.println(failure);
                        if (!resp.isCommitted()) {
                            resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                        }
                    }
                } finally {
                    asyncContext.complete();
                }
            });
    }
}

位于 http://localhost:3000 的服务器是一个用 Node.js 编写的 HTTP 服务器,它要等 27 秒后才返回响应。我想让 servlet 向该 Node 服务器发出请求,并在这个请求处理期间再向 servlet 发出另一个 HTTP 请求,以查看是否使用了同一个线程。目前我正在 Payara 5.194 上尝试,但即使我把两个线程池都设置为只有一个线程,应用服务器似乎仍会创建另一个线程。因此,我想请教各位:这个 servlet 是否确实在做非阻塞 IO,并且在任何时候都没有阻塞?如果能有一些实验让我确认这一点,那就太好了。我认为有必要指出,ServletInputStream 类是 InputStream 的子类,所以我真的不确定它是否是非阻塞 IO。谢谢。

java http servlets nonblocking jakarta-ee
1个回答
-1
投票

我认为这个问题没有引起太多关注,是因为不清楚您到底想问什么。大多数 Node 函数应该都是异步的,但也有一些例外,您可以在此处了解更多信息:https://nodejs.org/en/docs/guides/blocking-vs-non-blocking/

© www.soinside.com 2019 - 2024. All rights reserved.