利用 Windows 线程池定制的 4 种方式完成任务(Windows 核心编程)
阅读原文时间:2023年07月10日阅读:2
  • 说起底层的线程操作一般都不会陌生,Windows 提供了 CreateThread 函数来创建线程,为了同步线程的操作,Windows 提供了事件内核对象、互斥量内核对象、关键段、原子操作等等方式,但总体来说操作线程难度还是非常大的,需要考虑到诸多的问题,比如死锁等,能不能建立一种机制即提高性能又方便操作线程呢,正因为有这个需求 Windows 提出了线程池的概念,目前线程池可以做以下四个定制的工作,分别是:(1) 以异步方式调用一个函数 (2) 每隔一段时间调用一个函数 (3) 在内核对象触发时调用一个函数 (4) 在异步 I/O 请求时调用一个函数

  • 异步方式调用一个函数是所有线程池中最简单的一个操作

  • 来看看执行这个操作需要哪些函数:

    (1)PTP_WORK CreateThreadpoolWork (2)SubmitThreadpoolWork (3)WaitForThreadpoolWorkCallbacks (4)CloseThreadpoolWork (5)VOID CALLBACK WorkCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work);

  • 值得注意的是 WaitForThreadpoolWorkCallbacks 函数,想象以下当某一个时刻有 2 个线程正在执行操作,而 3 个线程等待,如果我想等待这 5 个线程都执行完操作,那么向 WaitForThreadpoolWorkCallbacks 的第二个函数中传入 FALSE 即可,如果只等待 2 个线程完成的话传入 TRUE 即可。例子如下:

    #include
    #include
    #include
    using namespace std;

    DWORD ThreadPoolTest1(VOID);
    VOID CALLBACK WorkCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work);

    int main(int argc, char **argv)
    {
    ThreadPoolTest1();
    return 0;
    }

    DWORD ThreadPoolTest1(VOID)
    {
    // 创建一个线程池工作项,第三个参数用于定制线程池
    PTP_WORK ThreadPool = CreateThreadpoolWork(WorkCallback, NULL, NULL);

    // 提交 4 次线程池请求,也就是运行 4 次线程池函数
    SubmitThreadpoolWork(ThreadPool);
    SubmitThreadpoolWork(ThreadPool);
    SubmitThreadpoolWork(ThreadPool);
    SubmitThreadpoolWork(ThreadPool);
    
    // TRUE 为只能取消尚未运行的任务,FALSE 是等待所有的任务完成
    WaitForThreadpoolWorkCallbacks(ThreadPool, FALSE);
    
    // 销毁线程池工作项
    CloseThreadpoolWork(ThreadPool);
    return TRUE;

    }

    // 线程池函数原型
    VOID CALLBACK WorkCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
    {
    // 模拟工作 600 毫秒
    cout << "WorkCallback" << endl;
    Sleep(600);
    }

  • 每个一段时间调用一个函数,稍微复杂一点

  • 看看需要调用哪些函数:

    (1)VOID CALLBACK TimeoutCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_TIMER Work) (2)PTP_TIMER CreateThreadpoolTimer() (3)IsThreadpoolTimerSet() (4)SetThreadpoolTimer() (5)WaitForThreadpoolTimerCallbacks() (6)CloseThreadpoolTimer()

  • 例子如下:

    #include
    #include
    #include
    using namespace std;

    DWORD ThreadPoolTest2(VOID);
    VOID CALLBACK TimeoutCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_TIMER Work);

    // 控制线程变量
    LONG Statistics;

    int main(int argc, char **argv)
    {
    ThreadPoolTest2();
    return 0;
    }

    DWORD ThreadPoolTest2(VOID)
    {
    // 创建一个定时器线程池对象
    PTP_TIMER ThreadPoolTimer = CreateThreadpoolTimer(TimeoutCallback, NULL, NULL);

    // 判断线程池的定时器是否已经被设置过,其实定时器可以多次被设置
    if (IsThreadpoolTimerSet(ThreadPoolTimer))
    {
        cout << "该定时器已经被设置" << endl;
        return FALSE;
    }
    
    // 设置定时器的时间
    LARGE_INTEGER  time; time.QuadPart = -1; // -1 比较特殊表示立即执行,负数表示相对时间,正数表示绝对时间
    FILETIME filetime; filetime.dwLowDateTime = time.LowPart; filetime.dwHighDateTime = time.HighPart;
    SetThreadpoolTimer(ThreadPoolTimer, &filetime, 1000, 0);
    
    // 等待线程池中的线程操作完成
    while (Statistics < 5); Sleep(100);
    WaitForThreadpoolTimerCallbacks(ThreadPoolTimer, FALSE);
    
    // 销毁定时器线程池对象
    CloseThreadpoolTimer(ThreadPoolTimer);
    return TRUE;

    }

    // 定时器线程池线程原型
    VOID CALLBACK TimeoutCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_TIMER Work)
    {
    cout << "[*] TimeoutCallback" << endl;
    InterlockedExchangeAdd(&Statistics, 1);
    }

  • 值得注意的是 WaitForThreadpoolTimerCallbacks 并不具有等待全部的线程池线程操作完成,所以我利用原子操作等待 Statistics 变为 5;SetThreadpoolTimer 的最后一个参数用于将多个计时器分为一组,这个参数用的比较少。运行结果如下:

  • 什么是内核对象,内核对象就是内核的一个对象,可以被用于完成线程的同步问题。所有的内核对象都有两种状态,一个是触发态一个是非触发态。所以完成这样一个需求的线程池首先需要设置一个非触发态的内核对象,并且和这个线程池相互绑定,等到这个内核对象变为触发状态时该线程池就会调用一个函数来处理接下来的问题

  • 来看看完成这个定制的线程池需要哪些函数:

    (1) CreateThreadpoolWait():创建一个等待内核对象的线程池对象 (2) SetThreadpoolWait():将线程池函数和内核对象绑定 (3) WaitForThreadpoolWaitCallbacks():等待线程池函数操作完成 (4) CloseThreadpoolWait():销毁等待内核对象的线程池对象 (5) VOID CALLBACK WaitCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult):线程池函数原型

  • 例子如下:

    #include
    #include
    #include
    using namespace std;

    DWORD ThreadPoolTest3(VOID);
    VOID CALLBACK WaitCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult);

    // 用于控制线程的计数
    LONG ThreadCount;

    int main(int argc, char **argv)
    {
    ThreadPoolTest3();
    return 0;
    }

    DWORD ThreadPoolTest3(VOID)
    {
    PTP_WAIT ThreadPool[5]; HANDLE Event;

    // 创建一个事件内核对象
    Event = CreateEvent(NULL, TRUE, FALSE, NULL);
    
    for (size_t i = 0; i < 5; i++)
    {
        // 创建一个等待内核对象为触发状态的线程池,并且将线程池等待对象和事件内核对象相互绑定
        ThreadPool[i] = CreateThreadpoolWait(WaitCallback, NULL, NULL);
        SetThreadpoolWait(ThreadPool[i], Event, 0);
    }
    
    // 将内核对象变为触发状态
    SetEvent(Event);
    
    // 等待操作完成
    while (ThreadCount < 5) Sleep(0);
    
    // 关闭等待内核对象触发的工作对象
    for (size_t i = 0; i < 5; i++)
    {
        CloseThreadpoolWait(ThreadPool[i]);
    }
    return TRUE;

    }

    // 完成当内核对象触发线程池的线程函数原型
    VOID CALLBACK WaitCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
    {
    // SetThreadpoolWait(ThreadWait, Event, NULL);
    cout << "WaitCallback" << endl;
    InterlockedExchangeAdd(&ThreadCount, 1);
    }

  • 在异步 I/O 完成时调用一个函数,这个实现起来较为复杂

  • 来看看使用这个线程池需要哪些函数

    (1) CreateThreadpoolIo:创建一个等待 I/O 完成的线程池对象 (2) StartThreadpoolIo:完成端口关联,且需要在 ReadFile 和 WriteFile 函数之前调用 (3) WaitForThreadpoolIoCallbacks:等待线程池中的线程操作完成 (4) CloseThreadpoolIo:销毁线程池对象 (5) VOID CALLBACK OverlappedCompletionRoutine(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped, ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pIo):线程池对象函数原型

  • 例子如下:

    #include
    #include
    #include
    using namespace std;

    DWORD ThreadPoolTest4(VOID);
    VOID CALLBACK OverlappedCompletionRoutine(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped, ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pIo);

    HANDLE file;
    DWORD Size;
    PBYTE BufferFile;

    int main(int argc, char **argv)
    {
    ThreadPoolTest4();
    return 0;
    }

    DWORD ThreadPoolTest4(VOID)
    {
    // D盘 post.txt 打开文件
    DWORD LastError;
    file = CreateFile(TEXT("D:\post.txt"), GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
    LastError = GetLastError();

    // 获取文件的大小
    LARGE_INTEGER FileSize = { 0 };
    GetFileSizeEx(file, &FileSize);
    Size = FileSize.LowPart;
    
    // 创建一个等待 I/O 完成的线程池对象
    PTP_IO ThreadIO = CreateThreadpoolIo(file, OverlappedCompletionRoutine, NULL, NULL);
    StartThreadpoolIo(ThreadIO);
    
    // 用于 ReadFile 读取文件数据的缓冲区
    BufferFile = (PBYTE)malloc(Size);
    OVERLAPPED overlapped = { 0 }; overlapped.Offset = 0;
    ReadFile(file, BufferFile, Size, NULL, &overlapped);
    
    // 等待线程池对象
    WaitForThreadpoolIoCallbacks(ThreadIO, FALSE);
    
    // 销毁等待 I/O 完成的线程池对象
    CloseThreadpoolIo(ThreadIO);
    return TRUE;

    }

    // 等待 I/O 完成的线程池对象线程原型
    VOID CALLBACK OverlappedCompletionRoutine(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped, ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pIo)
    {
    cout << "文件内容" << BufferFile << endl;
    }

  • 打印结果如下:

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章