使用ASIO进行IPC通信时如何检测断线?

问题描述 投票:0回答:1

我本地有一个服务器和一个客户端,需要通过IPC进行通信。我正在使用 asio::local::stream_protocol::socket 进行本地 IPC 通信。我注意到,当客户端断开连接时,服务器可以检测到它并抛出“文件结束”异常,我可以根据此异常进行处理。但是,当我的服务器断开连接时,我的客户端无法检测到它,但仍然可以成功地将数据写入 asio::local::stream_protocol::socket。

我的问题是:

  1. 为什么服务端关闭之后,客户端仍然能够写入成功?
  2. 在 IPC 通信中,通常如何检测并处理对端断线?
  3. 在 IPC 通信场景中,是否需要添加心跳机制?

这是我的客户端代码(服务器代码有点复杂):



class NovRun
{
public:
    NovRun(asio::io_context &io_context)
        : io_context_(io_context)
        , socket_(io_context)
    {
    }

    awaitable<void> run()
    {
        try
        {
            co_await connect();

            co_await sendAppId();
            co_spawn(io_context_, output(), asio::detached);
            std::thread inputThread([this]() { this->input(); });
            inputThread.detach();
        }
        catch (const std::exception &e)
        {
            LOG_FN(ERR, "[nov_run] Failed - {}", e.what());
        }
    }

    void stop()
    {
        if (running_)
        {
            running_ = false;
        }
        if (socket_.is_open())
        {
            socket_.close();
            LOG_FN(INFO, "[nov_run] Exiting...");
        }
        std::cout << "5" << std::endl;
    }

    void handleSignal(int signal) { co_spawn(io_context_, sendSignalToServer(signal), boost::asio::detached); }

    awaitable<void> sendSignalToServer(int signal)
    {
        Message msg;
        msg.msgtype = 3;
        msg.payload = std::to_string(signal);
        char buf[1024];
        auto size = msg.pack(buf, msg.payload.size());

        co_await async_write(socket_, buffer(buf, size), use_awaitable);
        // LOG_FN(INFO, "[nov_run] Sent signal [{}] to server", signal);
    }

    void setQueryMode() { runMode_ = 1; }

private:
    asio::io_context &io_context_;
    local::stream_protocol::socket socket_;
    hammer::tcp::StreamBuffer recvBuffer_;
    int runMode_{0}; // 0: normal mode, 1: query mode
    bool running_{true};
    std::string appId_;

   
    awaitable<void> connect()
    {
        local::stream_protocol::endpoint endpoint(IPC_SOCKET_PATH);
        co_await socket_.async_connect(endpoint, use_awaitable);
        LOG_FN(INFO, "[nov_run] Connected to novd");
    }

    awaitable<void> sendAppId()
    {
        Message msg;
        msg.msgtype = 2;
        msg.payload = appId_ + "\n";
        char buf[1024];
        auto size = msg.pack(buf, msg.payload.size());

        co_await async_write(socket_, buffer(buf, size), use_awaitable);
        LOG_FN(INFO, "[nov_run] appId sent: {}", appId_);
    }

    awaitable<void> output()
    {
        while (running_)
        {
            try
            {

                char buffer[65535] = {};
                std::size_t n = co_await socket_.async_read_some(asio::buffer(buffer), use_awaitable);
                std::cout << "1" << std::endl;
                if (n == 0)
                {
                    std::cout << "2" << std::endl;
                    break;
                }

                recvBuffer_.feed(buffer, n);
                while (true)
                {
                    // find magic num
                    auto str = recvBuffer_.find('\001');
                    if (str.empty())
                    {
                        break;
                    }

                    // read head
                    auto headerBuf = recvBuffer_.acquire(HDR_LEN);
                    if (headerBuf.empty())
                    {
                        break;
                    }
                    Message msg;
                    msg.unpackHeader(headerBuf.data(), headerBuf.size());
                    auto totalLen = msg.len + HDR_LEN;

                    // read payload
                    auto data = recvBuffer_.acquire(totalLen);
                    if (data.empty())
                    {
                        break;
                    }

                    msg.unpack(data.data(), data.size());
                    std::cout << msg.payload << std::endl;

                    recvBuffer_.consume(totalLen);
                }

                recvBuffer_.commit();
            }
            catch (const boost::system::system_error &e)
            {
                if (e.code() == boost::asio::error::broken_pipe || e.code() == boost::asio::error::eof ||
                    e.code() == boost::asio::error::operation_aborted)
                {
                    LOG_FN(INFO, "[nov_run] disconnected: {}", e.code().message());
                }
                else
                {
                    LOG_FN(ERR, "[nov_run] unexpected error: {}", e.code().message());
                }
                break;
            }
            catch (std::exception &e)
            {
                LOG_FN(ERR, "[nov_run] Error in output: {}", e.what());
                break;
            }
        }
        stop();
    }

