中断线程

简介

前几篇文章陆续介绍了线程池(ThreadPool),可汇合线程(join_thread)等技术,其中也用到了当管理类要退出时会通过条件变量唤醒挂起的线程,然后等待其执行完退出。本文按照作者的思路补充设计可中断的线程。

可中断线程

一个可中断的线程大体的实现是这个样子的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
//⇽-- - 2
std::promise<interrupt_flag*> p;
//⇽-- - 3
internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag);
//⇽-- - 4
f();
});
//⇽-- - 5
flag = p.get_future().get();
}

void join() {
internal_thread.join();
}
void interrupt()
{
if (flag)
{
//⇽-- - 6
flag->set();
}
}
};
  1. interrupt_flag 为中断标记,其set操作用来标记中断
  2. internal_thread为内部线程,其回调函数内部先设置interrupt_flag*类型的promise值,再执行回调函数。
  3. 在interruptible_thread构造函数中等待internal_thread回调函数内部设置好flag的promise值后再退出。
  4. this_thread_interrupt_flag是我们定义的线程变量thread_local interrupt_flag this_thread_interrupt_flag;

中断标记

中断标记interrupt_flag类,主要是用来设置中断标记和判断是否已经中断,有可能挂起在条件变量的wait操作上,此时中断就需要唤醒挂起的线程。

为了扩充功能,我们希望设计接口支持在任何锁上等待,那我们使用condition_variable_any支持任意类型的条件变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag() :
thread_cond(0), thread_cond_any(0)
{}
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond)
{
thread_cond->notify_all();
}
else if (thread_cond_any) {
thread_cond_any->notify_all();
}
}
bool is_set() const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = &cv;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = 0;
}


template<typename Lockable>
void wait(std::condition_variable_any& cv, Lockable& lk) {
struct custom_lock {
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) :
self(self_), lk(lk_) {
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
}

void unlock() {
lk.unlock();
self->set_clear_mutex.unlock();
}

void lock() {
std::lock(self->set_clear_mutex, lk);
}

~custom_lock() {
self->thread_cond_any = 0;
self->set_clear_mutex.unlock();
}
};

custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
};
  1. set函数将停止标记设置为true,然后用条件变量通知挂起的线程。
  2. set_condition_variable 设置flag关联的条件变量,因为需要用指定的条件变量通知挂起的线程。
  3. clear_condition_variable清除关联的条件变量
  4. wait操作封装了接受任意锁的等待操作,wait函数内部定义了custom_lock,封装了加锁,解锁等操作。
  5. wait操作内部构造了custom_lock对象cl主要是对set_clear_mutex加锁,然后在调用cv.wait,这样能和set函数中的通知条件变量构成互斥,这么做的好处就是要么先将flag设置为true并发送通知,要么先wait,然后再发送通知。这样避免了线程在wait处卡死(线程不会错过发送的通知)

interruption_point函数内部判断flag是否为true,如果为true则抛出异常,这里作者处理的突兀了一些。读者可将这个函数改为bool返回值,调用者根据返回值判断是否继续等都可以。

1
2
3
4
5
6
7
void interruption_point()
{
if (this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}

thread_interrupted为我们自定义的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class thread_interrupted : public std::exception
{
public:
thread_interrupted() : message("thread interrupted.") {}
~thread_interrupted() throw () {
}

virtual const char* what() const throw () {
return message.c_str();
}

private:
std::string message;
};

接下来定义一个类clear_cv_on_destruct

1
2
3
4
5
struct clear_cv_on_destruct {
~clear_cv_on_destruct(){
this_thread_interrupt_flag.clear_condition_variable();
}
};

clear_cv_on_destruct 这个类主要是用来在析构时释放和flag关联的条件变量。

除此之外,我们还可以封装几个不同版本的等待
支持普通条件变量的等待

1
2
3
4
5
6
7
8
9
10
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk, std::chrono::milliseconds(1));
interruption_point();
}

支持谓词的等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk,
Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
clear_cv_on_destruct guard;
while (!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk, std::chrono::milliseconds(1));
}
interruption_point();
}

上面两个版本采用wait_for而不用wait是因为如果等待之前条件变量的通知已经发送,线程之后才调用wait就会发生死等,所以这里采用的wait_for

支持future的等待

1
2
3
4
5
6
7
8
9
10
11
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
while (!this_thread_interrupt_flag.is_set())
{
if (uf.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::ready)
break;
}
interruption_point();
}

接下来我们用案例测试上面的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include "interupthread.h"
std::vector<interruptible_thread> background_threads;
std::mutex mtx1;
std::mutex mtx2;
std::condition_variable cv1;
std::condition_variable_any cv2;
void start_background_processing() {
background_threads.push_back([]() {
try {
std::unique_lock<std::mutex> lock(mtx1);
interruptible_wait(cv1, lock);
}
catch (std::exception& ex) {
std::cout << "catch exception is " << ex.what() << std::endl;
}

});

background_threads.push_back([]() {
try {
std::unique_lock<std::mutex> lock(mtx2);
this_thread_interrupt_flag.wait(cv2, mtx2);
}
catch (std::exception& ex) {
std::cout << "catch exception is " << ex.what() << std::endl;
}

});
}

int main()
{
start_background_processing();
for (unsigned i = 0; i < background_threads.size(); i++) {
background_threads[i].interrupt();
}

for (unsigned i = 0; i < background_threads.size(); i++) {
background_threads[i].join();
}
}

上面的案例中启动了两个线程,每个线程回调函数中调用我们封装的可中断的等待。在主函数中断两个线程,并测试两个线程能否在等待中中断。

程序输出

1
2
catch exception is thread interrupted.
catch exception is thread interrupted.

总结

本文介绍了中断线程的设计,说简单点还是设置终止标记为true,利用条件变量通知挂起的线程唤醒。

源码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day23-interupthread

视频链接:

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290