我正在做套接字编程,并且在弄清楚如何终止阻塞的recv调用时遇到问题,当它已经在等待并且另一端关闭套接字时。我的代码如下。
我的客户如下所示
class Client {
public:
void connect();
void disConnect();
void addPayload(const std::string& payload);
void setPort(unsigned int port);
private:
void shutdown();
void recvThread(unsigned int socketId);
void sendThread(unsigned int socketId);
std::thread m_sendthread;
std::thread m_recvthread;
unsigned int m_socketId;
bool m_isShutdown;
std::mutex m_mutex;
std::condition_variable m_sendThreadWait;
std::vector<std::string> m_payloadJsonList;
unsigned int m_port;
};
Client.cpp 如下所示
void Client::connect() {
LOG(__func__);
m_socketId = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_socketId < 0) {
LOG(__func__).m("failed to create the connecting socket.");
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
server_addr.sin_port = htons(m_port);
if (::connect(m_socketId, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
LOG(__func__).m("Unable to connect.");
close(m_socketId);
return;
}
LOG(__func__).m("Successfully connected.");
m_recvthread = std::thread(&Client::recvThread, this, m_socketId);
m_sendthread = std::thread(&Client::sendThread, this, m_socketId);
}
void Client::addPayload(const std::string& payload) {
LOG(__func__);
rapidjson::Document document;
if (!json::parseJSON(payload, document)) {
LOG(__func__).d("reason", "invalidJson");
return;
}
{
std::unique_lock<std::mutex> lock(m_mutex);
m_payloadJsonList.emplace_back(payload);
}
}
void Client::recvThread(unsigned int socketId) {
LOGLX(__func__).m("started.");
bool joinThread = false;
do {
int requestLenBuffer = 0; // Request size
std::cout<<"1\n";
uint8_t bytesRead = recv(socketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
std::cout<<"2\n";
if (0 == bytesRead) {
{
std::lock_guard<std::mutex> lock(m_mutex);
joinThread = m_isShutdown;
}
LOG(__func__).m("connection closed by remote.");
break;
}
if (bytesRead < 0) {
LOG(__func__).m("recv failed reading request size.");
return;
}
int requestInBytes = ntohl(requestLenBuffer);
LOG(__func__).d("RequestSize", requestInBytes);
uint8_t bytesRemaining = requestInBytes;
uint8_t totalBytesRead = 0;
std::vector<unsigned char> byteBuffer(bytesRemaining + 1); // +1 for the NUL-terminator
while (bytesRemaining > 0) {
std::cout<<"3\n";
uint8_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
std::cout<<"4\n";
if (0 == bytesRead) {
{
std::lock_guard<std::mutex> lock(m_mutex);
joinThread = m_isShutdown;
}
LOG(__func__).m("connection closed by remote.");
break;
} else if (bytesRead < 0) {
LOG(__func__).m("recv failed reading request.");
joinThread = true;
break;
}
totalBytesRead += bytesRead;
bytesRemaining -= bytesRead;
}
if (!bytesRemaining && !joinThread) {
byteBuffer[requestInBytes] = '\0'; // NUL-terminate the string
LOG(__func__).d("Recieved Request: ", byteBuffer.data());
}
} while (!joinThread);
LOG(__func__).m("completed");
}
void Client::sendThread(unsigned int socketId) {
LOG(__func__).m("started.");
std::vector<unsigned char> payloadBuffer;
do {
payloadBuffer.clear();
{
std::unique_lock<std::mutex> lock(m_mutex);
m_sendThreadWait.wait(lock, [this] { return m_payloadJsonList.size() || m_isShutdown; });
if (m_isShutdown) {
break;
}
std::string payload = m_payloadJsonList.back();
std::vector<unsigned char> buffer(payload.begin(), payload.end());
payloadBuffer = std::move(buffer);
m_payloadJsonList.pop_back();
}
LOG(__func__).d("Sending Payload", payloadBuffer.data()).d("size", payloadBuffer.size());
int requestLengthToSend = htonl(payloadBuffer.size());
uint8_t bytesRemaining = payloadBuffer.size();
uint8_t bytesAlreadySent = 0;
send(socketId, &requestLengthToSend, sizeof(requestLengthToSend), 0);
do {
uint8_t bytes = send(socketId, &payloadBuffer[0] + bytesAlreadySent, bytesRemaining, 0);
bytesAlreadySent += bytes;
bytesRemaining -= bytes;
} while (bytesRemaining > 0);
} while (true);
LOG(__func__).m("completed.");
}
void Client::disConnect() {
LOG(__func__);
shutdown();
}
void Client::setPort(unsigned int port) {
LOG(__func__);
m_port = port;
}
void Client::shutdown() {
LOGLX(__func__);
{
std::lock_guard<std::mutex> lock(m_mutex);
m_isShutdown = true;
m_sendThreadWait.notify_one();
}
close(m_socketId);
//::shutdown(m_socketId, SHUT_RDWR);
if (m_sendthread.joinable()) {
m_sendthread.join();
}
if (m_recvthread.joinable()) {
m_recvthread.join();
}
}
我的客户端应用程序的主类如下所示
int main(int argc, char* argv[]) {
//string jsonPayload = ...read payload;
Client client;
client.setPort(12345);
client.addPayload(jsonPayload);
client.connect();
std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });
if (waitThread.joinable()) {
waitThread.join();
}
client.disConnect();
return 0;
}
我的服务器类如下所示
服务器.h
class Server {
public:
~Server() = default;
void listen() override;
void disConnect() override;
void setPort(unsigned int port) override;
void shutdown() override;
protected:
void acceptNewConnection();
void handleConnection();
void handleRequest();
void sendThread();
void recvThread();
void acceptThread();
Message processBuffer(std::vector<unsigned char>& byteBuffer);
std::thread m_sendThread;
std::thread m_recvThread;
std::thread m_acceptThread;
bool m_isShutdown;
std::mutex m_mutex;
int m_listenSocket;
unsigned int m_port;
unsigned int m_clientSocketId;
std::condition_variable m_sendThreadWait;
std::vector<Message> m_payloadList;
};
我的 server.cpp 类如下所示
void Server::listen() {
LOG(__func__);
m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_listenSocket < 0) {
LOG(__func__).m("Failed to open listen socket.");
return;
}
int optval = 0;
if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
LOG(__func__).m("Failed to set SO_REUSEADDR option.");
close(m_listenSocket);
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(m_port);
if (bind(m_listenSocket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
LOG(__func__).m(" Failed to bind to port.");
close(m_listenSocket);
return;
}
int backlog = 1; // We are only interested to pool one client.
if (::listen(m_listenSocket, backlog) < 0) {
LOG(__func__).m(" Failed to listen on the listen socket.");
close(m_listenSocket);
return;
}
LOG(__func__).m("SocketServer is listening.").d("port", m_port);
m_acceptThread = std::thread(&Server::acceptThread, this);
}
void Server::disConnect() {
LOG(__func__);
shutdown();
}
void Server::setPort(unsigned int port) {
LOG(__func__);
m_port = port;
}
void Server::shutdown() {
LOG(__func__);
{
std::lock_guard<std::mutex> lock(m_mutex);
m_isShutdown = true;
m_sendThreadWait.notify_one();
}
close(m_listenSocket);
close(m_clientSocketId);
if (m_acceptThread.joinable()) {
m_acceptThread.join();
}
if (m_recvThread.joinable()) {
m_recvThread.join();
}
if (m_sendThread.joinable()) {
m_sendThread.join();
}
}
void SocketServer::acceptNewConnection() {
LOG(__func__);
socklen_t nlen = sizeof(struct sockaddr_in);
try {
int incomingSocketId = accept(m_listenSocket, NULL, &nlen);
if (incomingSocketId < 0) {
LOG(__func__).d("reason", "acceptConnectionFailed").m("Failed to accept the client connection");
return;
}
m_clientSocketId = incomingSocketId;
m_sendThread = std::thread(&Server::sendThread, this);
m_recvThread = std::thread(&Server::recvThread, this);
} catch (const std::exception& e) {
LOG(__func__).d("reason", "acceptConnectionFailed").d("socket closed by remote.", e.what());
return;
}
}
void Server::sendThread() {
LOG(__func__).m("started.");
do {
Message message;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_sendThreadWait.wait(lock, [this] { return m_payloadList.size() || m_isShutdown; });
if (m_isShutdown) {
break;
}
message = m_payloadList.back();
m_payloadList.pop_back();
}
//Process the message and convert to string
std::string payload = message.toString();
ACSDK_INFO(LX(__func__).d("Sending Payload", payload).d("size", payload.size()));
std::vector<unsigned char> buffer(payload.begin(), payload.end());
int responseLength = payload.size();
int responseLength_after_htol = htonl(responseLength);
uint8_t bytesRemaining = responseLength;
uint8_t bytesAlreadySent = 0;
send(m_clientSocketId, &responseLength_after_htol, sizeof(responseLength), 0);
do {
uint8_t bytes = send(m_clientSocketId, &buffer[0] + bytesAlreadySent, bytesRemaining, 0);
bytesAlreadySent += bytes;
bytesRemaining -= bytes;
} while (bytesRemaining > 0);
} while (true);
LOG(__func__).m("completed.");
}
void Server::acceptThread() {
LOG(__func__).m("started.");
acceptNewConnection();
LOG(__func__).m("completed");
}
void Server::recvThread() {
LOG(__func__).m("started.");
bool joinThread = false;
do {
int requestLenBuffer = 0;
std::cout<<"1\n";
uint8_t bytesRead = recv(m_clientSocketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
std::cout<<"2\n";
if (0 == bytesRead) {
{
std::lock_guard<std::mutex> lock(m_mutex);
joinThread = m_isShutdown;
}
LOG(__func__).m("connection closed by remote.");
break;
}
if (bytesRead < 0) {
LOG(__func__).m("recv failed reading request size.");
return;
}
int requestInBytes = ntohl(requestLenBuffer);
LOG(__func__).d("RequestSize", requestInBytes);
uint8_t bytesRemaining = requestInBytes;
uint8_t totalBytesRead = 0;
std::vector<unsigned char> byteBuffer(bytesRemaining + 1); // +1 for the NUL-terminator
while (bytesRemaining > 0) {
std::cout<<"3\n";
uint8_t bytesRead = recv(m_clientSocketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
std::cout<<"4\n";
if (0 == bytesRead) {
{
std::lock_guard<std::mutex> lock(m_mutex);
joinThread = m_isShutdown;
}
LOG(__func__).m("connection closed by remote.");
break;
} else if (bytesRead < 0) {
LOG(__func__).m("recv failed reading request.");
joinThread = true;
break;
}
totalBytesRead += bytesRead;
bytesRemaining -= bytesRead;
}
if (!bytesRemaining && !joinThread) {
byteBuffer[requestInBytes] = '\0'; // NUL-terminate the string
LOG(__func__).d("Recieved Message: ", byteBuffer.data());
/// Process the recieved Request/Message
}
} while (!joinThread);
LOG(__func__).m("completed");
}
服务器应用程序的主类如下所示
int main(int argc, char* argv[]) {
Server server;
server.setPort(12345);
server.listen();
std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });
if (waitThread.joinable()) {
waitThread.join();
}
server.disConnect();
std::thread newWaitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(2)); });
if (newWaitThread.joinable()) {
newWaitThread.join();
}
return 0;
}
客户端应用程序的输出
...
Client:connect
Client:connect::Successfully connected.
Client:recvThread::started.
1
Client:sendThread::started.
Client:sendThread:Sending Payload={...},size=160
Client:disConnect
Client:shutdown
Client:sendThread::completed.
服务器应用程序的输出
...
Server:listen
Server:listen::SocketServer is listening.:port=12345
Server:acceptThread::started.
Server:acceptNewConnection
Server:sendThread::started.
Server:acceptThread::completed
Server:recvThread::started.
1
2
Server:recvThread:RequestSize=160
3
4
Server:recvThread:Recieved Request: ={...}
Server:disConnect
Server:shutdown
Server:sendThread::completed.
可以看出,即使我已经关闭了套接字,RecvThread 也没有在两个应用程序上完成,并且正在等待 recv 阻塞调用。
到目前为止我已经尝试/考虑过的事情
::shutdown(m_socketId, SHUT_RDWR);
而不是close(m_socketId);
,但没有任何改变你愿意
while (bytesRemaining > 0) {
std::cout<<"3\n";
uint8_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
std::cout<<"4\n";
if (0 == bytesRead) {
{
std::lock_guard<std::mutex> lock(m_mutex);
joinThread = m_isShutdown;
}
LOG(__func__).m("connection closed by remote.");
break;
} else if (bytesRead < 0) {
LOG(__func__).m("recv failed reading request.");
joinThread = true;
break;
}
totalBytesRead += bytesRead;
bytesRemaining -= bytesRead;
}
但是在
uint8_t bytesRead = recv(..)
您使用无符号的
uint8_t
(值范围为0到255)。因此意味着 if (bytesRead < 0)
绝对不会触发。您的编译器应该警告您这一点,也许您忽略了警告。
用于
recv()
返回值的正确变量类型是 ssize_t
,本质上是一个 signed size_t
,如 文档所述。
当另一方完全关闭 TCP 套接字时,任何待处理的
recv()
都应中止并返回负错误值。