利用并行和函数式编程提高运算效率

简介

前文介绍了async用法,很多朋友说用的不多,我对async的理解就是开辟一个一次性的线程执行并行任务,主线程可以通过future在合适的时机执行等待汇总结果。本文通过并行和函数式编程,演示快速排序提升效率的一种方式。

快速排序

快速排序(Quick Sort)是一种高效的排序算法,采用分治法的思想进行排序。以下是快速排序的基本步骤:

  1. 选择一个基准元素(pivot):从数组中选择一个元素作为基准元素。选择基准元素的方式有很多种,常见的是选择数组的第一个元素或最后一个元素。
  2. 分区(partitioning):重新排列数组,把比基准元素小的元素放在它的左边,把比基准元素大的元素放在它的右边。在这个过程结束时,基准元素就处于数组的最终位置。
  3. 递归排序子数组:递归地对基准元素左边和右边的两个子数组进行快速排序。

以下是一个基本的快速排序的C++实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//c++ 版本的快速排序算法
template<typename T>
void quick_sort_recursive(T arr[], int start, int end) {
if (start >= end) return;
T key = arr[start];
int left = start, right = end;
while(left < right) {
while (arr[right] >= key && left < right) right--;
while (arr[left] <= key && left < right) left++;
std::swap(arr[left], arr[right]);
}

if (arr[left] < key) {
std::swap(arr[left], arr[start]);
}


quick_sort_recursive(arr, start, left - 1);
quick_sort_recursive(arr, left + 1, end);
}

template<typename T>
void quick_sort(T arr[], int len) {
quick_sort_recursive(arr, 0, len - 1);
}

排序演示

假设一开始序列{xi}是:5,3,7,6,4,1,0,2,9,10,8。

此时,ref=5,i=1,j=11,从后往前找,第一个比5小的数是x8=2,因此序列为:2,3,7,6,4,1,0,5,9,10,8。

此时i=1,j=8,从前往后找,第一个比5大的数是x3=7,因此序列为:2,3,5,6,4,1,0,7,9,10,8。

此时,i=3,j=8,从第8位往前找,第一个比5小的数是x7=0,因此:2,3,0,6,4,1,5,7,9,10,8。

此时,i=3,j=7,从第3位往后找,第一个比5大的数是x4=6,因此:2,3,0,5,4,1,6,7,9,10,8。

此时,i=4,j=7,从第7位往前找,第一个比5小的数是x6=1,因此:2,3,0,1,4,5,6,7,9,10,8。

此时,i=4,j=6,从第4位往后找,直到第6位才有比5大的数,这时,i=j=6,ref成为一条分界线,它之前的数都比它小,之后的数都比它大,对于前后两部分数,可以采用同样的方法来排序。

调用比较简单

1
2
3
4
5
6
7
8
9
10
11
12
void test_quick_sort() {

int num_arr[] = { 537641029108 };
int length = sizeof(num_arr) / sizeof(int);
quick_sort(num_arr, length );
std::cout << "sorted result is ";
for (int i = 0; i < length; i++) {
std::cout << " " << num_arr[i];
}

std::cout << std::endl;
}

这种实现方式比较依赖存储数据的数据结构,比如上面是通过数组存储的,那如果我想实现list容器中元素的排序怎么办?我既不想关注存储的容器,也不想关注存储的类型,想实现一套通用的比较规则?那就需要函数式编程来解决

函数式编程

C++函数式编程是一种编程范式,它将计算视为数学上的函数求值,并避免改变状态和使用可变数据。在函数式编程中,程序是由一系列函数组成的,每个函数都接受输入并产生输出,而且没有任何副作用。

在C++中,函数式编程可以使用函数指针、函数对象(functor)和lambda表达式等机制来实现。这些机制允许您编写可以像普通函数一样调用的代码块,并将它们作为参数传递给其他函数或作为返回值返回。

C++11引入了一些新功能,如constexpr函数和表达式模板,这些功能使得在C++中进行函数式编程更加容易和直观。

我们用函数式编程修改上面的快速排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;

// ① 将input中的第一个元素放入result中,并且将这第一个元素从input中删除
result.splice(result.begin(), input, input.begin());

// ② 取result的第一个元素,将来用这个元素做切割,切割input中的列表。
T const& pivot = *result.begin();

// ③std::partition 是一个标准库函数,用于将容器或数组中的元素按照指定的条件进行分区,
// 使得满足条件的元素排在不满足条件的元素之前。
// 所以经过计算divide_point指向的是input中第一个大于等于pivot的元素
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });

// ④ 我们将小于pivot的元素放入lower_part中
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);

// ⑤我们将lower_part传递给sequential_quick_sort 返回一个新的有序的从小到大的序列
//lower_part 中都是小于divide_point的值
auto new_lower(
sequential_quick_sort(std::move(lower_part)));
// ⑥我们剩余的input列表传递给sequential_quick_sort递归调用,input中都是大于divide_point的值。
auto new_higher(
sequential_quick_sort(std::move(input)));
//⑦到此时new_higher和new_lower都是从小到大排序好的列表
//将new_higher 拼接到result的尾部
result.splice(result.end(), new_higher);
//将new_lower 拼接到result的头部
result.splice(result.begin(), new_lower);
return result;
}

用如下方式调用

1
2
3
4
5
6
7
8
9
void test_sequential_quick() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = sequential_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

这个函数是一个使用快速排序算法对链表进行排序的实现。快速排序是一种常用的排序算法,它的基本思想是选择一个基准元素,然后将数组分为两部分,一部分是小于基准元素的元素,另一部分是大于基准元素的元素。然后对这两部分再分别进行快速排序。这个函数使用了C++的模板,可以处理任何数据类型的链表。函数的主要步骤包括:

  1. 将链表的第一个元素作为基准元素,并将其从链表中删除。

  2. 使用std::partition函数将链表分为两部分,一部分是小于基准元素的元素,另一部分是大于或等于基准元素的元素。

  3. 对这两部分分别进行递归排序。\n4. 将排序后的两部分和基准元素合并,返回排序后的链表。

并行方式

我们提供并行方式的函数式编程,可以极大的利用cpu多核的优势,这在并行计算中很常见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//并行版本
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);
// ①因为lower_part是副本,所以并行操作不会引发逻辑错误,这里可以启动future做排序
std::future<std::list<T>> new_lower(
std::async(&parallel_quick_sort<T>, std::move(lower_part)));

// ②
auto new_higher(
parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

测试调用如下

1
2
3
4
5
6
7
8
9
void test_sequential_quick() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = sequential_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

我们对lower_part的排序调用了std::async并行处理。而higher_part则是串行执行的。这么做提高了计算的并行能力,但有人会问如果数组的大小为1024,那么就是2的10次方,需要启动10个线程执行,这仅是对一个1024大小的数组的排序,如果有多个数组排序,开辟线程会不会很多?其实不用担心这个,因为async的实现方式在上一节中已经提及了,是通过std::launch::async或者std::launch::deffered完成的。编译器会计算当前能否开辟线程,如果能则使用std::launch::async模式开辟线程,如果不能则采用std::launch::deffered串行执行。当然,我们也可以通过我们上文提及的线程池方式实现并行计算

ThreadPool方式的并行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//线程池版本
//并行版本
template<typename T>
std::list<T> thread_pool_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);
// ①因为lower_part是副本,所以并行操作不会引发逻辑错误,这里投递给线程池处理
auto new_lower = ThreadPool::commit(&parallel_quick_sort<T>, std::move(lower_part));
// ②
auto new_higher(
parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

通过如下方式测试

1
2
3
4
5
6
7
8
9
void test_thread_pool_sort() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = thread_pool_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

到此我们实现了多种版本的快速排序,并不是鼓励读者造轮子,而是提供一种并行处理的思想,相信读者在后续的工作中在合适的时机采用并行处理的方式,可以极大的提高程序处理问题的效率。

总结

本文介绍了如何使用future, promise以及async用法

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn