我是 corotine 和 asio 的新手。 我正在开发一个本地流程管理工具,其中客户负责启动程序。该客户端将命令发送到管理该进程的后端守护程序。我正在使用单线程、多协程模型。我的守护进程需要与子进程交互以获取IPC传输到客户端的日志。
我的问题是:为什么我的守护进程卡在code 1 async_read_some 并且不触发读取标准输出,而code 2工作正常?
这是一个简单的演示:
客户代码:
#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>
void receive_output(int sockfd)
{
char buffer[1024];
while (true)
{
ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (n <= 0)
break;
buffer[n] = '\0';
std::cout << buffer << std::flush;
}
}
void send_input(int sockfd)
{
std::string input;
while (std::getline(std::cin, input))
{
input += '\n';
send(sockfd, input.c_str(), input.size(), 0);
}
}
int main()
{
int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, "/tmp/socket");
connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));
std::string command = "test";
send(sockfd, command.c_str(), command.size(), 0);
std::thread output_thread(receive_output, sockfd);
std::thread input_thread(send_input, sockfd);
output_thread.join();
input_thread.join();
close(sockfd);
return 0;
}
服务器代码:
#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;
boost::asio::io_context io_context;
void set_nonblocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl(F_GETFL)");
return;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl(F_SETFL)");
}
}
awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
char buffer[256];
std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
buffer[len] = '\0';
co_return std::string(buffer);
}
awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
int client_fd)
{
int master_fd;
pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
// set_nonblocking(master_fd);
if (pid == 0)
{
// Child process: Set user and change directory
setvbuf(stdout, nullptr, _IONBF, 0);
// execute echo just for test
// execl(command.c_str(), "", NULL);
execl("/bin/echo", "echo", "Testing PTY output!", NULL);
std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
else
{
// Parent process: Handle I/O between client and pty
asio::posix::stream_descriptor pty_stream(io_context, master_fd);
// handle output
co_spawn(
io_context,
[&client_stream, &pty_stream]() -> awaitable<void> {
char buffer[1024];
try
{
while (true)
{
// code 1
// std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
// code 2
ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));
std::cout << buffer << std::endl;
if (n == 0)
break; // End of stream
// co_await asio::async_write(
// client_stream, boost::asio::buffer(buffer, n),
// use_awaitable);
}
}
catch (std::exception &e)
{
std::cerr << strerror(errno) << std::endl;
}
co_return;
},
asio::detached);
// handle input
waitpid(pid, nullptr, 0);
co_return;
}
}
awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
std::string command = co_await receive_command(client_stream);
std::cout << "Received command: " << command << std::endl;
co_await run_process_in_pty(command, client_stream, client_fd);
client_stream.close();
}
awaitable<int> async_accept(int server_fd)
{
int client_fd;
co_await asio::post(io_context, use_awaitable);
client_fd = accept(server_fd, nullptr, nullptr);
if (client_fd < 0)
{
std::cerr << "Failed to accept client connection" << std::endl;
co_return - 1;
}
std::cout << "clientfd, " << client_fd << std::endl;
set_nonblocking(client_fd);
co_return client_fd;
}
awaitable<void> daemon_loop()
{
const char *socket_path = "/tmp/socket";
if (access(socket_path, F_OK) == 0)
{
if (unlink(socket_path) != 0)
{
std::cerr << "Failed to remove existing socket file" << std::endl;
exit(EXIT_FAILURE);
}
}
int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd < 0)
{
std::cerr << "Failed to create socket" << std::endl;
exit(EXIT_FAILURE);
}
// set_nonblocking(server_fd);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, socket_path);
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
std::cerr << "Failed to bind socket" << std::endl;
exit(EXIT_FAILURE);
}
if (listen(server_fd, 5) < 0)
{
std::cerr << "Failed to listen on socket" << std::endl;
exit(EXIT_FAILURE);
}
asio::posix::stream_descriptor server_stream(io_context, server_fd);
while (true)
{
int client_fd = co_await async_accept(server_fd);
asio::posix::stream_descriptor client_stream(io_context, client_fd);
std::cout << "Accepted connection" << std::endl;
co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
}
}
int main()
{
std::cout << "start" << std::endl;
co_spawn(io_context, daemon_loop(), asio::detached);
io_context.run();
return 0;
}
这是代码1和代码2
之间不同的运行结果代码2:
最可能的问题是您使用了
::waitpid
,它阻塞了唯一可用的 IO 服务线程。
run_process_in_pty
中的 co_spawn 也对对象生命周期进行快速且宽松的处理。它通过引用捕获 client_stream
和 pty_stream
,但它们要么是本地变量,要么是来自另一个协程框架的引用。
无论如何,所有代码看起来都非常复杂,以某种方式迫使 C 风格的网络代码进入 Asio 的协程框架。为什么不使用Asio?这样你就可以得到:
awaitable<void> client_connection(Socket client) {
std::cout << "Accepted connection" << std::endl;
std::string command = co_await receive_command(client);
std::cout << "Received command: " << command << std::endl;
co_await run_process_in_pty(command, client, client.native_handle());
// client_stream.close(); // implicit due to RAII lifetime
}
awaitable<void> daemon_loop() {
path socket_path = "/tmp/socket";
auto ex = co_await asio::this_coro::executor;
if (exists(socket_path)) {
if (!is_socket(socket_path))
throw std::runtime_error("File exists and is not a socket");
if (!remove(socket_path))
throw std::runtime_error("Failed to remove existing socket file");
}
UNIX::acceptor acc(ex, socket_path.native());
acc.listen(5);
while (true) {
co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
}
}
int main() {
boost::asio::io_context io_context;
std::cout << "start" << std::endl;
co_spawn(io_context, daemon_loop(), asio::detached);
io_context.run();
}
作为奖励,让我还演示如何对子进程使用 Boost Process:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/process/v2.hpp>
#include <filesystem>
#include <iostream>
#include <pty.h>
#include <sys/wait.h>
namespace asio = boost::asio;
namespace bp = boost::process::v2;
using UNIX = asio::local::stream_protocol;
using Socket = UNIX::socket;
using std::filesystem::path;
using namespace boost::asio::experimental::awaitable_operators;
asio::awaitable<std::string> receive_command(UNIX::socket& client_stream) {
#if 1
std::string buf;
co_await async_read_until(client_stream, asio::dynamic_buffer(buf), '\n', asio::deferred);
while (buf.ends_with('\n'))
buf.pop_back();
co_return buf;
#else
char buffer[512];
auto n = co_await client_stream.async_read_some(asio::buffer(buffer), asio::deferred);
co_return std::string(buffer, n);
#endif
}
asio::awaitable<void> run_process( //
[[maybe_unused]] std::string const& command, Socket& client_stream) try {
asio::any_io_executor ex = co_await asio::this_coro::executor;
//bp::popen child(ex, "/bin/echo", {"Testing PTY output for command [", command, "]"});
bp::popen child(ex, "/bin/sh", {"-c", command});
// int client_fd = client_stream.native_handle();
for (char buffer[1024]; ;) {
// auto [ec, n] = co_await child.async_read_some(boost::asio::buffer(buffer),
// asio::as_tuple(asio::deferred));
auto n = co_await child.async_read_some(boost::asio::buffer(buffer));
std::cout << quoted(std::string_view(buffer, n)) << std::endl;
if (n)
co_await async_write(client_stream, boost::asio::buffer(buffer, n), asio::deferred);
// if (ec) {
// std::cout << "End of stream (" << ec.message() << ")" << std::endl;
// break;
//}
}
} catch (boost::system::system_error const& se) {
std::cerr << "run_process: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
std::cerr << "run_process: " << e.what() << std::endl;
}
asio::awaitable<void> client_connection(Socket client) {
std::cout << "Accepted connection" << std::endl;
std::string command = co_await receive_command(client);
std::cout << "Received command: " << command << std::endl;
co_await run_process(command, client);
client.close(); // implicit due to RAII lifetime
}
asio::awaitable<void> daemon_loop() {
path socket_path = "/tmp/socket";
auto ex = co_await asio::this_coro::executor;
if (exists(socket_path)) {
if (!is_socket(socket_path))
throw std::runtime_error("File exists and is not a socket");
if (!remove(socket_path))
throw std::runtime_error("Failed to remove existing socket file");
}
UNIX::acceptor acc(ex, socket_path.native());
acc.listen(5);
while (true) {
co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
}
}
int main() {
boost::asio::io_context io_context;
std::cout << "start" << std::endl;
co_spawn(io_context, daemon_loop(), asio::detached);
io_context.run();
}
有本地的、更实用的演示: