title: 几种简单并行算法的实现(for_each,find以及partial_sum)
date: 2024-01-29 19:56:02
tags: C++
categories: C++
简介
前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。
本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。
并行版本for_each
实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。
在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出
1 | class join_threads |
接下来我们实现第一种方式
1 | template<typename Iterator, typename Func> |
1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。
2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。
3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。
4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总
第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务
1 | template<typename Iterator, typename Func> |
async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。
find的并行实现
find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。
另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。
我们先说第一种
find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。
因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器
1 | template<typename Iterator, typename MatchType> |
1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。
2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。
说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。
1 | template<typename Iterator, typename MatchType> |
1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。
2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。
3 最后我们在主线程中汇合,获取结果。
partial_sum并行版本
C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.
关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。
但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。
所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。
1 | template<typename Iterator> |
1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)
2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。
总结
本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。
测试代码和项目代码链接:
https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day21-ParallenAlgorithm
视频链接
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290