[Qt] 运用QtConcurrent中的MapReduce模型进行简单的C++并行运算
阅读原文时间:2021年04月24日阅读:2

前言:

  开始写这个小程序的起因是项目开发中版本Release后交给工厂进行eMMC数据写入生产时候,事先需要计算一下从eMMC母片中导出的二进制BIN文件的CheckSum,以便与eMMC编程器写入时候计算的CheckSum对比是否一致。CheckSum的计算方式就是最简单的二进制校验和,因为数据量相对大一点,有可能会超过32bit长度,所以可以理解为CheckSum(64bit)。
  以前的项目中使用的是一个几行代码的Python脚本,打开一个文件然后一个字节一个字节的加上去。使用Python for Windows试了一下,效果感人,以前项目使用的8GB的eMMC,算一次CheckSum也要超过半小时的时间,这次新项目使用的是32GB的eMMC,算一次岂不是毛都要白了。公司开发用PC机CPU有的是i7,最低也是i5,不充分使用多核的优势岂不是浪费。在网上找了一下,也没找到符合需求的免费工具,于是决定自给自足,写个计算CheckSum的小程序完成这个工作。
  开始构思时候都没听说或使用过Qt下面的这个MapReduce模型,以前使用的比较多的是创建QThread对象,然后使用movetoThread方法让自定义的派生类的成员函数都运行在新创建的线程下,其他听说过的还有使用QThreadPool和QtConcurrent::run运行多线程代码的。
  之后偶然看到了Qt自带的QtConcurrent Progress Dialog Example中使用QtConcurrent::map来调用多线程执行函数,便深入了解了一下,发现QtConcurrent提供的这个MapReduce模型用来完成这种多线程并行计算是非常适合的,那些线程之间需要进行数据同步的问题已经全都考虑好了。


对MapReduce模型的基本介绍:

  MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。Google公司发明了MapReduce之后首先用其重新改写了其搜索引擎中的Web文档索引处理系统。但由于MapReduce可以普遍应用于很多大规模数据的计算问题,因此自发明MapReduce以后,Google公司内部进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google公司内有上万个各种不同的算法问题和程序都使用MapReduce进行处理。

MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

  1. MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
  2. MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
  3. MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。

文档中对Qt Concurrent的描述如下:

  The QtConcurrent namespace provides high-level APIs that make it possible to write multi-threaded programs without using low-level threading primitives such as mutexes, read-write locks, wait conditions, or semaphores. Programs written with QtConcurrent automatically adjust the number of threads used according to the number of processor cores available. This means that applications written today will continue to scale when deployed on multi-core systems in the future.
  QtConcurrent includes functional programming style APIs for parallel list processing, including a MapReduce and FilterReduce implementation for shared-memory (non-distributed) systems, and classes for managing asynchronous computations in GUI applications
The QtConcurrent namespace provides high-level APIs that make it possible to write multi-threaded programs without using low-level threading primitives.

  上文可见QtConcurrent提供的是用于共享内存(本机非分布式)方式的MapReduce和FilterReduce工具,用于数据列表的并行处理。接口封装为了high-level API,不必再去调用包括互斥锁(mutex)/读写锁(read-write lock)/信号量(semaphore)/状态等待(wait condition)等这些low-level的线程控制接口。并且使用QtConcurrent编写的程序会自动根据CPU的核心数来调整使用的线程数量,这意味着现在编写的程序即使将来被用在更高等级的CPU硬件上也可以充分的利用系统的硬件资源,不至于浪费硬件性能。

QtConcurrent中Map-Reduce和Filter-Reduce的接口说明。

Concurrent Map and Map-Reduce

QtConcurrent::map() applies a function to every item in a container, modifying the items in-place.
QtConcurrent::mapped() is like map(), except that it returns a new container with the modifications.
QtConcurrent::mappedReduced() is like mapped(), except that the modified results are reduced or folded into a single result.
map函数用于对数据列中的每一项调用特定函数进行处理,直接修改的是数据列中每一项的引用,所以结果直接反映到原始数据列上。
mapped函数与map函数类似,区别在于mapped函数可以返回一个新的数据类型,适用于进行数据处理后需要整理为新的数据结构的情况。
mappedReduced函数是更进一步的对每一项处理后的数据进行整合处理,可以通过reduceFunction来合并整理为一个最终处理结果。

Concurrent Filter and Filter-Reduce

QtConcurrent::filter() removes all items from a container based on the result of a filter function.
QtConcurrent::filtered() is like filter(), except that it returns a new container with the filtered results.
QtConcurrent::filteredReduced() is like filtered(), except that the filtered results are reduced or folded into a single result.
filter函数filter函数用于对数据列中的每一项调用特定函数进行过滤,特定的filterFunction返回true则保留此项,返回false则从数据列中移除此项。
filtered函数与filter函数类似,区别在于filtered函数可以返回一个新的数据类型,适用于进行数据过滤后需要整理出过滤结果的情况。
filteredReduced函数是更进一步的对每一项过滤后的过滤结果进行整合处理,可以通过reduceFunction来合并整理为一个最终过滤结果。


选择合适的处理函数

  由Qt提供的函数可以明确,对于计算和处理的情况,选用Map-Reduce方式更合适,如果是进行数据过滤则选用Filter-Reduce方式。
  对于大文件计算CheckSum,选择QtConcurrent::mappedReduced来处理应该是很合适的。大致思路是将大文件分割为固定大小的文件块(标记每块的起始offset即可),将切割后的各块起始offset整理为一个QList数据列表,其中的每一项传递给mapFunction,mapFunction中从offset位置开始读取指定长度的数据,并计算CheckSum,将计算结果返回给reduceFunction函数来进行汇总。最终reduceFunction中将每个文件块的CheckSum相加得到最终整个文件的CheckSum。

  Qt Concurrent支持几种STL兼容的容器和迭代器,但是使用QList和QVector这种Qt自带的支持随机访问迭代器的容器可以获得最佳的性能。
  Qt Concurrent supports several STL-compatible container and iterator types, but works best with Qt containers that have random-access iterators, such as QList or QVector. The map and filter functions accept both containers and begin/end iterators.


关键代码如下:

对文件分割后进行处理的结构体

typedef struct {
    int index;
    qint64 offset;
    quint64 length;
    quint64 checksum;
    Checksumer *checksumer_ptr;
} Split_st;

进行CheckSum计算的主要函数

void Checksumer::ChecksumProcesser()
{
    if (false == m_filepath.isEmpty()){
        QFileInfo fileInfo(m_filepath);

        if(fileInfo.isReadable()) {
            if (fileInfo.size() > 0){
            }
            else{
                qWarning("ChecksumProcesser::file size error : %lld", fileInfo.size());
                return;
            }
        }
        else{
            qWarning("ChecksumProcesser::File is unreadable!!!");
            return;
        }

        // Start Checksumming
        qint64 ChecksummingTime = 0;
        m_elapsedtime.restart();

        m_status = CHECKSUMER_CHECKSUMMING;

        // Prepare the QList
        m_splitlist.clear();

        qint64 filesize = fileInfo.size();

        int splitcount = split_roundup(filesize, EVERY_SPLIT_SIZE_SMALL);

        if (1 == splitcount){
#ifdef DEBUG_LOGOUT_ON
            // Get System ideal ThreadProcess Number
            int threadNumber = QThread::idealThreadCount();
            qDebug("ChecksumProcesser::file size : %lld, thread number : %d", filesize, threadNumber);
            qDebug("splitcount:%d", splitcount);
#endif
            emit Checksumer_RangeChangedSignal(0, 1);

            Split_st tempSplit;
            tempSplit.index = 0;
            tempSplit.length = filesize;
            tempSplit.checksum = 0;
            tempSplit.offset = 0;
            tempSplit.checksumer_ptr = this;

            Split_st result = splitChecksum(tempSplit);

            emit Checksumer_ValueChangedSignal(1);
            m_status = CHECKSUMER_COMPLETE;
            ChecksummingTime = m_elapsedtime.elapsed();
            emit Checksumer_ChecksumResultSignal(result.checksum, ChecksummingTime);
#ifdef DEBUG_LOGOUT_ON
            qreal elapsedtime = (qreal)(ChecksummingTime)/1000;
            qDebug("Checksum Result(0x%llX), TotalTime (%.2f) sec", result.checksum, elapsedtime);
#endif
        }
        else{
#ifdef DEBUG_LOGOUT_ON
            // Get System ideal ThreadProcess Number
            int threadNumber = QThread::idealThreadCount();
            qDebug("ChecksumProcesser::file size : %lld, thread number : %d", filesize, threadNumber);
#endif

            Split_st tempSplit;
            tempSplit.index = 0;
            tempSplit.length = 0;
            tempSplit.checksum = 0;
            tempSplit.offset = 0;
            tempSplit.checksumer_ptr = this;

#ifdef DEBUG_LOGOUT_ON
            qDebug("splitcount:%d", splitcount);
#endif

            for (int loop = 0; loop < splitcount - 1; loop++){
                tempSplit.index = loop;
                tempSplit.offset = (qint64)(EVERY_SPLIT_SIZE_SMALL * loop);
                tempSplit.length = EVERY_SPLIT_SIZE_SMALL;
                m_splitlist.append(tempSplit);
            }

            /* last split */
            tempSplit.index = splitcount - 1;
            tempSplit.offset = (qint64)(EVERY_SPLIT_SIZE_SMALL * (splitcount - 1));

            qint64 lastsplitsize = filesize%EVERY_SPLIT_SIZE_SMALL;
            if(0 == lastsplitsize){
                tempSplit.length = EVERY_SPLIT_SIZE_SMALL;
            }
            else{
                tempSplit.length = lastsplitsize;
            }
            m_splitlist.append(tempSplit);

            emit Checksumer_RangeChangedSignal(0, (splitcount - 1));

            quint64 final_checksum = QtConcurrent::mappedReduced(m_splitlist, Checksumer::splitChecksum, Checksumer::reduceResult, QtConcurrent::ReduceOptions(QtConcurrent::OrderedReduce | QtConcurrent::SequentialReduce));

            m_status = CHECKSUMER_COMPLETE;
            ChecksummingTime = m_elapsedtime.elapsed();
            emit Checksumer_ChecksumResultSignal(final_checksum, ChecksummingTime);
#ifdef DEBUG_LOGOUT_ON
            qreal elapsedtime = (qreal)(ChecksummingTime)/1000;
            qDebug("Checksum Result(0x%llX), TotalTime (%.2f) sec", final_checksum, elapsedtime);
#endif
        }

        m_elapsedtime.invalidate();
    }
    else{
        qWarning("ChecksumProcesser::File path is empty!!!");
        return;
    }
}

MapFunction

Split_st Checksumer::splitChecksum(const Split_st &split)
{
    //qDebug("split.index(%d), split.offset(%lld), split.length(%lld), in thread(%08X)", split.index, split.offset, split.length, (quint32)(QThread::currentThreadId()) );
    Split_st subchecksum;
    subchecksum = split;
    subchecksum.checksum = 0;

    QFile file(m_filepath);

    if(file.open(QIODevice::ReadOnly)) {
        qint64 offset = split.offset;
        if (file.seek(offset)){
            QByteArray filebuffer = file.read(split.length);

            foreach (const char &byte, filebuffer)
            {
                subchecksum.checksum += (quint8)byte;
            }
        }
        else{
            qWarning("seek failed: %lld", offset);
        }

        file.close();
    }

    return subchecksum;
}

ReduceFunction

void Checksumer::reduceResult(quint64 &checksum, const Split_st &split)
{
    checksum += split.checksum;
    emit split.checksumer_ptr->Checksumer_ValueChangedSignal(split.index);
#ifdef DEBUG_LOGOUT_ON
    qDebug("reduceResult:split.index(%d), checksum(0x%llX), offset(0x%llX), length(0x%llX), endaddress(0x%llX)", split.index, checksum, split.offset, split.length, (split.offset + split.length));
#endif
}

执行效率测试

对文件切分大小进行了适当调优之后计算8GB eMMC二进制BIN文件(7.28GB) CheckSum的时间大概100秒左右,计算32GB eMMC的BIN文件(29.1GB)的CheckSum时间388.8秒左右,基本是可以忍受了。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章