我本地有一个服务器和一个客户端,需要通过IPC进行通信。我正在使用 asio::local::stream_protocol::socket 进行本地 IPC 通信。我注意到,当客户端断开连接时,服务器可以检测到它并抛出“文件结束”异常,我可以根据此异常进行处理。但是,当我的服务器断开连接时,我的客户端无法检测到它,但仍然可以成功地将数据写入 asio::local::stream_protocol::socket。
我的问题是:
这是我的客户端代码(服务器代码有点复杂):
class NovRun
{
public:
NovRun(asio::io_context &io_context)
: io_context_(io_context)
, socket_(io_context)
{
}
awaitable<void> run()
{
try
{
co_await connect();
co_await sendAppId();
co_spawn(io_context_, output(), asio::detached);
std::thread inputThread([this]() { this->input(); });
inputThread.detach();
}
catch (const std::exception &e)
{
LOG_FN(ERR, "[nov_run] Failed - {}", e.what());
}
}
void stop()
{
if (running_)
{
running_ = false;
}
if (socket_.is_open())
{
socket_.close();
LOG_FN(INFO, "[nov_run] Exiting...");
}
std::cout << "5" << std::endl;
}
void handleSignal(int signal) { co_spawn(io_context_, sendSignalToServer(signal), boost::asio::detached); }
awaitable<void> sendSignalToServer(int signal)
{
Message msg;
msg.msgtype = 3;
msg.payload = std::to_string(signal);
char buf[1024];
auto size = msg.pack(buf, msg.payload.size());
co_await async_write(socket_, buffer(buf, size), use_awaitable);
// LOG_FN(INFO, "[nov_run] Sent signal [{}] to server", signal);
}
void setQueryMode() { runMode_ = 1; }
private:
asio::io_context &io_context_;
local::stream_protocol::socket socket_;
hammer::tcp::StreamBuffer recvBuffer_;
int runMode_{0}; // 0: normal mode, 1: query mode
bool running_{true};
std::string appId_;
awaitable<void> connect()
{
local::stream_protocol::endpoint endpoint(IPC_SOCKET_PATH);
co_await socket_.async_connect(endpoint, use_awaitable);
LOG_FN(INFO, "[nov_run] Connected to novd");
}
awaitable<void> sendAppId()
{
Message msg;
msg.msgtype = 2;
msg.payload = appId_ + "\n";
char buf[1024];
auto size = msg.pack(buf, msg.payload.size());
co_await async_write(socket_, buffer(buf, size), use_awaitable);
LOG_FN(INFO, "[nov_run] appId sent: {}", appId_);
}
awaitable<void> output()
{
while (running_)
{
try
{
char buffer[65535] = {};
std::size_t n = co_await socket_.async_read_some(asio::buffer(buffer), use_awaitable);
std::cout << "1" << std::endl;
if (n == 0)
{
std::cout << "2" << std::endl;
break;
}
recvBuffer_.feed(buffer, n);
while (true)
{
// find magic num
auto str = recvBuffer_.find('\001');
if (str.empty())
{
break;
}
// read head
auto headerBuf = recvBuffer_.acquire(HDR_LEN);
if (headerBuf.empty())
{
break;
}
Message msg;
msg.unpackHeader(headerBuf.data(), headerBuf.size());
auto totalLen = msg.len + HDR_LEN;
// read payload
auto data = recvBuffer_.acquire(totalLen);
if (data.empty())
{
break;
}
msg.unpack(data.data(), data.size());
std::cout << msg.payload << std::endl;
recvBuffer_.consume(totalLen);
}
recvBuffer_.commit();
}
catch (const boost::system::system_error &e)
{
if (e.code() == boost::asio::error::broken_pipe || e.code() == boost::asio::error::eof ||
e.code() == boost::asio::error::operation_aborted)
{
LOG_FN(INFO, "[nov_run] disconnected: {}", e.code().message());
}
else
{
LOG_FN(ERR, "[nov_run] unexpected error: {}", e.code().message());
}
break;
}
catch (std::exception &e)
{
LOG_FN(ERR, "[nov_run] Error in output: {}", e.what());
break;
}
}
stop();
}
void input()
{
std::string input;
while (std::getline(std::cin, input) && running_)
{
try
{
std::cout << "3" << std::endl;
if (input == "nov_run exit")
{
stop();
break;
}
if (input.empty())
{
continue;
}
input += "\n";
boost::asio::post(io_context_, [this, input]() {
char msgBuf[1024];
Message msg;
msg.msgtype = 4;
msg.payload = input;
auto totalLen = msg.pack(msgBuf, input.size());
std::cout << "4" << std::endl;
async_write(socket_, buffer(msgBuf, totalLen),
[](const boost::system::error_code &ec, std::size_t size) {
if (ec)
{
LOG_FN(ERR, "[nov_run] Write failed: {}", ec.message());
}
LOG_FN(ERR, "[nov_run] Write failed: {}", ec.message());
std::cout << size << std::endl;
});
});
}
catch (const boost::system::system_error &e)
{
if (e.code() == boost::asio::error::broken_pipe || e.code() == boost::asio::error::eof ||
e.code() == boost::asio::error::operation_aborted)
{
LOG_FN(INFO, "[nov_run] disconnect: {}", e.code().message());
}
else
{
LOG_FN(ERR, "[nov_run] unexpected error: {}", e.code().message());
}
break;
}
catch (std::exception &e)
{
LOG_FN(ERR, "[nov_run] Error in input: {}", e.what());
break;
}
}
stop();
}
}
为什么服务器关闭后客户端还能写入成功?
并不能真正写入成功。写入操作本身可能“尝试”成功,但这些数据永远不会被对端成功接收。要发现连接已断开,办法是始终保持一个挂起的读取操作(或者用 `async_wait` 等待错误状态)。
这里有一些有用的背景知识:TCP 的确认并不能保证数据已交付
始终保持待读状态
并非如此,除非您想要远程端故障(例如过载或卡住)的一些额外指示,即使 IP 堆栈没有指示断开连接。如果您不能信任中间网络基础设施,这会增加稳健性,例如
我已经把您的代码补充完整,并做了显著简化。我还用 25 行代码添加了一个简单的 ECHO 服务器。
显著变化:
- 有效负载中可能出现 `\001`。例如,对于信号,有效负载为 1 字节。这会导致帧分隔符被错误检测
- (……`io_context` 永远不会耗尽工作)
- 与 `async_read_until` 组合操作结合使用
- 用 `asio::deferred` 代替 `asio::use_awaitable`;事实上,在最近的 Boost 版本中,您可以省略令牌,因为 `deferred` 已成为默认值
- (……`stop`,在逻辑链(strand)上执行写入队列操作)
- 在 `stop()` 中,我们还 `cancel`
异步操作
#include <atomic>
#include <deque>
#include <iostream>
#include <set>