    void input()
    {
        std::string input;
        while (std::getline(std::cin, input) && running_)
        {
            try
            {
                std::cout << "3" << std::endl;
                if (input == "nov_run exit")
                {
                    stop();
                    break;
                }
                if (input.empty())
                {
                    continue;
                }

                input += "\n";
                boost::asio::post(io_context_, [this, input]() {
                    char msgBuf[1024];
                    Message msg;
                    msg.msgtype = 4;
                    msg.payload = input;
                    auto totalLen = msg.pack(msgBuf, input.size());
                    std::cout << "4" << std::endl;
                    async_write(socket_, buffer(msgBuf, totalLen),
                                [](const boost::system::error_code &ec, std::size_t size) {
                                    if (ec)
                                    {
                                        LOG_FN(ERR, "[nov_run] Write failed: {}", ec.message());
                                    }
                                    LOG_FN(ERR, "[nov_run] Write failed: {}", ec.message());
                                    std::cout << size << std::endl;
                                });
                });
            }
            catch (const boost::system::system_error &e)
            {
                if (e.code() == boost::asio::error::broken_pipe || e.code() == boost::asio::error::eof ||
                    e.code() == boost::asio::error::operation_aborted)
                {
                    LOG_FN(INFO, "[nov_run] disconnect: {}", e.code().message());
                }
                else
                {
                    LOG_FN(ERR, "[nov_run] unexpected error: {}", e.code().message());
                }
                break;
            }
            catch (std::exception &e)
            {
                LOG_FN(ERR, "[nov_run] Error in input: {}", e.what());
                break;
            }
        }
        stop();
    }
}
c++ boost-asio ipc
1个回答
0
投票
  1. 为什么服务器关闭后客户端还能写入成功?

    其实并不能。写入操作本身可能调用成功,但数据永远不会被对端成功接收。发现这一点的办法是始终保持一个挂起的读取操作(或者用 `async_wait` 等待错误状态)。

    这里有一些有用的背景知识:TCP 的确认并不能保证数据已交付

  2. 始终保持一个处于挂起状态的读取操作。

  3. 并不一定,除非您希望在 IP 协议栈没有报告断开连接的情况下,也能额外获知远端的故障(例如过载或卡死)。如果您无法信任中间的网络基础设施,心跳机制可以提高健壮性。

我已经把您的代码补充完整,并做了显著简化。我还用 25 行代码添加了一个简单的 ECHO 服务器。

主要改动:

  • 帧分隔符应该比 `\001` 更长。例如,对于信号消息,有效负载只有 1 字节,这会导致帧分隔符被误检测
  • 控制台IO不应该在类中
  • 信号处理则应该放在类中!因为停止实例时需要取消信号等待(否则 `io_context` 永远不会耗尽工作)
  • 集中错误处理
  • 将 Asio 缓冲区抽象与
    async_read_until
    组合操作结合使用
  • 使用
    asio::deferred
    代替
    asio::use_awaitable
    ;事实上,在最近的 boost 版本中,您可以省略令牌,因为
    deferred
    已成为默认值
  • 进行输出排队
  • 所有流状态都应在逻辑 strand 上访问(通过 post 执行 `stop`,并在逻辑 strand 上执行写队列操作)
  • 在 `stop()` 中,我们还会 `cancel` 尚未完成的异步操作

在 Coliru 上在线运行(Live On Coliru)

#include <boost/asio.hpp>
#include <cstring>
#include <deque>
#include <fmt/ranges.h>
#include <iostream>
#include <set>
// Short aliases used throughout the file.
namespace asio  = boost::asio;
namespace local = asio::local;
using boost::system::error_code;
// Return type of every coroutine in this file.
using Coro = asio::awaitable<void>;
// Prints "[<level>]\t[nov_run]\t<formatted message>\n" using fmt. `format` must be a
// string literal because it is concatenated with the prefix at compile time.
#define LOG_FN(level, format, ...) fmt::print("[{}]\t[nov_run]\t" format "\n", #level, ##__VA_ARGS__)

constexpr int              HDR_LEN   = 8; // 4 bytes for msgtype, 4 bytes for payload length

// Use a multi-byte separator: a single 0x01 byte will occur naturally in binary data.
constexpr std::string_view FRAME_SEP = "\xfc\x17\x74\x63\x52\x57\xf1\x01"; // generated random bytes

namespace hammer::tcp {
    namespace detail {
        /// Reads a big-endian 32-bit value from the front of `buf` and removes
        /// those bytes from the view. The caller guarantees at least 4 bytes.
        /// memcpy is used because `buf` may not be suitably aligned for a
        /// direct uint32_t load.
        inline uint32_t read_u32(std::string_view& buf) {
            uint32_t be = 0;
            std::memcpy(&be, buf.data(), sizeof(be));
            buf.remove_prefix(sizeof(be));
            return ntohl(be);
        }

        /// Writes `value` in big-endian order at `out` and returns the position
        /// just past the written bytes.
        inline char* write_u32(char* out, uint32_t value) {
            uint32_t const be = htonl(value);
            std::memcpy(out, &be, sizeof(be));
            return out + sizeof(be);
        }
    } // namespace detail

    /// One protocol message.
    ///
    /// Wire layout: 4-byte big-endian msgtype, 4-byte big-endian payload
    /// length, the payload bytes, then FRAME_SEP.
    struct Message {
        uint32_t    msgtype;
        std::string payload;

        /// Parses exactly one framed message, including its trailing FRAME_SEP.
        ///
        /// @param buf The complete frame.
        /// @throws std::runtime_error if the separator is missing, the header is
        ///         short, or the payload length does not match the header.
        ///         The message is left unmodified in that case.
        void unpack(std::string_view buf) {
            if (!buf.ends_with(FRAME_SEP))
                throw std::runtime_error("Invalid frame separator");
            buf.remove_suffix(FRAME_SEP.length());

            if (buf.length() < static_cast<std::size_t>(HDR_LEN))
                throw std::runtime_error("Invalid header size: " + std::to_string(buf.length()));

            uint32_t const type = detail::read_u32(buf);
            uint32_t const len  = detail::read_u32(buf);

            if (buf.length() < len)
                throw std::runtime_error("Insufficient payload");

            if (buf.length() > len)
                throw std::runtime_error("Excess payload");

            // Commit only after the whole frame has been validated.
            msgtype = type;
            payload.assign(buf.data(), buf.length());
        }

        /// Serializes the message into `out`.
        ///
        /// @param out  Destination buffer.
        /// @param size Capacity of `out` in bytes.
        /// @return Number of bytes written (header + payload + separator).
        /// @throws std::runtime_error if the frame does not fit into `size`.
        uint32_t pack(char* out, uint32_t size) const {
            // Computed in size_t so that a huge payload cannot wrap around and
            // slip past the capacity check.
            std::size_t const totalLen = static_cast<std::size_t>(HDR_LEN) + payload.size() + FRAME_SEP.length();

            if (totalLen > size)
                throw std::runtime_error("buffer size exceeded");

            out = detail::write_u32(out, msgtype);
            out = detail::write_u32(out, static_cast<uint32_t>(payload.size()));
            out = std::copy(payload.begin(), payload.end(), out);
            std::copy(FRAME_SEP.begin(), FRAME_SEP.end(), out);
            return static_cast<uint32_t>(totalLen);
        }
    };
} // namespace hammer::tcp

// Make the message type available without qualification.
using hammer::tcp::Message;
// Filesystem path of the local stream socket used by both the client and the server.
constexpr auto IPC_SOCKET_PATH = "/tmp/novd.sock";

class NovRun {
    auto stop_on_disconnect(std::string scope) {
        return [this, scope = std::move(scope)](std::exception_ptr e) {
            namespace E = asio::error;
            try {
                try {
                    if (e)
                        std::rethrow_exception(e);
                } catch (boost::system::system_error const& e) {
                    auto c = e.code();
                    if (c == E::broken_pipe || c == E::eof || c == E::operation_aborted)
                        LOG_FN(ERR, "Disconnect in {}: {}", scope, e.code().message());
                    throw;
                }
            } catch (std::exception const& e) {
                LOG_FN(ERR, "Error in {}: {}", scope, e.what());
                stop();
            }
        };
    }

  public:
    NovRun(asio::io_context& io_context)
        : io_context_(io_context)
        , signals_(io_context, SIGINT, SIGTERM)
        , socket_(io_context) {
        do_wait_signals();
    }

    void run() {
        co_spawn(io_context_, [this] -> Coro {
                try {
                    co_await socket_.async_connect(IPC_SOCKET_PATH, asio::deferred);
                    LOG_FN(INFO, "Connected to novd");
                    sendAppId();
                    co_spawn(io_context_, do_read_loop(), stop_on_disconnect("read_loop"));
                } catch (std::exception const& e) {
                    LOG_FN(ERR, "Failed - {}", e.what());
                }
            },
            asio::detached);
    }

    void stop() {
        if (running_.exchange(false)) {
            LOG_FN(INFO, "Exiting...");
            asio::post(io_context_, [this] {
                error_code ignore_ec;
                signals_.cancel(ignore_ec);
                socket_.cancel(ignore_ec);
                socket_.close(ignore_ec);
            });
        }
    }

