再谈多线程模型之生产者消费者(单一生产者和多消费者 )(c++11实现)
阅读原文时间:2023年07月08日阅读:1

为缩短篇幅,本系列记录如下:

再谈多线程模型之生产者消费者(基础概念)(c++11实现)

再谈多线程模型之生产者消费者(单一生产者和单一消费者)(c++11实现)

再谈多线程模型之生产者消费者(单一生产者和多消费者)(c++11实现)本文

再谈多线程模型之生产者消费者(多生产者和单一消费者 )(c++11实现)

再谈多线程模型之生产者消费者(多生产者和多消费者 )(c++11实现)

再谈多线程模型之生产者消费者(总结)(c++11实现)

本文涉及到的代码演示环境: VS2017

欢迎留言指正

  • 1.1 生产者和消费者存在互斥与同步

  • 1.2 生产者只有一个,所以,不存在生产者之间互斥

  • 1.3 消费者有个,所以,消费者之间存在互斥。需要考虑到,消费者同时从缓冲区中拿出数据的情况,考虑吃水果的情况,当过盘中放入了多个水果,儿子和女儿就可以同时拿取。当过盘中只有一个水果时,两个消费者,怎么拿?谁先拿谁先吃。类似线程中的锁,谁先拿到锁,谁就能用。 基于 单一生产者&单一消费者中结构体,一个互斥已经不够用了,那就再来一个。

  • 1.4 总结: 到底是生产的快还是消费的快?既然是快,那要怎么处理?如下:

    情况

    处理

    生产者速率 > 消费者速率

    最开始,生产者只有一个,生产一件商品放入缓冲区,但是此时存在多个消费者,处理方法和下面的情况是一致的。慢慢的,就会出现: 商品数量>消费者数量 和 商品数量 < 消费者数量 两种情况出现。当出现商品数量>消费者数量时,需要保证多个消费者不能消费同一个数据,而且,既然有多个商品,那么,消费者之间消费就无需等待了

    生产者速率 < 消费者速率

    生产者只有一个,不存在生产者之间的冲突;然而消费者存在多个,多个消费者之间存在竞争,既然是竞争,那就需要锁,哪个线程先拿到锁,就先消费;因为生产速率跟不上消费速率,所以,消费者与生产者之间不存在冲突

根据上面可知道,对比 单一生产者&单一消费者 的代码,可以知道,仅仅多了消费者之间的竞争。

  • 2.1 结构体模型

    template
    struct repo_
    {
    // 用作互斥访问缓冲区
    std::mutex _mtx_queue;

    // 缓冲区最大size
    unsigned int            _count_max_queue_10 = 10;
    
    // 缓冲区
    std::queue<T>           _queue;
    
    // 缓冲区没有满,通知生产者继续生产
    std::condition_variable _cv_queue_not_full;
    
    // 缓冲区不为空,通知消费者继续消费
    std::condition_variable _cv_queue_not_empty;
    
    // 用于消费者之间的竞争
    std::mutex              _mtx_con;
    // 计算当前已经消费多少数据了
    unsigned int            _cnt_cur_con = 0;
    
    repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
        , _cnt_cur_con(0)
    
    {
        ;
    }
    
    repo_(const repo_&instance) = delete;
    repo_& operator = (const repo_& instance) = delete;
    repo_(const repo_&&instance) = delete;
    repo_& operator = (const repo_&& instance) = delete;

    };

结构体仅仅增加了下面的几行代码

// 用于消费者之间的竞争
std::mutex                _mtx_con;
// 计算当前已经消费多少数据了
unsigned int            _cnt_cur_con = 0;
  • 2.2 消费者线程之间的竞争怎么实现?已经有了用于消费者之间的锁,用法如下:

    template< typename T >
    void thread_con(const int thread_index, repo* param_repo)
    {
    if (nullptr == param_repo || NULL == param_repo)
    return;

    while (true)
    {
        bool is_running = true;
        {
            std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
            // 还没消费到指定的数目,继续消费
            if (param_repo->_cnt_cur_con < cnt_total_10)
            {
                thread_consume_item<T>(thread_index, *param_repo);
                ++param_repo->_cnt_cur_con;
            }
            else
                is_running = false;
    }
    
    std::this_thread::sleep_for(std::chrono::microseconds(16));
    
    // 结束线程
    if ((!is_running))
        break;
    }

    }

