recv 调用在远程关闭套接字后挂起

问题描述 投票:0回答:1

我正在做套接字编程,并且在弄清楚如何终止阻塞的recv调用时遇到问题,当它已经在等待并且另一端关闭套接字时。我的代码如下。

我的客户如下所示

class Client {
public:

    void connect();

    void disConnect();

    void addPayload(const std::string& payload);

    void setPort(unsigned int port);

private:
    void shutdown();

    void recvThread(unsigned int socketId);
    void sendThread(unsigned int socketId);


    std::thread m_sendthread;

    std::thread m_recvthread;

    unsigned int m_socketId;

    bool m_isShutdown;

    std::mutex m_mutex;

    std::condition_variable m_sendThreadWait;

    std::vector<std::string> m_payloadJsonList;

    unsigned int m_port;
}; 

Client.cpp 如下所示

void Client::connect() {
    LOG(__func__);
    m_socketId = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_socketId < 0) {
        LOG(__func__).m("failed to create the connecting socket.");
        return;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    server_addr.sin_port = htons(m_port);

    if (::connect(m_socketId, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        LOG(__func__).m("Unable to connect.");
        close(m_socketId);
        return;
    }

    LOG(__func__).m("Successfully connected.");

    m_recvthread = std::thread(&Client::recvThread, this, m_socketId);
    m_sendthread = std::thread(&Client::sendThread, this, m_socketId);
}

void Client::addPayload(const std::string& payload) {
    LOG(__func__);
    rapidjson::Document document;
    if (!json::parseJSON(payload, document)) {
        LOG(__func__).d("reason", "invalidJson");
        return;
    }
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_payloadJsonList.emplace_back(payload);
    }
}

void Client::recvThread(unsigned int socketId) {
    LOGLX(__func__).m("started.");
    bool joinThread = false;
    do {
        int requestLenBuffer = 0;  // Request size
        std::cout<<"1\n";
        uint8_t bytesRead = recv(socketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
        std::cout<<"2\n";
        if (0 == bytesRead) {
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                joinThread = m_isShutdown;
            }
            LOG(__func__).m("connection closed by remote.");
            break;
        }

        if (bytesRead < 0) {
            LOG(__func__).m("recv failed reading request size.");
            return;
        }
        int requestInBytes = ntohl(requestLenBuffer);

        LOG(__func__).d("RequestSize", requestInBytes);

        uint8_t bytesRemaining = requestInBytes;
        uint8_t totalBytesRead = 0;
        std::vector<unsigned char> byteBuffer(bytesRemaining + 1);  // +1 for the NUL-terminator

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            uint8_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

        if (!bytesRemaining && !joinThread) {
            byteBuffer[requestInBytes] = '\0';  // NUL-terminate the string
            LOG(__func__).d("Recieved Request: ", byteBuffer.data());
        }
    } while (!joinThread);
    LOG(__func__).m("completed");
}

void Client::sendThread(unsigned int socketId) {
    LOG(__func__).m("started.");
    std::vector<unsigned char> payloadBuffer;
    do {
        payloadBuffer.clear();
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_sendThreadWait.wait(lock, [this] { return m_payloadJsonList.size() || m_isShutdown; });
            if (m_isShutdown) {
                break;
            }
            std::string payload = m_payloadJsonList.back();
            std::vector<unsigned char> buffer(payload.begin(), payload.end());
            payloadBuffer = std::move(buffer);
            m_payloadJsonList.pop_back();
        }

        LOG(__func__).d("Sending Payload", payloadBuffer.data()).d("size", payloadBuffer.size());

        int requestLengthToSend = htonl(payloadBuffer.size());
        uint8_t bytesRemaining = payloadBuffer.size();
        uint8_t bytesAlreadySent = 0;
        send(socketId, &requestLengthToSend, sizeof(requestLengthToSend), 0);

        do {
            uint8_t bytes = send(socketId, &payloadBuffer[0] + bytesAlreadySent, bytesRemaining, 0);
            bytesAlreadySent += bytes;
            bytesRemaining -= bytes;

        } while (bytesRemaining > 0);

    } while (true);
    LOG(__func__).m("completed.");
}

void Client::disConnect() {
    LOG(__func__);
    shutdown();
}

void Client::setPort(unsigned int port) {
    LOG(__func__);
    m_port = port;
}

void Client::shutdown() {
    LOGLX(__func__);
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isShutdown = true;
        m_sendThreadWait.notify_one();
    }

    close(m_socketId);
    //::shutdown(m_socketId, SHUT_RDWR);

    if (m_sendthread.joinable()) {
        m_sendthread.join();
    }

    if (m_recvthread.joinable()) {
        m_recvthread.join();
    }
}

我的客户端应用程序的主类如下所示

int main(int argc, char* argv[]) {
  
    //string jsonPayload = ...read payload;
    Client client;
    client.setPort(12345);
    client.addPayload(jsonPayload);
    client.connect();

    std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });

    if (waitThread.joinable()) {
        waitThread.join();
    }
    client.disConnect();
    return 0;
}

我的服务器类如下所示

服务器.h

class Server {
public:
    ~Server() = default;

    void listen() override;
    void disConnect() override;
    void setPort(unsigned int port) override;
    void shutdown() override;

protected:
    void acceptNewConnection();

    void handleConnection();
    void handleRequest();
    void sendThread();
    void recvThread();
    void acceptThread();

    
    Message processBuffer(std::vector<unsigned char>& byteBuffer);

    std::thread m_sendThread;
    std::thread m_recvThread;

    std::thread m_acceptThread;

    bool m_isShutdown;

    std::mutex m_mutex;

    int m_listenSocket;

    unsigned int m_port;

    unsigned int m_clientSocketId;

    std::condition_variable m_sendThreadWait;


    std::vector<Message> m_payloadList;

};

我的 server.cpp 类如下所示

void Server::listen() {
    LOG(__func__);
    m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_listenSocket < 0) {
        LOG(__func__).m("Failed to open listen socket.");
        return;
    }

    int optval = 0;
    if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        LOG(__func__).m("Failed to set SO_REUSEADDR option.");
        close(m_listenSocket);
        return;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(m_port);

    if (bind(m_listenSocket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        LOG(__func__).m(" Failed to bind to port.");
        close(m_listenSocket);
        return;
    }

    int backlog = 1;  // We are only interested to pool one client.
    if (::listen(m_listenSocket, backlog) < 0) {
        LOG(__func__).m(" Failed to listen on the listen socket.");
        close(m_listenSocket);
        return;
    }

    LOG(__func__).m("SocketServer is listening.").d("port", m_port);

    m_acceptThread = std::thread(&Server::acceptThread, this);
}
void Server::disConnect() {
    LOG(__func__);
    shutdown();
}

void Server::setPort(unsigned int port) {
    LOG(__func__);
    m_port = port;
}

void Server::shutdown() {
    LOG(__func__);
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isShutdown = true;
        m_sendThreadWait.notify_one();
    }

    close(m_listenSocket);
    close(m_clientSocketId);

    if (m_acceptThread.joinable()) {
        m_acceptThread.join();
    }

    if (m_recvThread.joinable()) {
        m_recvThread.join();
    }

    if (m_sendThread.joinable()) {
        m_sendThread.join();
    }
}

void SocketServer::acceptNewConnection() {
    LOG(__func__);
    socklen_t nlen = sizeof(struct sockaddr_in);
    try {
        int incomingSocketId = accept(m_listenSocket, NULL, &nlen);
        if (incomingSocketId < 0) {
            LOG(__func__).d("reason", "acceptConnectionFailed").m("Failed to accept the client connection");
            return;
        }

        m_clientSocketId = incomingSocketId;
        m_sendThread = std::thread(&Server::sendThread, this);
        m_recvThread = std::thread(&Server::recvThread, this);
    } catch (const std::exception& e) {
        LOG(__func__).d("reason", "acceptConnectionFailed").d("socket closed by remote.", e.what());
        return;
    }
}

void Server::sendThread() {
    LOG(__func__).m("started.");
    do {
        Message message;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_sendThreadWait.wait(lock, [this] { return m_payloadList.size() || m_isShutdown; });
            if (m_isShutdown) {
                break;
            }
            message = m_payloadList.back();
            m_payloadList.pop_back();
        }

        //Process the message and convert to string
        std::string payload = message.toString();

        ACSDK_INFO(LX(__func__).d("Sending Payload", payload).d("size", payload.size()));

        std::vector<unsigned char> buffer(payload.begin(), payload.end());
        int responseLength = payload.size();
        int responseLength_after_htol = htonl(responseLength);
        uint8_t bytesRemaining = responseLength;
        uint8_t bytesAlreadySent = 0;
        send(m_clientSocketId, &responseLength_after_htol, sizeof(responseLength), 0);

        do {
            uint8_t bytes = send(m_clientSocketId, &buffer[0] + bytesAlreadySent, bytesRemaining, 0);
            bytesAlreadySent += bytes;
            bytesRemaining -= bytes;

        } while (bytesRemaining > 0);
    } while (true);
    LOG(__func__).m("completed.");
}

void Server::acceptThread() {
    LOG(__func__).m("started.");
    acceptNewConnection();
    LOG(__func__).m("completed");
}

void Server::recvThread() {
    LOG(__func__).m("started.");
    bool joinThread = false;
    do {
        int requestLenBuffer = 0;
        std::cout<<"1\n";
        uint8_t bytesRead = recv(m_clientSocketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
        std::cout<<"2\n";
        if (0 == bytesRead) {
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                joinThread = m_isShutdown;
            }
            LOG(__func__).m("connection closed by remote.");
            break;
        }

        if (bytesRead < 0) {
            LOG(__func__).m("recv failed reading request size.");
            return;
        }
        int requestInBytes = ntohl(requestLenBuffer);

        LOG(__func__).d("RequestSize", requestInBytes);

        uint8_t bytesRemaining = requestInBytes;
        uint8_t totalBytesRead = 0;
        std::vector<unsigned char> byteBuffer(bytesRemaining + 1);  // +1 for the NUL-terminator

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            uint8_t bytesRead = recv(m_clientSocketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

        if (!bytesRemaining && !joinThread) {
            byteBuffer[requestInBytes] = '\0';  // NUL-terminate the string
            LOG(__func__).d("Recieved Message: ", byteBuffer.data());

            /// Process the recieved Request/Message
        }
    } while (!joinThread);
    LOG(__func__).m("completed");
}

服务器应用程序的主类如下所示

int main(int argc, char* argv[]) {
    Server server;
    server.setPort(12345);
    server.listen();

    std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });

    if (waitThread.joinable()) {
        waitThread.join();
    }

    server.disConnect();

    std::thread newWaitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(2)); });
    if (newWaitThread.joinable()) {
        newWaitThread.join();
    }
    return 0;
}

客户端应用程序的输出

...
Client:connect
Client:connect::Successfully connected.
Client:recvThread::started.
1
Client:sendThread::started.
Client:sendThread:Sending Payload={...},size=160
Client:disConnect
Client:shutdown
Client:sendThread::completed.

服务器应用程序的输出

...
Server:listen
Server:listen::SocketServer is listening.:port=12345
Server:acceptThread::started.
Server:acceptNewConnection
Server:sendThread::started.
Server:acceptThread::completed
Server:recvThread::started.
1
2
Server:recvThread:RequestSize=160
3
4
Server:recvThread:Recieved Request: ={...}
Server:disConnect
Server:shutdown
Server:sendThread::completed.

可以看出,即使我已经关闭了套接字,RecvThread 也没有在两个应用程序上完成,并且正在等待 recv 阻塞调用。

到目前为止我已经尝试/考虑过的事情

  1. 我尝试拨打
    ::shutdown(m_socketId, SHUT_RDWR);
    而不是
    close(m_socketId);
    ,但没有任何改变
  2. 我读到这可能是一个已知问题,并使用非阻塞调用(如 poll()、select() 等)重写我的类,但我希望这是我最后的手段,因为我对我的代码架构很满意并且有已经使用我现有的架构通过套接字传输数据。
  3. 我还阅读了有关在侦听线程上设置 SO_RCVTIMEO 并设置超时的信息,但这是否意味着即使在套接字关闭后,应用程序也会挂在那里,直到我们达到 SO_RCVTIMEO 超时,然后 recv 调用将继续,我将得到一个有机会完成recvThread。
c++ sockets recv
1个回答
0
投票

你愿意

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            uint8_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

但是在

uint8_t bytesRead = recv(..)

您使用无符号的

uint8_t
(值范围为0到255)。因此意味着
if (bytesRead < 0)
绝对不会触发。您的编译器应该警告您这一点,也许您忽略了警告。

用于

recv()
返回值的正确变量类型是
ssize_t
,本质上是一个 signed
size_t
,如 文档所述

当另一方完全关闭 TCP 套接字时,任何待处理的

recv()
都应中止并返回负错误值。

© www.soinside.com 2019 - 2024. All rights reserved.