C++ 并发三剑客future, promise和async

简介

本文介绍C++ 并发三剑客, future, promise以及async用法。这三个类是实现并发技术的关键,接下来详细介绍一下

async用法

std::async 是一个用于异步执行函数的模板函数,它返回一个 std::future 对象,该对象用于获取函数的返回值。

以下是一个使用 std::async 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <future>
#include <chrono>

// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {
// 模拟一个异步任务,比如从数据库中获取数据
std::this_thread::sleep_for(std::chrono::seconds(5));
return "Data: " + query;
}

int main() {
// 使用 std::async 异步调用 fetchDataFromDB
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");

// 在主线程中做其他事情
std::cout << "Doing something else..." << std::endl;

// 从 future 对象中获取数据
std::string dbData = resultFromDB.get();
std::cout << dbData << std::endl;

return 0;
}

在这个示例中,std::async 创建了一个新的线程(或从内部线程池中挑选一个线程)并自动与一个 std::promise 对象相关联。std::promise 对象被传递给 fetchDataFromDB 函数,函数的返回值被存储在 std::future 对象中。在主线程中,我们可以使用 std::future::get 方法从 std::future 对象中获取数据。注意,在使用 std::async 的情况下,我们必须使用 std::launch::async 标志来明确表明我们希望函数异步执行。

上面的例子输出

1
2
Doing something else...
Data: Data

async的启动策略

std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义。除了std::launch::async之外,还有以下启动策略:

  1. std::launch::deferred:这种策略意味着任务将在调用std::future::get()std::future::wait()函数时延迟执行。换句话说,任务将在需要结果时同步执行。
  2. std::launch::async | std::launch::deferred:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。

默认情况下,std::async使用std::launch::async | std::launch::deferred策略。这意味着任务可能异步执行,也可能延迟执行,具体取决于实现。需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。

future的wait和get

std::future::get()std::future::wait() 是 C++ 中用于处理异步任务的两个方法,它们的功能和用法有一些重要的区别。

  1. std::future::get():

std::future::get() 是一个阻塞调用,用于获取 std::future 对象表示的值或异常。如果异步任务还没有完成,get() 会阻塞当前线程,直到任务完成。如果任务已经完成,get() 会立即返回任务的结果。重要的是,get() 只能调用一次,因为它会移动或消耗掉 std::future 对象的状态。一旦 get() 被调用,std::future 对象就不能再被用来获取结果。
2. std::future::wait():

std::future::wait() 也是一个阻塞调用,但它与 get() 的主要区别在于 wait() 不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait() 会立即返回。如果任务还没有完成,wait() 会阻塞当前线程,直到任务完成。与 get() 不同,wait() 可以被多次调用,它不会消耗掉 std::future 对象的状态。

总结一下,这两个方法的主要区别在于:

  • std::future::get() 用于获取并返回任务的结果,而 std::future::wait() 只是等待任务完成。
  • get() 只能调用一次,而 wait() 可以被多次调用。
  • 如果任务还没有完成,get()wait() 都会阻塞当前线程,但 get() 会一直阻塞直到任务完成并返回结果,而 wait() 只是在等待任务完成。

你可以使用std::future的wait_for()或wait_until()方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status值。

1
2
3
4
5
if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {  
// 操作已完成
} else {
// 操作尚未完成
}

将任务和future关联

std::packaged_taskstd::future是C++11中引入的两个类,它们用于处理异步任务的结果。

std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用。

std::future代表一个异步操作的结果。它可以用于从异步任务中获取返回值或异常。

以下是使用std::packaged_taskstd::future的基本步骤:

  1. 创建一个std::packaged_task对象,该对象包装了要执行的任务。
  2. 调用std::packaged_task对象的get_future()方法,该方法返回一个与任务关联的std::future对象。
  3. 在另一个线程上调用std::packaged_task对象的operator(),以执行任务。
  4. 在需要任务结果的地方,调用与任务关联的std::future对象的get()方法,以获取任务的返回值或异常。

