AIO 异步 I/O 中的回调

问题描述 投票:0回答:1

我在互联网上的

AIO
异步 I/O 中找到了有关使用回调的讨论。然而,我的发现让我感到困惑。下面列出了来自 Linux AIO 站点 的示例代码。在此代码中,
AIO
用于读取文件的内容。我的问题是,在我看来,实际处理该文件内容的代码必须在某个点上对执行进行某种阻止,直到读取完成。这里的代码根本没有这样的块。我期待在
pthread_mutex_lock
编程中看到类似于
pthread
的某种调用。我想我可以在
aio_read()
调用之后放入一个虚拟循环,该循环将阻止执行,直到读取完成。但这让我又回到了阻止执行的最简单方法,然后我看不到建立回调的所有编码开销能获得什么。我显然错过了一些东西。有人可以告诉我这是什么吗?

这是代码。 (顺便说一句,原版是 C++ 语言;我已将其改编为 C 语言。)

#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <aio.h>
//#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>

const int BUFSIZE = 1024;




void aio_completion_handler(sigval_t sigval)
{
    struct aiocb *req;

    req = (struct aiocb *)sigval.sival_ptr; //Pay attention here.
    /*Check again if the asynchrony is complete?*/
    if (aio_error(req) == 0)
    {
        int ret = aio_return(req);
        printf("ret == %d\n", ret);
        printf("%s\n", (char *)req->aio_buf);
    }
    close(req->aio_fildes);
    free((void *)req->aio_buf);
    while (1)
    {
        printf("The callback function is being executed...\n");
        sleep(1);
    }
}



int main(void)
{
    struct aiocb my_aiocb;
    int fd = open("file.txt", O_RDONLY);
    if (fd < 0)
        perror("open");
    bzero((char *)&my_aiocb, sizeof(my_aiocb));

    my_aiocb.aio_buf = malloc(BUFSIZE);
    if (!my_aiocb.aio_buf)
        perror("my_aiocb.aio_buf");

    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_nbytes = BUFSIZE;
    my_aiocb.aio_offset = 0;

    //Fill in callback information
    /*
    Using SIGEV_THREAD to request a thread callback function as a notification method
    */
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
    my_aiocb.aio_sigevent.sigev_notify_function = aio_completion_handler;
    my_aiocb.aio_sigevent.sigev_notify_attributes = NULL;
    /*
    The context to be transmitted is loaded into the handler (in this case, a reference to the aiocb request itself).
    In this handler, we simply refer to the arrived sigval pointer and use the AIO function to verify that the request has been completed.
    */
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

    int ret = aio_read(&my_aiocb);
    if (ret < 0)
        perror("aio_read");
    /*    <---- A real code would process the data read from the file.
     *          So execution needs to be blocked until it is clear that the
     *          read is complete.  Right here I could put in:
     *          while (aio_error(%my_aiocb) == EINPROGRESS) {}
     *          But is there some other way involving a callback?
     *          If not, what has creating a callback done for me?
     */
    //The calling process continues to execute
    while (1)
    {
        printf("The main thread continues to execute...\n");
        sleep(1);
    }
    return 0;
}
c asynchronous io aio
1个回答
0
投票

我期待在 pthread 编程中看到类似于 pthread_mutex_lock 的某种调用。

您的思考方向是正确的。操作完成后(无论成功还是失败),POSIX AIO 在相邻的 POSIX 线程中运行完成处理程序。因此使用 POSIX 线程同步原语:互斥体和条件变量。

我想我可以在 aio_read() 调用之后放入一个虚拟循环,该循环将阻止执行直到读取完成。

要等待异步操作完成,有aio_suspend函数。是的,它应该在同类循环中使用:

const struct aiocb *list[1] = { &my_aiocb };

while (aio_error (&my_aiocb) == EINPROGRESS)
    aio_suspend (list, 1, NULL);

但是在使用回调函数的情况下,这样的完成等待是浪费资源,而且本身就是不正确的:如果你需要回调函数,那么你显然需要与它的完成同步,而不是与异步的完成同步之后调用该函数的操作。

所以我重申一下:使用 POSIX 线程同步原语来同步 POSIX 线程。使用示例:

/*
 * POSIX Asynchronous I/O Usage Example
 *
 * Copyright (c) 2023 Alexei A. Smekalkine <[email protected]>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#include <errno.h>
#include <stdio.h>
#include <string.h>

#include <aio.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>

typedef void aio_cb (union sigval u);

static
int aio_pread (struct aiocb *o, int fd, void *data, size_t count, off_t offset,
           aio_cb cb, void *cookie)
{
    o->aio_fildes = fd;
    o->aio_buf    = data;
    o->aio_nbytes = count;
    o->aio_offset = offset;

    o->aio_sigevent.sigev_value.sival_ptr   = cookie;
    o->aio_sigevent.sigev_notify            = SIGEV_THREAD;
    o->aio_sigevent.sigev_notify_function   = cb;
    o->aio_sigevent.sigev_notify_attributes = NULL;

    return aio_read (o);
}

struct work {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int done;
    struct aiocb cb;
};

static void work_cont (union sigval u)
{
    struct work *o = u.sival_ptr;
    const int error = aio_error (&o->cb);

    if (error != 0)
        fprintf (stderr, "E: aio read: %s\n", strerror (error));
    else
        fprintf (stderr, "N: got %zd bytes\n", aio_return (&o->cb));

    close (o->cb.aio_fildes);  /* or do some other work with result */

    pthread_mutex_lock (&o->lock);
    o->done = 1;
    pthread_cond_signal (&o->cond);
    pthread_mutex_unlock (&o->lock);
}

static void work_wait (struct work *o)
{
    pthread_mutex_lock (&o->lock);

    while (!o->done)
        pthread_cond_wait (&o->cond, &o->lock);

    pthread_mutex_unlock (&o->lock);
}

int main (int argc, char *argv[])
{
    int fd;
    struct work o = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
    char line[80];

    if (argc != 2) {
        fprintf (stderr, "usage:\n\taio-cb-test <file-to-read>\n");
        return 1;
    }

    if ((fd = open (argv[1], O_RDONLY)) == -1) {
        perror (argv[1]);
        return 1;
    }

    if (aio_pread (&o.cb, fd, line, sizeof (line), 0, work_cont, &o) != 0) {
        perror ("E: aio read");
        goto no_read;
    }

    work_wait (&o);
    return 0;
no_read:
    close (fd);
    return 1;
}

构建并启动:

$ cc aio-cb-test.c -o aio-cb-test
$ ./aio-cb-test aio-cb-test.c 
D: got 80 bytes
/*
 * POSIX Asynchronous I/O Usage Example
D: put 43 bytes
D: Signal that we are done our work
D: Got done signal
$ 

请注意,这里只有完成标志(我们工作的结束标记)的工作才受到互斥锁的保护。使用aiocb受到操作协议的保护:异步操作成功启动后,aiocb的所有权传递给AIO子系统,当操作完成时,所有权传递给回调函数。

© www.soinside.com 2019 - 2024. All rights reserved.