因为消费者之间存在竞争,所以,消费者一开始就需要竞争锁,哪个先拿到锁就先消费。

  • 2.3 完整源码

    #pragma once

    #include
    #include
    #include
    #include
    #include
    #include

    std::mutex _mtx;
    std::condition_variable _cv_not_full;
    std::condition_variable _cv_not_empty;

    const int max_queue_size_10 = 10;

    enum
    {
    // 总生产数目
    cnt_total_10 = 10,
    };

    template
    struct repo_
    {
    // 用作互斥访问缓冲区
    std::mutex _mtx_queue;

    // 缓冲区最大size
    unsigned int            _count_max_queue_10 = 10;
    
    // 缓冲区
    std::queue<T>           _queue;
    
    // 缓冲区没有满,通知生产者继续生产
    std::condition_variable _cv_queue_not_full;
    
    // 缓冲区不为空,通知消费者继续消费
    std::condition_variable _cv_queue_not_empty;
    
    // 用于消费者之间的竞争
    std::mutex              _mtx_con;
    // 计算当前已经消费多少数据了
    unsigned int            _cnt_cur_con = 0;
    
    repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
        , _cnt_cur_con(0)
    
    {
        ;
    }
    
    repo_(const repo_&instance) = delete;
    repo_& operator = (const repo_& instance) = delete;
    repo_(const repo_&&instance) = delete;
    repo_& operator = (const repo_&& instance) = delete;

    };

    template
    using repo = repo_;

    //----------------------------------------------------------------------------------------

    // 生产者生产数据
    template
    void thread_produce_item(const int &thread_index, repo& param_repo, const T& repo_item)
    {
    std::unique_lock lock(param_repo._mtx_queue);

    // 1. 生产者只要发现缓冲区没有满, 就继续生产
    param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; });
    
    // 2. 将生产好的商品放入缓冲区
    param_repo._queue.push(repo_item);
    
    // log to console
    std::cout << "生产者" << thread_index << "生产数据:" << repo_item << "\n";
    
    // 3. 通知消费者可以消费了
    //param_repo._cv_queue_not_empty.notify_one();
    param_repo._cv_queue_not_empty.notify_one();

    }

    //----------------------------------------------------------------------------------------
    // 消费者消费数据

    template
    T thread_consume_item(const int thread_index, repo& param_repo)
    {
    std::unique_lock lock(param_repo._mtx_queue);

    // 1. 消费者需要等待【缓冲区不为空】的信号
    param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); });
    
    // 2. 拿出数据
    T item;
    item = param_repo._queue.front();
    param_repo._queue.pop();
    
    std::cout << "消费者" << thread_index << "从缓冲区中拿出一组数据:" << item << std::endl;
    
    // 3. 通知生产者,继续生产
    param_repo._cv_queue_not_full.notify_one();
    
    return item;

    }

    //----------------------------------------------------------------------------------------

    /**

    • @ brief: 生产者线程
    • @ thread_index - 线程标识,区分是哪一个线程
    • @ count_max_produce - 最大生产次数
    • @ param_repo - 缓冲区
    • @ return - void

    / template< typename T > void thread_pro(const int thread_index, const int count_max_produce, repo param_repo)
    {
    if (nullptr == param_repo || NULL == param_repo)
    return;

    for (int item = 0; item < count_max_produce; ++item)
    {
        thread_produce_item<T>(thread_index, *param_repo, item);
    std::this_thread::sleep_for(std::chrono::microseconds(16));
    }

    }

    /**

    • @ brief: 消费者线程
    • @ thread_index - 线程标识,区分线程
    • @ param_repo - 缓冲区
    • @ return - void

    / template< typename T > void thread_con(const int thread_index, repo param_repo)
    {
    if (nullptr == param_repo || NULL == param_repo)
    return;

    while (true)
    {
        bool is_running = true;
        {
            std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
            // 还没消费到指定的数目,继续消费
            if (param_repo->_cnt_cur_con < cnt_total_10)
            {
                thread_consume_item<T>(thread_index, *param_repo);
                ++param_repo->_cnt_cur_con;
            }
            else
                is_running = false;
    }
    
    std::this_thread::sleep_for(std::chrono::microseconds(16));
    
    // 结束线程
    if ((!is_running))
        break;
    }

    }

    // 入口函数
    //----------------------------------------------------------------------------------------

    int main(int argc, char *argv[], char *env[])
    {
    // 缓冲区
    repo repository;
    // 线程池
    std::vector vec_thread;

    // 生产者
    vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
    
    // 消费者
    vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
    vec_thread.push_back(std::thread(thread_con<int>, 2, &repository));
    
    for (auto &item : vec_thread)
    {
        item.join();
    }
    
    return 0;

    }

入口函数创建了1个消费者和两个消费者。 消费者代号分别为 1 和 2。

  • 2.4 可能结果