考虑我有以下异步任务运行程序。它将任务放入队列中并逐一执行它们,并在完成时调用 lambda。如果 func 产生某种类型的结果,那么 onComplete 回调应该接收相同类型的参数。
class TaskRunner
{
public:
TaskRunner() :
mPool(new QThreadPool())
{
mPool->setMaxThreadCount(1);
mPool->setExpiryTimeout(-1);
}
public:
// For Funcs returning an instance of std::variant
template <typename Func, typename OnComplete, typename OnError, typename... Args>
std::enable_if_t<is_instance<std::invoke_result_t<Func, Args...>, std::variant>::value>
async(Func func, OnComplete onComplete, OnError onError, Args... args)
{
QMutexLocker lock(&mutex);
(void)QtConcurrent::run(mPool.get(), [=]() {
const auto result = func(args...);
if (result.index() == 0)
onComplete(std::get<0>(result));
else
onError(std::get<1>(result));
});
}
// ... similar templates for std::optional, etc. ...
private:
QScopedPointer<QThreadPool> mPool;
QMutex mutex;
};
用途:
TaskRunner runner;
runner.async([](int a, int b) {
if(b != 0)
return a/b;
else
return std::string("division by zero");},
[](auto result){ std::cout << result << '\n'; },
[](auto error){ std::cout << error << '\n'; },
10, 2);
问题是 onComplete 和 onError 回调应该从主线程调用。我听说这可以通过 Qt 的信号和槽系统来实现,但我无法找出解决方案,更不用说实现它了。
QtConcurrent::run
和一个物体。
事情的经过是:
QtConcurrent::run
在单独的线程中启动函数。您从中得到的是 QFuture
,您可以查询它来获取执行状态。QFuture::waitForFinished
:它将阻塞您的线程,直到执行完成。这是一个在工作线程中运行 5 倍函数的简单示例。
taskNo
(从 1 到 5)在 QtConcurrent::run
中从主线程传递到工作线程,并在信号中返回。为了便于说明,此示例适用于单个工作对象。
不过,您可以使用单独的工作对象,但是您需要在
for
循环内创建指针,在 QtConcurrent::run
中更改它们的线程亲和力(而不是像我一样只打印到屏幕),进行连接并且仅然后开始工作。
无论如何,请始终用
Qt::QueuedConnection
明确标记您的联系。这可能是 QObject::connect
默认情况下会执行的操作,但至少,它使您的意图明确。
在
main
函数中,我在 3 个地方添加了注释以供说明:
taskCompleted
信号后断开连接。main
中创建了线程池。我已经做到了,但它不被使用,除非你取消注释它。QFuture::waitForFinished
作用的说明。如果您取消注释它,工作仍然会在工作线程中完成,但没有并行性:因为它在创建下一个工作线程之前阻塞主线程(请参阅上面我的评论)。Worker.h
#include <QObject>
class Worker : public QObject
{
Q_OBJECT
public:
void run(int taskNo);
signals:
void taskCompleted(Qt::HANDLE, int);
};
Worker.cpp
#include "Worker.h"
#include <QtCore/QDebug>
#include <QtCore/QThread>
void Worker::run(int taskNo) {
qDebug() << "Executing task" << taskNo << "in worker thread" << QThread::currentThreadId();
emit taskCompleted(QThread::currentThreadId(), taskNo);
}
main.cpp
#include <QtCore/QFuture>
#include <QtCore/QThread>
#include <QtCore/QThreadPool>
#include <QtConcurrent/QtConcurrentRun>
#include "Worker.h"
int main(int argc, char* argv[])
{
QApplication a(argc, argv);
QThreadPool pool;
pool.setMaxThreadCount(2);
QObject catchObject;
Worker worker;
QObject::connect(&worker, &Worker::taskCompleted, &catchObject,
[](Qt::HANDLE workerThreadId, int taskNo)
{
qDebug() << "Message from main thread" << QThread::currentThreadId() << ": Task" << taskNo << "completed in thread" << workerThreadId;
}
, static_cast<Qt::ConnectionType>(Qt::QueuedConnection /*| Qt::SingleShotConnection*/) //Uncomment to catch only the first task to complete.
);
qDebug() << "Tasks will now be created from main thread" << QThread::currentThreadId();
for (int taskNo = 1; taskNo <= 5; ++taskNo) {
QtConcurrent::run(
/*&pool,*/ //Uncomment to work with the 2 threads of the thread pool.
[](int taskNo) -> int
{
qDebug() << "Hello from worker thread" << QThread::currentThreadId() << ", starting task" << taskNo;
return taskNo;
}, taskNo
).then(
[&worker](int taskNo) { worker.run(taskNo); }
)/*.waitForFinished()*/; //Uncomment to ahve the main thread wait for the task to finish before resuming the loop.
}
return a.exec();
}