为什么我无法读取 C++20 协程 + Asio 单线程多协程模型中 forkpty 生成的进程的 stdout/stderr?

问题描述 投票:0回答:1

我是 corotine 和 asio 的新手。 我正在开发一个本地流程管理工具,其中客户负责启动程序。该客户端将命令发送到管理该进程的后端守护程序。我正在使用单线程、多协程模型。我的守护进程需要与子进程交互以获取IPC传输到客户端的日志。

我的问题是:为什么我的守护进程卡在code 1 async_read_some 并且不触发读取标准输出,而code 2工作正常?

这是一个简单的演示:

客户代码:

#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>

void receive_output(int sockfd)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0)
            break;
        buffer[n] = '\0';
        std::cout << buffer << std::flush;
    }
}

void send_input(int sockfd)
{
    std::string input;
    while (std::getline(std::cin, input))
    {
        input += '\n';
        send(sockfd, input.c_str(), input.size(), 0);
    }
}

int main()
{
    int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    struct sockaddr_un addr;
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/tmp/socket");

    connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));

    std::string command = "test";
    send(sockfd, command.c_str(), command.size(), 0);

    std::thread output_thread(receive_output, sockfd);
    std::thread input_thread(send_input, sockfd);

    output_thread.join();
    input_thread.join();

    close(sockfd);
    return 0;
}

服务器代码:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>

using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;

boost::asio::io_context io_context;

void set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl(F_GETFL)");
        return;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1)
    {
        perror("fcntl(F_SETFL)");
    }
}

awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
    char buffer[256];
    std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
    buffer[len] = '\0';
    co_return std::string(buffer);
}

awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
                                   int client_fd)
{
    int master_fd;
    pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
    //   set_nonblocking(master_fd);
    if (pid == 0)
    {
        // Child process: Set user and change directory
        setvbuf(stdout, nullptr, _IONBF, 0);

        // execute echo just for test
        // execl(command.c_str(), "", NULL);
        execl("/bin/echo", "echo", "Testing PTY output!", NULL);

        std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
        exit(EXIT_FAILURE);
    }
    else
    {
        // Parent process: Handle I/O between client and pty
        asio::posix::stream_descriptor pty_stream(io_context, master_fd);

        // handle output
        co_spawn(
            io_context,
            [&client_stream, &pty_stream]() -> awaitable<void> {
                char buffer[1024];
                try
                {
                    while (true)
                    {
                        // code 1
                        // std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);

                        // code 2
                        ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));

                        std::cout << buffer << std::endl;
                        if (n == 0)
                            break; // End of stream

                        //   co_await asio::async_write(
                        //   client_stream, boost::asio::buffer(buffer, n),
                        //   use_awaitable);
                    }
                }
                catch (std::exception &e)
                {
                    std::cerr << strerror(errno) << std::endl;
                }
                co_return;
            },
            asio::detached);

        // handle input



        waitpid(pid, nullptr, 0);

        co_return;
    }
}

awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
    std::string command = co_await receive_command(client_stream);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client_stream, client_fd);
    client_stream.close();
}

awaitable<int> async_accept(int server_fd)
{
    int client_fd;
    co_await asio::post(io_context, use_awaitable);
    client_fd = accept(server_fd, nullptr, nullptr);
    if (client_fd < 0)
    {
        std::cerr << "Failed to accept client connection" << std::endl;
        co_return - 1;
    }
    std::cout << "clientfd, " << client_fd << std::endl;
    set_nonblocking(client_fd);
    co_return client_fd;
}

awaitable<void> daemon_loop()
{
    const char *socket_path = "/tmp/socket";

    if (access(socket_path, F_OK) == 0)
    {
        if (unlink(socket_path) != 0)
        {
            std::cerr << "Failed to remove existing socket file" << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd < 0)
    {
        std::cerr << "Failed to create socket" << std::endl;
        exit(EXIT_FAILURE);
    }
    //   set_nonblocking(server_fd);

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, socket_path);
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        std::cerr << "Failed to bind socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 5) < 0)
    {
        std::cerr << "Failed to listen on socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    asio::posix::stream_descriptor server_stream(io_context, server_fd);

    while (true)
    {
        int client_fd = co_await async_accept(server_fd);
        asio::posix::stream_descriptor client_stream(io_context, client_fd);
        std::cout << "Accepted connection" << std::endl;
        co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
    }
}

int main()
{
    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
    return 0;
}

这是代码1代码2

之间不同的运行结果

代码1: enter image description here

代码2:

enter image description here

c++ c++20 boost-asio ipc coroutine
1个回答
0
投票

最可能的问题是您使用了

::waitpid
,它阻塞了唯一可用的 IO 服务线程。

run_process_in_pty
中的 co_spawn 也对对象生命周期进行快速且宽松的处理。它通过引用捕获
client_stream
pty_stream
,但它们要么是本地变量,要么是来自另一个协程框架的引用。

无论如何,所有代码看起来都非常复杂,以某种方式迫使 C 风格的网络代码进入 Asio 的协程框架。为什么不使用Asio?这样你就可以得到:

awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client, client.native_handle());
    // client_stream.close(); // implicit due to RAII lifetime
}

awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

奖励演示

作为奖励,让我还演示如何对子进程使用 Boost Process:

住在Coliru

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/process/v2.hpp>
#include <filesystem>
#include <iostream>
#include <pty.h>
#include <sys/wait.h>

namespace asio = boost::asio;
namespace bp   = boost::process::v2;
using UNIX     = asio::local::stream_protocol;
using Socket   = UNIX::socket;
using std::filesystem::path;
using namespace boost::asio::experimental::awaitable_operators;

asio::awaitable<std::string> receive_command(UNIX::socket& client_stream) {
#if 1
    std::string buf;
    co_await async_read_until(client_stream, asio::dynamic_buffer(buf), '\n', asio::deferred);
    while (buf.ends_with('\n'))
        buf.pop_back();
    co_return buf;
#else
    char buffer[512];
    auto n = co_await client_stream.async_read_some(asio::buffer(buffer), asio::deferred);
    co_return std::string(buffer, n);
#endif
}

asio::awaitable<void> run_process( //
    [[maybe_unused]] std::string const& command, Socket& client_stream) try {
    asio::any_io_executor ex = co_await asio::this_coro::executor;

    //bp::popen child(ex, "/bin/echo", {"Testing PTY output for command [", command, "]"});
    bp::popen child(ex, "/bin/sh", {"-c", command});

    // int client_fd = client_stream.native_handle();

    for (char buffer[1024]; ;) {
        // auto [ec, n] = co_await child.async_read_some(boost::asio::buffer(buffer),
        // asio::as_tuple(asio::deferred));
        auto n = co_await child.async_read_some(boost::asio::buffer(buffer));

        std::cout << quoted(std::string_view(buffer, n)) << std::endl;

        if (n)
            co_await async_write(client_stream, boost::asio::buffer(buffer, n), asio::deferred);

        // if (ec) {
        // std::cout << "End of stream (" << ec.message() << ")" << std::endl;
        // break;
        //}
    }
} catch (boost::system::system_error const& se) {
    std::cerr << "run_process: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
    std::cerr << "run_process: " << e.what() << std::endl;
}

asio::awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process(command, client);
    client.close(); // implicit due to RAII lifetime
}

asio::awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

有本地的、更实用的演示:

© www.soinside.com 2019 - 2024. All rights reserved.