Boost.Cobalt:如何使用 C API 回调中的通道?

问题描述 投票:0回答:1

我想在通过通道进行通信的协程管道中使用 C API。 这是我第一次尝试协程,我对它们的了解有限。

管道形状为:

 --------    1    ---------    2    ------
| source | ----> | process | ----> | sink |
 --------         ---------         ------

每个方框代表一个协程,每个箭头代表一个通道。

C API 用于

process
协程。

它的签名大致是:

bool start_work(consumer_callback)
。这个API是 同步,并为其生成的每个数据调用一次
consumer_callback

我首先考虑写入通道

2
(见上图) 回调,但这会改变回调的签名,所以它不是 有可能。

我更改为将协程句柄传递给回调,从而恢复它。这 恢复的协程然后将数据写入通道

2

简化的代码是:

#include <coroutine>
#include <optional>
#include <string>
#include <boost/cobalt/channel.hpp>
#include <boost/cobalt/main.hpp>
#include <boost/cobalt/promise.hpp>
#include <boost/cobalt/join.hpp>

namespace cobalt = boost::cobalt;

// Data to communicate between the callback and the channel writer.
struct Data {
   std::optional<int> result;
   bool done = false;
   std::coroutine_handle<> coro_handle;
};

using Callback = void (*)(int, void*, bool);

void consumer_callback(int i, void* data, bool done) {
   Data& data_ = *reinterpret_cast<Data*>(data);
   data_.done = done;
   if (!done) {
      data_.result = i;
   }
   data_.coro_handle.resume();
}

// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
    bool done = false;
    for (int i = 0; i < 10; ++i) {
       cb(i, data, done); // !done case
    }
    done = true;
    cb(0, data, done); // done case
}

struct Awaiter : std::suspend_always {
    Data& data;
    bool first;

    bool await_ready() {
        return data.result.has_value();
    }

    void await_suspend(std::coroutine_handle<> h) {
        data.coro_handle = h;
        if (first) start_work(&data, consumer_callback);
    }

    int await_resume() {
        assert(data.result.has_value());
        auto opt = std::exchange(data.result, std::nullopt);
        return opt.value();
    }
};

Awaiter nextResult(Data& data, bool first) {
    return {{}, data, first};
}

cobalt::promise<void> source(cobalt::channel<std::string>& out) {
    co_await out.write("Hello world!");
    out.close();
}

cobalt::promise<void> process(cobalt::channel<std::string>& in, cobalt::channel<int>& out) {
    Data data;
    while (in.is_open() && out.is_open()) {
        auto _ = co_await in.read(); // ignore result for now
        auto first = true;
        while (!data.done || data.result.has_value()) {
            auto i = co_await nextResult(data, first);
            co_await out.write(i);
            first = false;
        }
    }
    in.close();
    out.close();
}

cobalt::promise<void> sink(cobalt::channel<int>& in) {
    while (in.is_open()) {
        auto i = co_await in.read(); // ignore result for now
    }
    in.close();
}

cobalt::main co_main(int argc, char* argv[]) {
    cobalt::channel<std::string> a;
    cobalt::channel<int> b;
    co_await cobalt::join(
        source(a),
        process(a, b),
        sink(b)
    );
    co_return 0;
}

接收器正确接收所有数据,但是当

process
协程完成时,有 在Asio内部有一个协程恢复到空指针。我究竟做错了什么? 谢谢!

环境:

Ubuntu 20.04

提升1.85

g++13 -std=gnu++2a

c++ boost callback coroutine cobalt
1个回答
0
投票

你不能混合同步和异步代码,当你调用

data_.coro_handle.resume();
时,你成为协程的新调用者而不是
cobalt::join
,这很糟糕,特别是你不能从
.resume()
内部调用
await_suspend
cobalt::join
拥有除了协程的对称传输之外,它应该是唯一恢复它的东西,否则你最终会得到 cobalt 等待可能已经完成的协程,因为其他人恢复了它。

您最好的希望是使用线程(或纤程)并在另一个线程中运行

start_work
(这样它就不会破坏该线程的堆栈),然后显式阻止 `std::condition_variable 来完成一项工作完成,或者创建一个 asio 事件 并等待它(如果你不想阻塞当前线程)。

© www.soinside.com 2019 - 2024. All rights reserved.