以下是一个简单的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}

void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task);

// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();

// 在另一个线程上执行任务
std::thread t(std::move(task));
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成

// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;

}

在上面的示例中,我们创建了一个包装了任务的std::packaged_task对象,并获取了与任务关联的std::future对象。然后,我们在另一个线程上执行任务,并等待任务完成并获取结果。最后,我们输出结果。

我们可以使用 std::function 和 std::package_task 来包装带参数的函数。std::package_task 是一个模板类,它包装了一个可调用对象,并允许我们将其作为异步任务传递。

promise 用法

C++11引入了std::promisestd::future两个类,用于实现异步编程。std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。

下面是std::promise的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <future>

void set_value(std::promise<int> prom) {
// 设置 promise 的值
prom.set_value(10);
}

int main() {
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
std::future<int> fut = prom.get_future();
// 在新线程中设置 promise 的值
std::thread t(set_value, std::move(prom));
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
return 0;
}

程序输出

1
2
3
Waiting for the thread to set the value...
promise set value successValue set by the thread:
10

在上面的代码中,我们首先创建了一个std::promise<int>对象,然后通过调用get_future()方法获取与之相关联的std::future<int>对象。然后,我们在新线程中通过调用set_value()方法设置promise的值,并在主线程中通过调用fut.get()方法获取这个值。注意,在调用fut.get()方法时,如果promise的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。

除了set_value()方法外,std::promise还有一个set_exception()方法,用于设置异常。该方法接受一个std::exception_ptr参数,该参数可以通过调用std::current_exception()方法获取。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <thread>
#include <future>

void set_exception(std::promise<void> prom) {
try {
// 抛出一个异常
throw std::runtime_error("An error occurred!");
} catch(...) {
// 设置 promise 的异常
prom.set_exception(std::current_exception());
}
}

int main() {
// 创建一个 promise 对象
std::promise<void> prom;
// 获取与 promise 相关联的 future 对象
std::future<void> fut = prom.get_future();
// 在新线程中设置 promise 的异常
std::thread t(set_exception, std::move(prom));
// 在主线程中获取 future 的异常
try {
std::cout << "Waiting for the thread to set the exception...\n";
fut.get();
} catch(const std::exception& e) {
std::cout << "Exception set by the thread: " << e.what() << '\n';
}
t.join();
return 0;
}

上述代码输出

1
2
Waiting for the thread to set the exception...
Exception set by the thread: An error occurred!

当然我们使用promise时要注意一点,如果promise被释放了,而其他的线程还未使用与promise关联的future,当其使用这个future时会报错。如下是一段错误展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void use_promise_destruct() {
std::thread t;
std::future<int> fut;
{
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
fut = prom.get_future();
// 在新线程中设置 promise 的值
t = std::thread(set_value, std::move(prom));
}
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
}

随着局部作用域}的结束,prom可能被释放也可能会被延迟释放,
如果立即释放则fut.get()获取的值会报error_value的错误。

共享类型的future

当我们需要多个线程等待同一个执行结果时,需要使用std::shared_future

以下是一个适合使用std::shared_future的场景,多个线程等待同一个异步操作的结果:

假设你有一个异步任务,需要多个线程等待其完成,然后这些线程需要访问任务的结果。在这种情况下,你可以使用std::shared_future来共享异步任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void myFunction(std::promise<int>&& promise) {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置 promise 的值
}

void threadFunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
catch (const std::future_error& e) {
std::cout << "Future error: " << e.what() << std::endl;
}
}

void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

// 使用 share() 方法获取新的 shared_future 对象

std::thread myThread2(threadFunction, future);

std::thread myThread3(threadFunction, future);

myThread1.join();
myThread2.join();
myThread3.join();
}

在这个示例中,我们创建了一个std::promise<int>对象promise和一个与之关联的std::shared_future<int>对象future。然后,我们将promise对象移动到另一个线程myThread1中,该线程将执行myFunction函数,并在完成后设置promise的值。我们还创建了两个线程myThread2myThread3,它们将等待future对象的结果。如果myThread1成功地设置了promise的值,那么future.get()将返回该值。这些线程可以同时访问和等待future对象的结果,而不会相互干扰。

但是大家要注意,如果一个future被移动给两个shared_future是错误的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

std::thread myThread2(threadFunction, std::move(future));
std::thread myThread3(threadFunction, std::move(future));

myThread1.join();
myThread2.join();
myThread3.join();
}

这种用法是错误的,一个future通过隐式构造传递给shared_future之后,这个shared_future被移动传递给两个线程是不合理的,因为第一次移动后shared_future的生命周期被转移了,接下来myThread3构造时用的std::move(future)future已经失效了,会报错,一般都是no state 之类的错误。

异常处理

std::future 是C++的一个模板类,它用于表示一个可能还没有准备好的异步操作的结果。你可以通过调用 std::future::get 方法来获取这个结果。如果在获取结果时发生了异常,那么 std::future::get 会重新抛出这个异常。

以下是一个例子,演示了如何在 std::future 中获取异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <future>
#include <stdexcept>
#include <thread>

void may_throw()
{
// 这里我们抛出一个异常。在实际的程序中,这可能在任何地方发生。
throw std::runtime_error("Oops, something went wrong!");
}

int main()
{
// 创建一个异步任务
std::future<void> result(std::async(std::launch::async, may_throw));

try
{
// 获取结果(如果在获取结果时发生了异常,那么会重新抛出这个异常)
result.get();
}
catch (const std::exception &e)
{
// 捕获并打印异常
std::cerr << "Caught exception: " << e.what() << std::endl;
}

return 0;
}

在这个例子中,我们创建了一个异步任务 may_throw,这个任务会抛出一个异常。然后,我们创建一个 std::future 对象 result 来表示这个任务的结果。在 main 函数中,我们调用 result.get() 来获取任务的结果。如果在获取结果时发生了异常,那么 result.get() 会重新抛出这个异常,然后我们在 catch 块中捕获并打印这个异常。

上面的例子输出

1
Caught exception: Oops, something went wrong!

线程池

我们可以利用上面提到的std::packaged_taskstd::promise构建线程池,提高程序的并发能力。
先了解什么是线程池:

线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池可以避免在处理短时间任务时创建与销毁线程的代价,它维护着多个线程,等待着监督管理者分配可并发执行的任务,从而提高了整体性能。

下面是我提供的一套线程池源码,目前用在公司的项目中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

static ThreadPool& instance() {
static ThreadPool ins;
return ins;
}

using Task = std::packaged_task<void()>;


~ThreadPool() {
stop();
}

template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load())
return std::future<RetType>{};

auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(cv_mt_);
tasks_.emplace([task] { (*task)(); });
}
cv_lock_.notify_one();
return ret;
}

int idleThreadCount() {
return thread_num_;
}

private:
ThreadPool(unsigned int num = 5)
: stop_(false) {
{
if (num < 1)
thread_num_ = 1;
else
thread_num_ = num;
}
start();
}
void start() {
for (int i = 0; i < thread_num_; ++i) {
pool_.emplace_back([this]() {
while (!this->stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {
return this->stop_.load() || !this->tasks_.empty();
});
if (this->tasks_.empty())
return;

task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->thread_num_--;
task();
this->thread_num_++;
}
});
}
}
void stop() {
stop_.store(true);
cv_lock_.notify_all();
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}

private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
std::atomic_bool stop_;
std::atomic_int thread_num_;
std::queue<Task> tasks_;
std::vector<std::thread> pool_;
};

#endif // !__THREAD_POOL_H__

总结

本文介绍了如何使用future, promise以及async用法

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn