连续 Socket 连接,第二个请求时 ObjectInputStream EOF

问题描述 投票:0回答:1
Java 21 套接字
允许多个连接
使用 ObjectInputStream 和 ObjectOutputStream 在第二个请求时产生 EOF

我有一个客户端、一个(中间人)桥接服务器和一个库存服务器。

  • 客户是指最终用户。
  • 服务器是指授权系统和库存服务器的一种网关

时序图如下:
Communication flow between components

现在解决问题:

  1. 开始
    BridgeServer
  2. 启动
    UserClient

    并观察连接已建立。
  3. 从终端中显示的选项发送任何类型的请求
    (对象)。
    在我的跑步中,我发送了
    AuthenticationRequest
  4. AuthenticationRequest
    处理正确。
  5. 重新发送
    AuthenticationRequest
    任何请求。
  6. EOFException
    BridgeServer
  7. 抛出
java.io.EOFException
    at java.base/java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2933)
    at java.base/java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3428)
    at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:985)
    at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:416)
    at com.pe.distributed.system.bridge.BridgeServer.run(BridgeSide.java:44)
    at com.pe.distributed.system.bridge.BridgeMain.main(BridgeMain.java:5)

对于上下文和可重复性:

用户客户端


import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.util.Map;
import java.util.Scanner;
import java.util.logging.Level;
import java.util.logging.Logger;

public record AuthenticationRequest(String username, String password) implements Serializable {
}

public record InventoryQuery() implements Serializable {
}

public record ItemOrder(String itemCode, int quantity) implements Serializable {
}

public class UserClient {
    private static final Logger logger = Logger.getLogger(UserClient.class.getName());
    private static final String EXCEPTION_OCCURRED = "Exception occurred";

    private UserClient() {
    }

    @SuppressWarnings("java:S2189")
    public static void run() {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            try (Socket bridgeSocket = new Socket("127.0.0.1", 5550)) {
                logger.log(Level.INFO, "Connected successfully to bridge server");

                while (true) {
                    displayMainMenu();
                    int choice = scanner.nextInt();
                    scanner.nextLine();

                    // Handle user choice
                    switch (choice) {
                        case 1:
                            authenticate(scanner, bridgeSocket);
                            break;
                        case 2:
                            checkInventory(bridgeSocket);
                            break;
                        case 3:
                            placeOrder(scanner, bridgeSocket);
                            break;
                        default:
                            logger.log(Level.WARNING, "Invalid choice. Please enter a valid option.");
                    }
                }
            } catch (IOException | ClassNotFoundException e) {
                logger.log(Level.SEVERE, EXCEPTION_OCCURRED, e);
            } finally {
                scanner.close();
            }
        }
    }

    @SuppressWarnings("java:S106")
    private static void displayMainMenu() {
        System.out.println("What would you like to do:");
        System.out.println("1. Authenticate");
        System.out.println("2. Check Inventory");
        System.out.println("3. Place an Order");
        System.out.print("Enter your choice: ");
    }

    @SuppressWarnings("java:S106")
    private static void authenticate(Scanner scanner, Socket socket) throws IOException, ClassNotFoundException {
        Map.Entry<String, String> authCreds = getUserAuthenticationInput(scanner);
        AuthenticationRequest authenticationRequest = new AuthenticationRequest(authCreds.getKey(),
                authCreds.getValue());

        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(authenticationRequest);
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
        System.out.println(response);
    }

    @SuppressWarnings("java:S106")
    private static Map.Entry<String, String> getUserAuthenticationInput(Scanner scanner) {
        System.out.println("Please provide credentials.\n");
        System.out.print("Username: ");
        String username = scanner.nextLine();
        System.out.print("Password: ");
        String password = scanner.nextLine();
        return Map.entry(username, password);
    }

    @SuppressWarnings("java:S106")
    private static void checkInventory(Socket socket) throws IOException, ClassNotFoundException {
        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(new InventoryQuery());
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
        System.out.println(response);
    }

    private static void placeOrder(Scanner scanner, Socket socket) throws IOException, ClassNotFoundException {
        Map.Entry<String, Integer> orderDetails = getUserOrderInput(scanner);
        ItemOrder itemOrder = new ItemOrder(orderDetails.getKey(), orderDetails.getValue());

        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(itemOrder);
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
    }

    @SuppressWarnings("java:S106")
    private static Map.Entry<String, Integer> getUserOrderInput(Scanner scanner) {
        System.out.println("Place your order!\n");

        String itemCode;
        int desiredQuantity;

        // Input validation loop for item code
        do {
            System.out.print("Item-code: ");
            itemCode = scanner.nextLine();
        } while (itemCode == null || itemCode.isEmpty());

        // Input validation loop for quantity
        do {
            System.out.print("Quantity: ");
            while (!scanner.hasNextInt()) {
                System.out.println("Invalid input. Please enter a valid number.");
                scanner.next(); // Consume the invalid input
            }
            desiredQuantity = scanner.nextInt();
            scanner.nextLine(); // Consume newline character
        } while (desiredQuantity <= 0);

        return Map.entry(itemCode, desiredQuantity);
    }
}

BridgeServer

import lombok.Getter;
import lombok.Setter;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

@Getter
@Setter
public class User {

    String username;
    String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

public record AuthenticationResponse(String message, Boolean isAuthenticated) implements Serializable {
}

public record AuthenticationRequest(String username, String password) implements Serializable {
}

public class BridgeServer {
    private static final Logger logger = Logger.getLogger(BridgeServer.class.getName());
    private static final Map<String, User> users;

    static {
        users = new ConcurrentHashMap<>();
        // Populate inventory with sample items
        users.put("user1", new User("user1", "pass1"));
        users.put("user2", new User("user2", "pass2"));
        users.put("user3", new User("user3", "pass3"));
    }

    @SuppressWarnings("java:S2189")
    public static void run() {
        try (ExecutorService executorService = Executors.newCachedThreadPool();
             ServerSocket serverSocket = new ServerSocket(5550)) {
            logger.info("Bridge server started, waiting for connections...");

            //noinspection InfiniteLoopStatement
            while (true) {
                Socket userSocket = serverSocket.accept();
                String clientKey = userSocket.getInetAddress().toString() + ":" + userSocket.getPort();
                logger.log(Level.INFO, "User client with key {0} connected.", clientKey);

                executorService.submit(new BridgeConnectionHandler(userSocket, clientKey));
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "IOException occurred", e);
        }
    }

    @SuppressWarnings("java:S2189")
    private record BridgeConnectionHandler(Socket userSocket, String clientKey) implements Runnable {
        @Override
        public void run() {
            try (ObjectInputStream in = new ObjectInputStream(userSocket.getInputStream());
                 ObjectOutputStream out = new ObjectOutputStream(userSocket.getOutputStream())) {

                boolean isAuthenticated = false;
                //noinspection InfiniteLoopStatement
                while (true) {
                    Object request = in.readObject();
                    // If the client is authenticated, handle user requests
                    if (isAuthenticated) {
                        if (!(request instanceof AuthenticationRequest)) {
                            handleUserRequest(in, out);
                        } else {
                            out.writeObject(new AuthenticationResponse("Client already authenticated with user "
                                    + users.get(clientKey), true));
                            out.flush();
                        }
                    } else if (request instanceof AuthenticationRequest authenticationRequest) {
                        AuthenticationResponse authenticationResponse = handleAuthenticationRequest(authenticationRequest);
                        out.writeObject(authenticationResponse);
                        out.flush();

                        isAuthenticated = authenticationResponse.isAuthenticated();
                    } else {
                        out.writeObject(new AuthenticationResponse("Not authenticated", false));
                        out.flush();
                    }
                }
            } catch (IOException | ClassNotFoundException e) {
                logger.log(Level.SEVERE, "Exception occurred", e);
            }
        }

        private AuthenticationResponse handleAuthenticationRequest(AuthenticationRequest authenticationRequest) {
            boolean authenticated = authenticate(authenticationRequest.username(), authenticationRequest.password());
            return new AuthenticationResponse(authenticated ? "Authentication successful" : "Authentication failed", authenticated);
        }

        private static synchronized boolean authenticate(String username, String password) {
            User user = users.get(username);
            return user != null && user.getPassword().equals(password);
        }

        private void handleUserRequest(ObjectInputStream bridgeIn, ObjectOutputStream userOut) throws IOException {
            try (Socket inventorySocket = new Socket("127.0.0.1", 12346);
                 ObjectOutputStream inventoryOut = new ObjectOutputStream(inventorySocket.getOutputStream());
                 ObjectInputStream inventoryIn = new ObjectInputStream(inventorySocket.getInputStream())) {

                Object request;
                while ((request = bridgeIn.readObject()) != null) {
                    logger.log(Level.INFO, "Received request from user: {0}", request);

                    // Forward request to inventory side
                    inventoryOut.writeObject(request);
                    inventoryOut.flush();

                    // Receive response from inventory side
                    Object response = inventoryIn.readObject();
                    logger.log(Level.INFO, "Received response from inventory: {0}", response);

                    // Send response back to user side
                    userOut.writeObject(response);
                    userOut.flush();
                }
            } catch (ClassNotFoundException e) {
                logger.log(Level.SEVERE, "Exception occurred", e);
            }
        }
    }

    public static void main(String[] args) {
        run();
    }

}

我特别提到了 EOF 被扔进

BridgeServer
中,特别是在
BridgeConnectionHandler
 <br> Object request = in.readObject();
下,请求通过的第 时间。

每次编写对象时,我都会彻底地

flush()

  1. 一些答案说套接字/连接正在关闭,因此
    ObjectInputStream
    会按预期产生
    EOFException

  2. 其他人说
    ObjectInputStream
    对象和实际 Socket 的
    InputStream
    可能不同步

也许我误解了套接字的功能, 但是

  • 我希望我的服务器保持连接处于活动状态,直到客户端关闭它。
  • 并确保所有请求都能得到正确处理。

当然没有

EOFException
错误。

希望得到一些反馈!

java sockets serversocket objectinputstream objectoutputstream
1个回答
0
投票

事实证明问题在于,而不是传递和使用

ObjectInputStream
ObjectOutputStream
在初始连接时创建,

try (ObjectInputStream in = new ObjectInputStream(userSocket.getInputStream());
     ObjectOutputStream out = new ObjectOutputStream(userSocket.getOutputStream()))

我们在方法的每次调用中创建了新实例

authenticate
checkInventory
placeOrder
例如。

ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
    bridgeIn.writeObject(authenticationRequest);
    bridgeIn.flush();
ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());

自:

ObjectOutputStream
在创建时写入流头。
如果您创建一个新的
ObjectOutputStream
并写入同一个OutputStream(即socket.getOutputStream()),则每个新流都会写入自己的标头
当相应的
ObjectInputStream
尝试读取数据时,它期望以连续的顺序读取对象。
但是,在流中间遇到新标头可能会使其混乱,导致 EOFException 或其他意外行为。

这就是导致 EOF 的原因,一旦我将原始且唯一的输入和输出流传递给方法,错误就消失了。

© www.soinside.com 2019 - 2024. All rights reserved.