跳转至

期中周 期中测试题,你做对了么?

你好,我是卢誉声。

为了帮助你巩固知识,提升能力,期中周我给你出了一道实战题目,基于课程里的代码扩展现有协程框架,实现高级任务调度。题目描述你可以通过这个链接回顾。

这一讲,我会把参考代码和解题思路公布出来。

答案解析

既然要在现有代码上增加功能,我们就有必要先熟悉原有架构,再决定在哪个模块或哪个层面上追加功能。

通过分析,可以发现任务的执行与调度是通过asyncpp.task模块实现的。同时,我们又是在为现有框架提供高优先级调度的能力。因此,新增的模块应该是供asyncpp.task使用的。

基于这样的考虑,我们在原有的架构基础上,追加了一个high performance(asyncpp.hp)模块,供asyncpp.task模块实现高性能的线程调度。补充后的架构图是后面这样。

从图中可以看到,asyncpp.task模块引用了asyncpp.hp模块。新的模块提供了高优先级线程调度和管理的能力。

我们来看一下具体实现。首先是:task子模块。

export module asyncpp.hp:task;

import asyncpp.core;
import <functional>;
import <vector>;
import <mutex>;

namespace asyncpp::hp {

export struct AsyncHpTask {
    using ResumeHandler = std::function<void()>;
    using TaskHandler = std::function<void()>;

    // 协程唤醒函数
    ResumeHandler resumeHandler;
    // 计算任务函数
    TaskHandler taskHandler;
};

export class AsyncHpTaskQueue {
public:
    static AsyncHpTaskQueue& getInstance();

    void enqueue(const AsyncHpTask& item) {
        std::lock_guard<std::mutex> guard(_queueMutex);

        _queue.push_back(item);
    }

    bool dequeue(AsyncHpTask* item) {
        std::lock_guard<std::mutex> guard(_queueMutex);

        if (_queue.size() == 0) {
            return false;
        }

        *item = _queue.back();
        _queue.pop_back();

        return true;
    }

    size_t getSize() const {
        return _queue.size();
    }

private:
    // 高性能计算任务队列
    std::vector<AsyncHpTask> _queue;
    // 高性能计算任务队列互斥锁,用于实现线程同步,确保队列操作的线程安全
    std::mutex _queueMutex;
};

AsyncHpTaskQueue& AsyncHpTaskQueue::getInstance() {
    static AsyncHpTaskQueue queue;

    return queue;
}

}

这部分代码跟之前的asyncpp.io:task基本相同,唯一区别是调度的任务类型不同,用于处理高性能计算任务。

接下来的重点是:loop的实现。

module;

#ifndef _WINDOWS_
#include <Windows.h>
#endif // _WINDOWS_

export module asyncpp.hp:loop;

import :task;
import asyncpp.task.queue;

import <thread>;
import <chrono>;
import <thread>;
import <functional>;

namespace asyncpp::hp {
    export class AsyncHpLoop {
    public:
        // 常量,定义了任务循环的等待间隔时间(单位为毫秒)
        static const int32_t SLEEP_MS = 100;

        static AsyncHpLoop& start();

    private:
        // 支持单例模式,将其定义为private,防止外部调用构造函数
        AsyncHpLoop() {
            _thread = std::jthread(std::bind(&AsyncHpLoop::loopMain, this));
            auto nativeWorkerHandle = _thread.native_handle();
            ::SetThreadPriority(nativeWorkerHandle, THREAD_PRIORITY_HIGHEST);
        }
        // 支持单例模式,通过delete修饰符说明拷贝构造函数不可调用
        AsyncHpLoop(const AsyncHpLoop&) = delete;
        // 支持单例模式,通过delete修饰符说明赋值操作符不可调用
        AsyncHpLoop& operator=(const AsyncHpLoop&) = delete;

        void loopExecution() {
            AsyncHpTask opItem;
            if (!AsyncHpTaskQueue::getInstance().dequeue(&opItem)) {
                return;
            }

            opItem.taskHandler();

            auto& asyncEventQueue = asyncpp::task::AsyncTaskQueue::getInstance();
            asyncEventQueue.enqueue({
                .handler = opItem.resumeHandler
            });
        }

        void loopMain() {
            while (true) {
                loopExecution();
                std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
            }
        }

