I'm new to systems programming and experimenting with io_uring. I've started designing a networked program and identified some CPU-bound work that I think should be offloaded to a thread pool. However, I'm unsure about the options for synchronizing it with the ring's thread, and the trade-offs involved.
The first solution that came to mind was a structure (e.g. a queue) locked with a pthread_mutex and pthread_cond. That seems like a poor fit, since I doubt io_uring_enter and pthread_cond_wait return frequently enough, and under the right conditions, to live in the same loop. Even if they did, introducing another syscall into this hot loop seems clumsy.
My current design involves a pair of file descriptors shared between the ring and the thread pool: one for each direction. Since there's only a single process, pointers are suitable as the messages passed over the descriptors. If the reads and writes are atomic, the descriptors provide the synchronization: only read/write calls are needed on e.g. the pool side and regular io_uring operations on the other, with no locking needed for those operations or for the memory behind the pointers.
I'm also aware of IORING_OP_MSG_RING, but since the CPU-bound work is fully separable, I'd rather just schedule any follow-up IO on the (single) ring immediately afterwards.
Is the PIPE_BUF limit the only condition for atomicity? I've found two excerpts from Jens Axboe naming IORING_OP_MSG_RING specifically as the solution to this problem.
From the slides of a 2022 Kernel Recipes talk (slide 31):

    [IORING_OP_MSG_RING] Used to pass e.g. work item pointers between rings, where each thread has its own ring
From the (currently sole) wiki article on the liburing GitHub page, "io_uring and networking in 2023":

    One use case might be handling new connections on one ring and having separate backend threads handle said connections, providing a way to pass a connection from one ring to another. io_uring_prep_msg_ring() is one way to set up such an SQE. Or it can be used directly from the thread handling a given connection, offloading expensive work to another thread.
As I understand it, here is a demonstration of this pattern:
#define _GNU_SOURCE // for gettid()
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>

typedef struct io_uring io_uring;
typedef struct io_uring_params io_uring_params;
typedef struct io_uring_sqe io_uring_sqe;
typedef struct io_uring_cqe io_uring_cqe;

#define MQSIZE 64 // size of main-thread (work-sending) queue
#define TQSIZE 32 // size of work-thread (work-receiving) queue
#define NJOBS 32 // number of jobs
#define NTHREADS 4 // number of threads

// represents some unit of work
typedef struct job_unit {
    int input;
    int output;
    int done;
} job_unit;

void * work ( void * arg ) {
    io_uring ring;
    io_uring_sqe * sqe;
    io_uring_cqe * cqe;
    int root_fd = *( ( int * ) arg );
    int tid = gettid();
    io_uring_params params = {};
    // since each thread has a ring, only this thread will touch this ring
    params.flags |= IORING_SETUP_SINGLE_ISSUER;
    // the work-thread should process each incoming job sequentially --
    // there's probably little use in having queued jobs available, e.g.
    // between context switches
    // I don't feel confident in my understanding of these switches
    // (it is noted that the computation here is negligible)
    params.flags |= IORING_SETUP_COOP_TASKRUN;
    params.flags |= IORING_SETUP_DEFER_TASKRUN;
    // "share async backend", but not SQ/CQ
    params.flags |= IORING_SETUP_ATTACH_WQ;
    params.wq_fd = root_fd;
    assert( 0 == io_uring_queue_init_params( TQSIZE, &ring, &params ) );
    // tell main-thread ring about this one by sending its fd
    // IOSQE_CQE_SKIP_SUCCESS helps reduce bookkeeping by eliding the completion
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, root_fd, 0, ring.ring_fd, 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
    io_uring_submit( &ring );
    while ( 1 ) {
        assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
        // stop signal: terminate thread
        if ( !cqe->user_data ) {
            io_uring_cqe_seen( &ring, cqe );
            break;
        }
        // mutate the struct in some interesting way
        job_unit * job = ( job_unit * ) cqe->user_data;
        printf( "thread #%d assigned job #%d\n", tid, job->input );
        job->output = job->input * 2; // the "work"
        job->done = 1;
        // send the pointer back
        assert( ( sqe = io_uring_get_sqe( &ring ) ) );
        io_uring_prep_msg_ring( sqe, root_fd, 0, ( uint64_t ) job, 0 );
        io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
        io_uring_cqe_seen( &ring, cqe );
        io_uring_submit( &ring );
    }
    printf( "thread #%d exit\n", tid );
    io_uring_queue_exit( &ring );
    return NULL;
}

int main () {
    io_uring ring;
    io_uring_sqe * sqe = NULL;
    io_uring_cqe * cqe = NULL;
    io_uring_params params = {};
    pthread_t threads[ NTHREADS ] = {};
    job_unit jobs[ NJOBS ] = {}; // would usually be from heap
    int work_fds[ NTHREADS ] = {}; // work-thread ring FDs
    int work_fd_num = 0; // dual iter/counter for work_fds
    int i; // loop var
    params.flags |= IORING_SETUP_SINGLE_ISSUER;
    // see similar lines above in work()
    params.flags |= IORING_SETUP_COOP_TASKRUN;
    params.flags |= IORING_SETUP_DEFER_TASKRUN;
    // we don't test for message failure, but I think the style of this example
    // lends itself to the way libuv handles things, including callbacks that
    // can manage their own means of failure
    params.flags |= IORING_SETUP_SUBMIT_ALL;
    assert( 0 == io_uring_queue_init_params( MQSIZE, &ring, &params ) );
    // start threads, passing in our ring's fd
    for ( i = 0 ; i < NTHREADS ; i++ )
        assert( 0 == pthread_create( &threads[ i ], NULL, work, &ring.ring_fd ) );
    // fill out the table of work-thread ring FDs
    for ( i = 0 ; i < NTHREADS ; i++ ) {
        assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
        work_fds[ work_fd_num++ ] = ( int ) cqe->user_data;
        io_uring_cqe_seen( &ring, cqe );
    }
    // dispatch jobs
    for ( i = 0 ; i < NJOBS ; i++ ) {
        work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
        jobs[ i ].input = i;
        jobs[ i ].done = 0;
        assert( ( sqe = io_uring_get_sqe( &ring ) ) );
        io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0,
                                ( uint64_t ) &jobs[ i ], 0 );
        io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
    }
    io_uring_submit( &ring );
    // collect results
    for ( i = 0 ; i < NJOBS ; i++ ) {
        assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
        job_unit * job = ( job_unit * ) cqe->user_data;
        printf( "job %d done\n", job->input );
        io_uring_cqe_seen( &ring, cqe );
    }
    // broadcast shutdown to threads
    for ( i = 0 ; i < NTHREADS ; i++ ) {
        work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
        assert( ( sqe = io_uring_get_sqe( &ring ) ) );
        io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0, 0, 0 );
        io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
    }
    io_uring_submit( &ring );
    // join and report results
    for ( i = 0 ; i < NTHREADS ; i++ )
        assert( 0 == pthread_join( threads[ i ], NULL ) );
    for ( i = 0 ; i < NJOBS ; i++ )
        printf( "%-2d ", jobs[ i ].output );
    printf( "\n" );
    io_uring_queue_exit( &ring );
    return 0;
}
Some interesting points:

- The pipe approach seems a particularly bad idea with io_uring specifically, because the value being written needs to live until the completion. Using the example above as a model, to send pointers into the stack array, a separate array for the pointers themselves would need to be allocated. Essentially a queue to manage the writes to the queue.
- IORING_SETUP_SINGLE_ISSUER and IORING_SETUP_ATTACH_WQ are obvious optimizations.
- IORING_SETUP_COOP_TASKRUN makes a compelling case, but I'm still not sure whether IORING_SETUP_DEFER_TASKRUN is beneficial here.

As I noted above, I'm somewhat out of my depth here, so I'm writing this answer prospectively, with my progress so far. If someone knows more, I'd be happy to accept another answer or revise this one.