我在互联网上的
AIO
异步 I/O 中找到了有关使用回调的讨论。然而,我的发现让我感到困惑。下面列出了来自 Linux AIO 站点 的示例代码。在此代码中,AIO
用于读取文件的内容。我的问题是,在我看来,实际处理该文件内容的代码必须在某个点上对执行进行某种阻止,直到读取完成。这里的代码根本没有这样的块。我期待在 pthread_mutex_lock
编程中看到类似于 pthread
的某种调用。我想我可以在 aio_read()
调用之后放入一个虚拟循环,该循环将阻止执行,直到读取完成。但这让我又回到了阻止执行的最简单方法,然后我看不到建立回调的所有编码开销能获得什么。我显然错过了一些东西。有人可以告诉我这是什么吗?
这是代码。 (顺便说一句,原版是 C++ 语言;我已将其改编为 C 语言。)
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <aio.h>
//#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
const int BUFSIZE = 1024;
void aio_completion_handler(sigval_t sigval)
{
struct aiocb *req;
req = (struct aiocb *)sigval.sival_ptr; //Pay attention here.
/*Check again if the asynchrony is complete?*/
if (aio_error(req) == 0)
{
int ret = aio_return(req);
printf("ret == %d\n", ret);
printf("%s\n", (char *)req->aio_buf);
}
close(req->aio_fildes);
free((void *)req->aio_buf);
while (1)
{
printf("The callback function is being executed...\n");
sleep(1);
}
}
int main(void)
{
struct aiocb my_aiocb;
int fd = open("file.txt", O_RDONLY);
if (fd < 0)
perror("open");
bzero((char *)&my_aiocb, sizeof(my_aiocb));
my_aiocb.aio_buf = malloc(BUFSIZE);
if (!my_aiocb.aio_buf)
perror("my_aiocb.aio_buf");
my_aiocb.aio_fildes = fd;
my_aiocb.aio_nbytes = BUFSIZE;
my_aiocb.aio_offset = 0;
//Fill in callback information
/*
Using SIGEV_THREAD to request a thread callback function as a notification method
*/
my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
my_aiocb.aio_sigevent.sigev_notify_function = aio_completion_handler;
my_aiocb.aio_sigevent.sigev_notify_attributes = NULL;
/*
The context to be transmitted is loaded into the handler (in this case, a reference to the aiocb request itself).
In this handler, we simply refer to the arrived sigval pointer and use the AIO function to verify that the request has been completed.
*/
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;
int ret = aio_read(&my_aiocb);
if (ret < 0)
perror("aio_read");
/* <---- A real code would process the data read from the file.
* So execution needs to be blocked until it is clear that the
* read is complete. Right here I could put in:
* while (aio_error(%my_aiocb) == EINPROGRESS) {}
* But is there some other way involving a callback?
* If not, what has creating a callback done for me?
*/
//The calling process continues to execute
while (1)
{
printf("The main thread continues to execute...\n");
sleep(1);
}
return 0;
}
我期待在 pthread 编程中看到类似于 pthread_mutex_lock 的某种调用。
您的思考方向是正确的。操作完成后(无论成功还是失败),POSIX AIO 在相邻的 POSIX 线程中运行完成处理程序。因此使用 POSIX 线程同步原语:互斥体和条件变量。
我想我可以在 aio_read() 调用之后放入一个虚拟循环,该循环将阻止执行直到读取完成。
要等待异步操作完成,有aio_suspend函数。是的,它应该在同类循环中使用:
const struct aiocb *list[1] = { &my_aiocb };
while (aio_error (&my_aiocb) == EINPROGRESS)
aio_suspend (list, 1, NULL);
但是在使用回调函数的情况下,这样的完成等待是浪费资源,而且本身就是不正确的:如果你需要回调函数,那么你显然需要与它的完成同步,而不是与异步的完成同步之后调用该函数的操作。
所以我重申一下:使用 POSIX 线程同步原语来同步 POSIX 线程。使用示例:
/*
* POSIX Asynchronous I/O Usage Example
*
* Copyright (c) 2023 Alexei A. Smekalkine <[email protected]>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <aio.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
typedef void aio_cb (union sigval u);
static
int aio_pread (struct aiocb *o, int fd, void *data, size_t count, off_t offset,
aio_cb cb, void *cookie)
{
o->aio_fildes = fd;
o->aio_buf = data;
o->aio_nbytes = count;
o->aio_offset = offset;
o->aio_sigevent.sigev_value.sival_ptr = cookie;
o->aio_sigevent.sigev_notify = SIGEV_THREAD;
o->aio_sigevent.sigev_notify_function = cb;
o->aio_sigevent.sigev_notify_attributes = NULL;
return aio_read (o);
}
struct work {
pthread_mutex_t lock;
pthread_cond_t cond;
int done;
struct aiocb cb;
};
static void work_cont (union sigval u)
{
struct work *o = u.sival_ptr;
const int error = aio_error (&o->cb);
if (error != 0)
fprintf (stderr, "E: aio read: %s\n", strerror (error));
else
fprintf (stderr, "N: got %zd bytes\n", aio_return (&o->cb));
close (o->cb.aio_fildes); /* or do some other work with result */
pthread_mutex_lock (&o->lock);
o->done = 1;
pthread_cond_signal (&o->cond);
pthread_mutex_unlock (&o->lock);
}
static void work_wait (struct work *o)
{
pthread_mutex_lock (&o->lock);
while (!o->done)
pthread_cond_wait (&o->cond, &o->lock);
pthread_mutex_unlock (&o->lock);
}
int main (int argc, char *argv[])
{
int fd;
struct work o = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
char line[80];
if (argc != 2) {
fprintf (stderr, "usage:\n\taio-cb-test <file-to-read>\n");
return 1;
}
if ((fd = open (argv[1], O_RDONLY)) == -1) {
perror (argv[1]);
return 1;
}
if (aio_pread (&o.cb, fd, line, sizeof (line), 0, work_cont, &o) != 0) {
perror ("E: aio read");
goto no_read;
}
work_wait (&o);
return 0;
no_read:
close (fd);
return 1;
}
构建并启动:
$ cc aio-cb-test.c -o aio-cb-test
$ ./aio-cb-test aio-cb-test.c
D: got 80 bytes
/*
* POSIX Asynchronous I/O Usage Example
D: put 43 bytes
D: Signal that we are done our work
D: Got done signal
$
请注意,这里只有完成标志(我们工作的结束标记)的工作才受到互斥锁的保护。使用aiocb受到操作协议的保护:异步操作成功启动后,aiocb的所有权传递给AIO子系统,当操作完成时,所有权传递给回调函数。