我想在通过通道进行通信的协程管道中使用 C API。 这是我第一次尝试协程,我对它们的了解有限。
管道形状为:
-------- 1 --------- 2 ------
| source | ----> | process | ----> | sink |
-------- --------- ------
每个方框代表一个协程,每个箭头代表一个通道。
C API 用于
process
协程。
它的签名大致是:
bool start_work(consumer_callback)
。这个API是
同步,并为其生成的每个数据调用一次 consumer_callback
。
我首先考虑写入通道
2
(见上图)
回调,但这会改变回调的签名,所以它不是
有可能。
我更改为将协程句柄传递给回调,从而恢复它。这 恢复的协程然后将数据写入通道
2
。
简化的代码是:
#include <coroutine>
#include <optional>
#include <string>
#include <boost/cobalt/channel.hpp>
#include <boost/cobalt/main.hpp>
#include <boost/cobalt/promise.hpp>
#include <boost/cobalt/join.hpp>
namespace cobalt = boost::cobalt;
// Data to communicate between the callback and the channel writer.
struct Data {
std::optional<int> result;
bool done = false;
std::coroutine_handle<> coro_handle;
};
using Callback = void (*)(int, void*, bool);
void consumer_callback(int i, void* data, bool done) {
Data& data_ = *reinterpret_cast<Data*>(data);
data_.done = done;
if (!done) {
data_.result = i;
}
data_.coro_handle.resume();
}
// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
bool done = false;
for (int i = 0; i < 10; ++i) {
cb(i, data, done); // !done case
}
done = true;
cb(0, data, done); // done case
}
struct Awaiter : std::suspend_always {
Data& data;
bool first;
bool await_ready() {
return data.result.has_value();
}
void await_suspend(std::coroutine_handle<> h) {
data.coro_handle = h;
if (first) start_work(&data, consumer_callback);
}
int await_resume() {
assert(data.result.has_value());
auto opt = std::exchange(data.result, std::nullopt);
return opt.value();
}
};
Awaiter nextResult(Data& data, bool first) {
return {{}, data, first};
}
cobalt::promise<void> source(cobalt::channel<std::string>& out) {
co_await out.write("Hello world!");
out.close();
}
cobalt::promise<void> process(cobalt::channel<std::string>& in, cobalt::channel<int>& out) {
Data data;
while (in.is_open() && out.is_open()) {
auto _ = co_await in.read(); // ignore result for now
auto first = true;
while (!data.done || data.result.has_value()) {
auto i = co_await nextResult(data, first);
co_await out.write(i);
first = false;
}
}
in.close();
out.close();
}
cobalt::promise<void> sink(cobalt::channel<int>& in) {
while (in.is_open()) {
auto i = co_await in.read(); // ignore result for now
}
in.close();
}
cobalt::main co_main(int argc, char* argv[]) {
cobalt::channel<std::string> a;
cobalt::channel<int> b;
co_await cobalt::join(
source(a),
process(a, b),
sink(b)
);
co_return 0;
}
接收器正确接收所有数据,但是当
process
协程完成时,有
在Asio内部有一个协程恢复到空指针。我究竟做错了什么?
谢谢!
环境:
Ubuntu 20.04
提升1.85
g++13 -std=gnu++2a
你不能混合同步和异步代码,当你调用
data_.coro_handle.resume();
时,你成为协程的新调用者而不是cobalt::join
,这很糟糕,特别是你不能从.resume()
内部调用await_suspend
,cobalt::join
拥有除了协程的对称传输之外,它应该是唯一恢复它的东西,否则你最终会得到 cobalt 等待可能已经完成的协程,因为其他人恢复了它。
您最好的希望是使用线程(或纤程)并在另一个线程中运行
start_work
(这样它就不会破坏该线程的堆栈),然后显式阻止 `std::condition_variable 来完成一项工作完成,或者创建一个 asio 事件 并等待它(如果你不想阻塞当前线程)。