    void send(Message msg) {
        if (!running_)
            throw std::runtime_error("Connection is closed"); // TODO proper exception

        do_send(std::move(msg));
    }

    void sendSignalToServer(int signal) {
        send({3, std::to_string(signal)});
        LOG_FN(INFO, "Sent signal {} [{}] to server", ::strsignal(signal), signal);
    }

    // void setQueryMode() { runMode_ = 1; }

  private:
    asio::io_context&              io_context_;
    asio::signal_set               signals_;
    local::stream_protocol::socket socket_;
    // int                         runMode_{0}; // 0: normal mode, 1: query mode
    std::atomic_bool               running_{true};
    std::string                    appId_ = "DEFAULT-APP-ID-STRING";
    std::deque<std::vector<char>>  outbox_;

    void sendAppId() {
        send({2, appId_ + '\n'});
        LOG_FN(INFO, "appId sent: {}", appId_);
    }

    void do_send(Message msg) { // on logical strand
        char     msgBuf[1024];  // std::array, perhaps?
        uint32_t packed = msg.pack(msgBuf, sizeof(msgBuf));

        outbox_.emplace_back(msgBuf, msgBuf + packed);

        if (outbox_.size() == 1) // only start the write loop if not running
            co_spawn(io_context_, do_write_loop(), stop_on_disconnect("write_loop"));
    }

    Coro do_write_loop() {
        while (running_ && !outbox_.empty()) {
            co_await async_write(socket_, asio::buffer(outbox_.front()));
            outbox_.pop_front();
        }
    }

    Coro do_read_loop() {
        for (std::string buf; running_;) {
            // find frame separator
            while (size_t n = co_await async_read_until(socket_, asio::dynamic_buffer(buf), FRAME_SEP)) {
                LOG_FN(DEBUG, "Received: n={} raw {::02x}", n, std::span(buf));
                Message msg;

                msg.unpack(buf.substr(0, n)); // parse single message
                buf.erase(0, n);              // like asio::dynamic_buffer(buf).consume(n);

                LOG_FN(INFO, "Received: type:{} {::02x}", msg.msgtype, std::span(msg.payload));
            }
        }
    }

    void do_wait_signals() {
        signals_.async_wait([this](error_code ec, int signal) {
            if (!ec) {
                sendSignalToServer(signal);
                do_wait_signals(); // to keep waiting for signals
            }
        });
    }
};

/// Console loop, meant to run on its own thread.
///
/// Prompts for lines on stdin and sends each non-empty one, newline-terminated,
/// as a type-4 message. The loop ends on the line "nov_run exit", on end of
/// input, or when an exception is thrown; the connection is stopped in every
/// case.
void consoleInput(NovRun& connection) {
    try {
        std::string line;
        while (std::cout << "Enter input: " && getline(std::cin, line)) {
            if (line == "nov_run exit")
                break;

            if (line.empty())
                continue;

            connection.send({4, std::move(line) + '\n'});
        }
        connection.stop();
    } catch (std::exception const& err) {
        LOG_FN(ERR, "Error in consoleInput: {}", err.what());
        connection.stop();
    }
}

void emulate_client() {
    asio::io_context io_context(1);

    NovRun novRun(io_context);
    novRun.run();

    std::thread input(consoleInput, std::ref(novRun));
    io_context.run();

    input.join();
}

void emulate_server() {
    asio::io_context io_context(1);

    std::remove(IPC_SOCKET_PATH);

    co_spawn(
        io_context,
        [] -> Coro {
            auto ex = co_await asio::this_coro::executor;
            local::stream_protocol::acceptor acceptor(ex, IPC_SOCKET_PATH);
            acceptor.listen();

            for (;;) {
                co_spawn(ex, [socket = co_await acceptor.async_accept()] mutable -> Coro {
                        try {
                            std::array<char, 1024> buf;
                            while (size_t n = co_await socket.async_read_some(asio::buffer(buf))) {
                                LOG_FN(INFO, "Echo: {::02x}", std::span(buf).subspan(0, n));
                                co_await async_write(socket, asio::buffer(buf,n ));
                            }
                        } catch (std::exception const& e) {
                            LOG_FN(ERR, "Error in echo session: {}", e.what());
                        }
                    },
                    asio::detached);
            }
        },
        asio::detached);

    io_context.run();
}

int main(int argc, char** argv) {
    if (std::set<std::string_view>(argv + 1, argv + argc, std::less<std::string_view>{}).contains("server"))
        emulate_server();
    else
        emulate_client();
}

本地demo,先展示客户端退出,再展示服务端退出:(点击查看原图)

© www.soinside.com 2019 - 2024. All rights reserved.