        // jthread对象,为高性能计算线程,jthread让该线程结束之前整个进程都不会结束
        std::jthread _thread;
    };

    AsyncHpLoop& AsyncHpLoop::start() {
        static AsyncHpLoop ioLoop;

        return ioLoop;
    }
}

我们在这段代码里不仅使用jthread,还在AsyncHpLoop构造函数中使用了native_handle,设置了线程优先级,用于高性能的任务调度。

最后,我们在asyncpp.task:asyncify中,使用了新的模块。

export module asyncpp.task:asyncify;

export import asyncpp.task.queue;
export import :loop;
export import :coroutine;

import asyncpp.core;
import asyncpp.hp;

namespace asyncpp::task {
    using asyncpp::core::Invocable;

    // 默认的AsyncTaskSuspender(当任务函数返回类型不为void时)
    template <typename ResultType>
    void defaultAsyncAwaitableSuspend(
        Awaitable<ResultType>* awaitable,
        AsyncTaskResumer resumer,
        CoroutineHandle& h
    ) {
        auto& asyncTaskQueue = AsyncTaskQueue::getInstance();
        asyncTaskQueue.enqueue({
            .handler = [resumer, awaitable] {
                awaitable->_taskResult = awaitable->_taskHandler();
                resumer();
            }
        });
    }

    // 默认的AsyncTaskSuspender(当任务函数返回类型为void时)
    template <>
    void defaultAsyncAwaitableSuspend<void>(
        Awaitable<void>* awaitable,
        AsyncTaskResumer resumer,
        CoroutineHandle& h
    ) {
        auto& asyncTaskQueue = AsyncTaskQueue::getInstance();
        asyncTaskQueue.enqueue({
            .handler = [resumer, awaitable] {
                awaitable->_taskHandler();
                resumer();
            }
        });
    }

    template <typename ResultType>
    void hpAsyncAwaitableSuspend(
        Awaitable<ResultType>* awaitable,
        AsyncTaskResumer resumer,
        CoroutineHandle& h
    ) {
        asyncpp::hp::AsyncHpTask operationItem{
            .resumeHandler = [h] {
                h.resume();
            },
            .taskHandler = [awaitable]() {
                awaitable->_taskResult = awaitable->_taskHandler();
            }
        };

        asyncpp::hp::AsyncHpTaskQueue::getInstance().enqueue(operationItem);
    }

    export template <Invocable T>
    auto asyncify(
        T taskHandler,
        AsyncTaskSuspender<std::invoke_result_t<T>> suspender =
        defaultAsyncAwaitableSuspend<std::invoke_result_t<T>>
    ) {
        return Awaitable<std::invoke_result_t<T>>{
            ._taskHandler = taskHandler,
                ._suspender = suspender
        };
    }

    export template <Invocable T>
    auto asyncify(
        T taskHandler,
        bool highPriority
    ) {
        if (highPriority) {
            return Awaitable<std::invoke_result_t<T>>{
                ._taskHandler = taskHandler,
                ._suspender = hpAsyncAwaitableSuspend<std::invoke_result_t<T>>
            };
        }
        return Awaitable<std::invoke_result_t<T>>{
            ._taskHandler = taskHandler,
            ._suspender = defaultAsyncAwaitableSuspend<std::invoke_result_t<T>>
        };
    }
}

你可以先关注一下第8行代码,这里我们导入了hp模块的符号。接着,还在第46行实现了hpAsyncAwaitableSuspend,用于实现高性能版本的调度。

最后,我们在第76行实现了类似于之前的asyncify工具,用于将一个普通的函数f转换成一个返回Awaitable对象的函数asyncF。通过这个分区实现的工具,可以让库的用户更容易使用coroutine。

这道题目的源代码,你可以从这里获取。

期中周即将告一段落,你可以再利用周末时间温习一下之前所学。有不明白的地方欢迎和我在留言区交流,下周我们继续回到课程主线,继续学习C++ Ranges的用法,敬请期待。

精选留言(1)
  • Family mission 👍(0) 💬(1)

    感觉asyncpp.task:asyncify 模块中36行使用单例模式构造AsyncHpTaskQueue而不是AsyncTaskQueue::getInstance(),写错了吧

    2023-12-12