期中周 期中测试题,你做对了么?
你好,我是卢誉声。
为了帮助你巩固知识,提升能力,期中周我给你出了一道实战题目,基于课程里的代码扩展现有协程框架,实现高级任务调度。题目描述你可以通过这个链接回顾。
这一讲,我会把参考代码和解题思路公布出来。
答案解析
既然要在现有代码上增加功能,我们就有必要先熟悉原有架构,再决定在哪个模块或哪个层面上追加功能。
通过分析,可以发现任务的执行与调度是通过asyncpp.task模块实现的。同时,我们又是在为现有框架提供高优先级调度的能力。因此,新增的模块应该是供asyncpp.task使用的。
基于这样的考虑,我们在原有的架构基础上,追加了一个high performance(asyncpp.hp)模块,供asyncpp.task模块实现高性能的线程调度。补充后的架构图是后面这样。
从图中可以看到,asyncpp.task模块引用了asyncpp.hp模块。新的模块提供了高优先级线程调度和管理的能力。
我们来看一下具体实现。首先是:task子模块。
export module asyncpp.hp:task;
import asyncpp.core;
import <functional>;
import <vector>;
import <mutex>;
namespace asyncpp::hp {
export struct AsyncHpTask {
using ResumeHandler = std::function<void()>;
using TaskHandler = std::function<void()>;
// 协程唤醒函数
ResumeHandler resumeHandler;
// 计算任务函数
TaskHandler taskHandler;
};
export class AsyncHpTaskQueue {
public:
static AsyncHpTaskQueue& getInstance();
void enqueue(const AsyncHpTask& item) {
std::lock_guard<std::mutex> guard(_queueMutex);
_queue.push_back(item);
}
bool dequeue(AsyncHpTask* item) {
std::lock_guard<std::mutex> guard(_queueMutex);
if (_queue.size() == 0) {
return false;
}
*item = _queue.back();
_queue.pop_back();
return true;
}
size_t getSize() const {
return _queue.size();
}
private:
// 高性能计算任务队列
std::vector<AsyncHpTask> _queue;
// 高性能计算任务队列互斥锁,用于实现线程同步,确保队列操作的线程安全
std::mutex _queueMutex;
};
AsyncHpTaskQueue& AsyncHpTaskQueue::getInstance() {
static AsyncHpTaskQueue queue;
return queue;
}
}
这部分代码跟之前的asyncpp.io:task基本相同,唯一区别是调度的任务类型不同,用于处理高性能计算任务。
接下来的重点是:loop的实现。
module;
#ifndef _WINDOWS_
#include <Windows.h>
#endif // _WINDOWS_
export module asyncpp.hp:loop;
import :task;
import asyncpp.task.queue;
import <thread>;
import <chrono>;
import <thread>;
import <functional>;
namespace asyncpp::hp {
export class AsyncHpLoop {
public:
// 常量,定义了任务循环的等待间隔时间(单位为毫秒)
static const int32_t SLEEP_MS = 100;
static AsyncHpLoop& start();
private:
// 支持单例模式,将其定义为private,防止外部调用构造函数
AsyncHpLoop() {
_thread = std::jthread(std::bind(&AsyncHpLoop::loopMain, this));
auto nativeWorkerHandle = _thread.native_handle();
::SetThreadPriority(nativeWorkerHandle, THREAD_PRIORITY_HIGHEST);
}
// 支持单例模式,通过delete修饰符说明拷贝构造函数不可调用
AsyncHpLoop(const AsyncHpLoop&) = delete;
// 支持单例模式,通过delete修饰符说明赋值操作符不可调用
AsyncHpLoop& operator=(const AsyncHpLoop&) = delete;
void loopExecution() {
AsyncHpTask opItem;
if (!AsyncHpTaskQueue::getInstance().dequeue(&opItem)) {
return;
}
opItem.taskHandler();
auto& asyncEventQueue = asyncpp::task::AsyncTaskQueue::getInstance();
asyncEventQueue.enqueue({
.handler = opItem.resumeHandler
});
}
void loopMain() {
while (true) {
loopExecution();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
}
}
// jthread对象,为高性能计算线程,jthread让该线程结束之前整个进程都不会结束
std::jthread _thread;
};
AsyncHpLoop& AsyncHpLoop::start() {
static AsyncHpLoop ioLoop;
return ioLoop;
}
}
我们在这段代码里不仅使用jthread,还在AsyncHpLoop构造函数中使用了native_handle,设置了线程优先级,用于高性能的任务调度。
最后,我们在asyncpp.task:asyncify中,使用了新的模块。
export module asyncpp.task:asyncify;
export import asyncpp.task.queue;
export import :loop;
export import :coroutine;
import asyncpp.core;
import asyncpp.hp;
namespace asyncpp::task {
using asyncpp::core::Invocable;
// 默认的AsyncTaskSuspender(当任务函数返回类型不为void时)
template <typename ResultType>
void defaultAsyncAwaitableSuspend(
Awaitable<ResultType>* awaitable,
AsyncTaskResumer resumer,
CoroutineHandle& h
) {
auto& asyncTaskQueue = AsyncTaskQueue::getInstance();
asyncTaskQueue.enqueue({
.handler = [resumer, awaitable] {
awaitable->_taskResult = awaitable->_taskHandler();
resumer();
}
});
}
// 默认的AsyncTaskSuspender(当任务函数返回类型为void时)
template <>
void defaultAsyncAwaitableSuspend<void>(
Awaitable<void>* awaitable,
AsyncTaskResumer resumer,
CoroutineHandle& h
) {
auto& asyncTaskQueue = AsyncTaskQueue::getInstance();
asyncTaskQueue.enqueue({
.handler = [resumer, awaitable] {
awaitable->_taskHandler();
resumer();
}
});
}
template <typename ResultType>
void hpAsyncAwaitableSuspend(
Awaitable<ResultType>* awaitable,
AsyncTaskResumer resumer,
CoroutineHandle& h
) {
asyncpp::hp::AsyncHpTask operationItem{
.resumeHandler = [h] {
h.resume();
},
.taskHandler = [awaitable]() {
awaitable->_taskResult = awaitable->_taskHandler();
}
};
asyncpp::hp::AsyncHpTaskQueue::getInstance().enqueue(operationItem);
}
export template <Invocable T>
auto asyncify(
T taskHandler,
AsyncTaskSuspender<std::invoke_result_t<T>> suspender =
defaultAsyncAwaitableSuspend<std::invoke_result_t<T>>
) {
return Awaitable<std::invoke_result_t<T>>{
._taskHandler = taskHandler,
._suspender = suspender
};
}
export template <Invocable T>
auto asyncify(
T taskHandler,
bool highPriority
) {
if (highPriority) {
return Awaitable<std::invoke_result_t<T>>{
._taskHandler = taskHandler,
._suspender = hpAsyncAwaitableSuspend<std::invoke_result_t<T>>
};
}
return Awaitable<std::invoke_result_t<T>>{
._taskHandler = taskHandler,
._suspender = defaultAsyncAwaitableSuspend<std::invoke_result_t<T>>
};
}
}
你可以先关注一下第8行代码,这里我们导入了hp模块的符号。接着,还在第46行实现了hpAsyncAwaitableSuspend,用于实现高性能版本的调度。
最后,我们在第76行实现了类似于之前的asyncify工具,用于将一个普通的函数f转换成一个返回Awaitable对象的函数asyncF。通过这个分区实现的工具,可以让库的用户更容易使用coroutine。
这道题目的源代码,你可以从这里获取。
期中周即将告一段落,你可以再利用周末时间温习一下之前所学。有不明白的地方欢迎和我在留言区交流,下周我们继续回到课程主线,继续学习C++ Ranges的用法,敬请期待。
- Family mission 👍(0) 💬(1)
感觉asyncpp.task:asyncify 模块中36行使用单例模式构造AsyncHpTaskQueue而不是AsyncTaskQueue::getInstance(),写错了吧
2023-12-12