#include <boost/asio.hpp>
#include <fmt/ranges.h>
// Short aliases for the Boost.Asio namespaces and error type used below.
namespace asio = boost::asio;
namespace local = asio::local;
using boost::system::error_code;
// Return type of the coroutines in this file.
using Coro = asio::awaitable<void>;
// Prints "[<level>]\t[nov_run]\t<message>\n" with fmt::print. `format` must be a
// string literal because it is concatenated with the prefix and the newline at
// compile time. `##__VA_ARGS__` (a compiler extension) drops the trailing comma
// when no format arguments are given.
#define LOG_FN(level, format, ...) fmt::print("[{}]\t[nov_run]\t" format "\n", #level, ##__VA_ARGS__)
constexpr int HDR_LEN = 8; // 4 bytes for msgtype, 4 bytes for payload length
// Use a multi-byte separator: a single 0x01 byte will occur naturally in binary data.
constexpr std::string_view FRAME_SEP = "\xfc\x17\x74\x63\x52\x57\xf1\x01"; // generated random bytes

namespace hammer::tcp {
    namespace detail {
        /// Reads a big-endian (network order) 32-bit value from 4 bytes at `p`.
        /// Works byte by byte, so `p` needs no particular alignment.
        inline uint32_t load_be32(const char* p) noexcept {
            const auto* b = reinterpret_cast<const unsigned char*>(p);
            return (uint32_t{b[0]} << 24) | (uint32_t{b[1]} << 16) | (uint32_t{b[2]} << 8) | uint32_t{b[3]};
        }

        /// Writes `v` as 4 big-endian (network order) bytes at `p`.
        inline void store_be32(char* p, uint32_t v) noexcept {
            p[0] = static_cast<char>(static_cast<unsigned char>(v >> 24));
            p[1] = static_cast<char>(static_cast<unsigned char>(v >> 16));
            p[2] = static_cast<char>(static_cast<unsigned char>(v >> 8));
            p[3] = static_cast<char>(static_cast<unsigned char>(v));
        }
    } // namespace detail

    /// One wire frame:
    ///   [msgtype: u32, network order][payload length: u32, network order][payload][FRAME_SEP]
    struct Message {
        uint32_t    msgtype;
        std::string payload;

        /// Parses exactly one complete frame (header, payload and trailing FRAME_SEP).
        ///
        /// @param buf  the bytes of a single frame, separator included
        /// @throws std::runtime_error if the separator is missing, the header is
        ///         truncated, or the payload length does not match the header.
        ///         The message is left unchanged when this throws.
        void unpack(std::string_view buf) {
            const bool has_separator =
                buf.size() >= FRAME_SEP.size() &&
                buf.compare(buf.size() - FRAME_SEP.size(), FRAME_SEP.size(), FRAME_SEP) == 0;
            if (!has_separator)
                throw std::runtime_error("Invalid frame separator");
            buf.remove_suffix(FRAME_SEP.size());

            if (buf.size() < static_cast<std::size_t>(HDR_LEN))
                throw std::runtime_error("Invalid header size: " + std::to_string(buf.length()));

            const uint32_t type = detail::load_be32(buf.data());
            const uint32_t len  = detail::load_be32(buf.data() + sizeof(uint32_t));
            buf.remove_prefix(HDR_LEN);

            if (buf.size() < len)
                throw std::runtime_error("Insufficient payload");
            if (buf.size() > len)
                throw std::runtime_error("Excess payload");

            // Commit only after every check has passed.
            msgtype = type;
            payload.assign(buf.data(), buf.size());
        }

        /// Serialises the message into `out`.
        ///
        /// @param out   destination buffer
        /// @param size  capacity of `out` in bytes
        /// @return number of bytes written (header + payload + separator)
        /// @throws std::runtime_error if the frame does not fit into `size` bytes
        uint32_t pack(char* out, uint32_t size) const {
            // Computed in size_t so a huge payload cannot wrap the 32-bit total.
            const std::size_t totalLen = static_cast<std::size_t>(HDR_LEN) + payload.size() + FRAME_SEP.size();
            if (totalLen > size)
                throw std::runtime_error("buffer size exceeded");

            detail::store_be32(out, msgtype);
            detail::store_be32(out + sizeof(uint32_t), static_cast<uint32_t>(payload.size()));
            out += HDR_LEN;
            out += payload.copy(out, payload.size());
            FRAME_SEP.copy(out, FRAME_SEP.size());
            return static_cast<uint32_t>(totalLen); // fits: totalLen <= size
        }
    };
} // namespace hammer::tcp
using hammer::tcp::Message;
// Filesystem path of the Unix-domain socket: the client connects to it and the
// echo server binds its acceptor to it.
constexpr auto IPC_SOCKET_PATH = "/tmp/novd.sock";
class NovRun {
auto stop_on_disconnect(std::string scope) {
return [this, scope = std::move(scope)](std::exception_ptr e) {
namespace E = asio::error;
try {
try {
if (e)
std::rethrow_exception(e);
} catch (boost::system::system_error const& e) {
auto c = e.code();
if (c == E::broken_pipe || c == E::eof || c == E::operation_aborted)
LOG_FN(ERR, "Disconnect in {}: {}", scope, e.code().message());
throw;
}
} catch (std::exception const& e) {
LOG_FN(ERR, "Error in {}: {}", scope, e.what());
stop();
}
};
}
public:
NovRun(asio::io_context& io_context)
: io_context_(io_context)
, signals_(io_context, SIGINT, SIGTERM)
, socket_(io_context) {
do_wait_signals();
}
void run() {
co_spawn(io_context_, [this] -> Coro {
try {
co_await socket_.async_connect(IPC_SOCKET_PATH, asio::deferred);
LOG_FN(INFO, "Connected to novd");
sendAppId();
co_spawn(io_context_, do_read_loop(), stop_on_disconnect("read_loop"));
} catch (std::exception const& e) {
LOG_FN(ERR, "Failed - {}", e.what());
}
},
asio::detached);
}
void stop() {
if (running_.exchange(false)) {
LOG_FN(INFO, "Exiting...");
asio::post(io_context_, [this] {
error_code ignore_ec;
signals_.cancel(ignore_ec);
socket_.cancel(ignore_ec);
socket_.close(ignore_ec);
});
}
}
void send(Message msg) {
if (!running_)
throw std::runtime_error("Connection is closed"); // TODO proper exception
do_send(std::move(msg));
}
void sendSignalToServer(int signal) {
send({3, std::to_string(signal)});
LOG_FN(INFO, "Sent signal {} [{}] to server", ::strsignal(signal), signal);
}
// void setQueryMode() { runMode_ = 1; }
private:
asio::io_context& io_context_;
asio::signal_set signals_;
local::stream_protocol::socket socket_;
// int runMode_{0}; // 0: normal mode, 1: query mode
std::atomic_bool running_{true};
std::string appId_ = "DEFAULT-APP-ID-STRING";
std::deque<std::vector<char>> outbox_;
void sendAppId() {
send({2, appId_ + '\n'});
LOG_FN(INFO, "appId sent: {}", appId_);
}
void do_send(Message msg) { // on logical strand
char msgBuf[1024]; // std::array, perhaps?
uint32_t packed = msg.pack(msgBuf, sizeof(msgBuf));
outbox_.emplace_back(msgBuf, msgBuf + packed);
if (outbox_.size() == 1) // only start the write loop if not running
co_spawn(io_context_, do_write_loop(), stop_on_disconnect("write_loop"));
}
Coro do_write_loop() {
while (running_ && !outbox_.empty()) {
co_await async_write(socket_, asio::buffer(outbox_.front()));
outbox_.pop_front();
}
}
Coro do_read_loop() {
for (std::string buf; running_;) {
// find frame separator
while (size_t n = co_await async_read_until(socket_, asio::dynamic_buffer(buf), FRAME_SEP)) {
LOG_FN(DEBUG, "Received: n={} raw {::02x}", n, std::span(buf));
Message msg;
msg.unpack(buf.substr(0, n)); // parse single message
buf.erase(0, n); // like asio::dynamic_buffer(buf).consume(n);
LOG_FN(INFO, "Received: type:{} {::02x}", msg.msgtype, std::span(msg.payload));
}
}
}
void do_wait_signals() {
signals_.async_wait([this](error_code ec, int signal) {
if (!ec) {
sendSignalToServer(signal);
do_wait_signals(); // to keep waiting for signals
}
});
}
};
/// Console loop: prompts for a line, forwards every non-empty line to the
/// server as a type-4 message, and stops the connection on EOF, on the
/// "nov_run exit" command, or when anything throws.
void consoleInput(NovRun& connection) {
    try {
        std::string line;
        while ((std::cout << "Enter input: ") && std::getline(std::cin, line)) {
            if (line == "nov_run exit")
                break;
            if (line.empty())
                continue;
            connection.send({4, std::move(line) + '\n'});
        }
        connection.stop();
    } catch (std::exception const& e) {
        LOG_FN(ERR, "Error in consoleInput: {}", e.what());
        connection.stop();
    }
}
void emulate_client() {
asio::io_context io_context(1);
NovRun novRun(io_context);
novRun.run();
std::thread input(consoleInput, std::ref(novRun));
io_context.run();
input.join();
}
void emulate_server() {
asio::io_context io_context(1);
std::remove(IPC_SOCKET_PATH);
co_spawn(
io_context,
[] -> Coro {
auto ex = co_await asio::this_coro::executor;
local::stream_protocol::acceptor acceptor(ex, IPC_SOCKET_PATH);
acceptor.listen();
for (;;) {
co_spawn(ex, [socket = co_await acceptor.async_accept()] mutable -> Coro {
try {
std::array<char, 1024> buf;
while (size_t n = co_await socket.async_read_some(asio::buffer(buf))) {
LOG_FN(INFO, "Echo: {::02x}", std::span(buf).subspan(0, n));
co_await async_write(socket, asio::buffer(buf,n ));
}
} catch (std::exception const& e) {
LOG_FN(ERR, "Error in echo session: {}", e.what());
}
},
asio::detached);
}
},
asio::detached);
io_context.run();
}
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc, std::less<std::string_view>{}).contains("server"))
emulate_server();
else
emulate_client();
}
本地demo,先展示客户端退出,再展示服务端退出:(点击查看原图)