用内存顺序实现内存模型

前情回顾

前文我们介绍了六种内存顺序,以及三种内存模型,本文通过代码示例讲解六种内存顺序使用方法,并实现相应的内存模型。

memory_order_seq_cst

memory_order_seq_cst代表全局一致性顺序,可以用于 store, loadread-modify-write 操作, 实现 sequencial consistent 的顺序模型. 在这个模型下, 所有线程看到的所有操作都有一个一致的顺序, 即使这些操作可能针对不同的变量, 运行在不同的线程.

我们看一下之前写的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // 1
y.store(true, std::memory_order_relaxed); // 2
}

void read_y_then_x() {
while (!y.load(std::memory_order_relaxed)) { // 3
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_relaxed)) { //4
++z;
}

}

void TestOrderRelaxed() {

std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}

上面的代码loadstore都采用的是memory_order_relaxed。线程t1按次序执行1和2,但是线程t2看到的可能是y为true,x为false。进而导致TestOrderRelaxed触发断言z为0.
如果换成memory_order_seq_cst则能保证所有线程看到的执行顺序是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

void write_x_then_y() {
x.store(true, std::memory_order_seq_cst); // 1
y.store(true, std::memory_order_seq_cst); // 2
}

void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)) { // 3
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_seq_cst)) { //4
++z;
}

}

void TestOrderSeqCst() {

std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}

上面的代码x和y采用的是memory_order_seq_cst, 所以当线程t2执行到3处并退出循环时我们可以断定y为true,因为是全局一致性顺序,所以线程t1已经执行完2处将y设置为true,那么线程t1也一定执行完1处代码并对t2可见,所以当t2执行至4处时x为true,那么会执行z++保证z不为零,从而不会触发断言。

实现 sequencial consistent 模型有一定的开销. 现代 CPU 通常有多核, 每个核心还有自己的缓存. 为了做到全局顺序一致, 每次写入操作都必须同步给其他核心. 为了减少性能开销, 如果不需要全局顺序一致, 我们应该考虑使用更加宽松的顺序模型.

memory_order_relaxed

memory_order_relaxed 可以用于 store, load 和 read-modify-write 操作, 实现 relaxed 的顺序模型.
前文我们介绍过这种模型下, 只能保证操作的原子性和修改顺序 (modification order) 一致性, 无法实现 synchronizes-with 的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TestOrderRelaxed() {
std::atomic<bool> rx, ry;

std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_relaxed); // 2
});


std::thread t2([&]() {
while (!ry.load(std::memory_order_relaxed)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});

t1.join();
t2.join();
}

上面的代码在一定程度上会触发断言。因为线程t1执行完1,2之后,有可能2操作的结果先放入内存中被t2看到,此时t2执行退出3循环进而执行4,此时t2看到的rx值为false触发断言。

我们称2和3不构成同步关系, 2 “ not synchronizes with “ 3

如果能保证2的结果立即被3看到, 那么称 2 “synchronizes with “ 3。

如果2 同步于 3还有一层意思就是 如果在线程t1 中 1 先于 2(sequence before), 那么 1先行于3。那我们可以理解t2执行到3处时,可以获取到t1执行1操作的结果,也就是rx为true.

t2线程中3先于4(sequence before),那么1 操作先行于 4. 也就是1 操作的结果可以立即被4获取。进而不会触发断言。

怎样保证2 同步于 3 是解决问题的关键, 我们引入 Acquire-Release 内存顺序。

Acquire-Release

在 acquire-release 模型中, 会使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel 这三种内存顺序. 它们的用法具体是这样的:

对原子变量的 load 可以使用 memory_order_acquire 内存顺序. 这称为 acquire 操作.

对原子变量的 store 可以使用 memory_order_release 内存顺序. 这称为 release 操作.

read-modify-write 操作即读 (load) 又写 (store), 它可以使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel:

  1. 如果使用 memory_order_acquire, 则作为 acquire 操作;
  2. 如果使用 memory_order_release, 则作为 release 操作;
  3. 如果使用 memory_order_acq_rel, 则同时为两者.

Acquire-release 可以实现 synchronizes-with 的关系. 如果一个 acquire 操作在同一个原子变量上读取到了一个 release 操作写入的值, 则这个 release 操作 “synchronizes-with” 这个 acquire 操作.

我们可以通过Acquire-release 修正 TestOrderRelaxed函数以达到同步的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TestReleaseAcquire() {
std::atomic<bool> rx, ry;

std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_release); // 2
});


std::thread t2([&]() {
while (!ry.load(std::memory_order_acquire)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});

t1.join();
t2.join();
}

上面的例子中我们看到ry.store使用的是std::memory_order_release, ry.load使用的是std::memory_order_relaxed.

t1执行到2将ry 设置为true, 因为使用了Acquire-release 顺序, 所以 t2 执行到3时读取ry为true, 因此2和3 可以构成同步关系。

又因为单线程t1内 1 sequence before 2,所以1 happens-before 3.
因为单线程t2内 3 sequence before 4. 所以 1 happens-before 4.

可以断定4 不会触发断言。

我们从cpu结构图理解这一情景

https://cdn.llfc.club/1697539893049.jpg

到此大家一定要记住仅 Acquire-release能配合达到 synchronizes-with效果,再就是memory_order_seq_cst可以保证全局顺序唯一,其他情况的内存顺序都能保证顺序,使用时需注意。

Acquire-release 的开销比 sequencial consistent 小. 在 x86 架构下, memory_order_acquire 和 memory_order_release 的操作不会产生任何其他的指令, 只会影响编译器的优化: 任何指令都不能重排到 acquire 操作的前面, 且不能重排到 release 操作的后面; 否则会违反 acquire-release 的语义. 因此很多需要实现 synchronizes-with 关系的场景都会使用 acquire-release.

Release sequences

我们再考虑一种情况,多个线程对同一个变量release操作,另一个线程对这个变量acquire,那么只有一个线程的release操作喝这个acquire线程构成同步关系。

看下面的代码 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ReleasAcquireDanger2() {
std::atomic<int> xd{0}, yd{ 0 };
std::atomic<int> zd;

std::thread t1([&]() {
xd.store(1, std::memory_order_release); // (1)
yd.store(1, std::memory_order_release); // (2)
});

std::thread t2([&]() {
yd.store(2, std::memory_order_release); // (3)
});


std::thread t3([&]() {
while (!yd.load(std::memory_order_acquire)); //(4)
assert(xd.load(std::memory_order_acquire) == 1); // (5)
});

t1.join();
t2.join();
t3.join();
}

我们可以看到t3在yd为true的时候才会退出,那么导致yd为true的有两种情况,一种是1,另一种是2, 所以5处可能触发断言。

并不是只有在 acquire 操作读取到 release 操作写入的值时才能构成 synchronizes-with 关系. 为了说这种情况, 我们需要引入 release sequence 这个概念.

针对一个原子变量 M 的 release 操作 A 完成后, 接下来 M 上可能还会有一连串的其他操作. 如果这一连串操作是由

  1. 同一线程上的写操作
  2. 任意线程上的 read-modify-write 操作
    这两种构成的, 则称这一连串的操作为以 release 操作 A 为首的 release sequence. 这里的写操作和 read-modify-write 操作可以使用任意内存顺序.

如果一个 acquire 操作在同一个原子变量上读到了一个 release 操作写入的值, 或者读到了以这个 release 操作为首的 release sequence 写入的值, 那么这个 release 操作 “synchronizes-with” 这个 acquire 操作.

看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void ReleaseSequence() {
std::vector<int> data;
std::atomic<int> flag{ 0 };

std::thread t1([&]() {
data.push_back(42); //(1)
flag.store(1, std::memory_order_release); //(2)
});

std::thread t2([&]() {
int expected = 1;
while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) // (3)
expected = 1;
});

std::thread t3([&]() {
while (flag.load(std::memory_order_acquire) < 2); // (4)
assert(data.at(0) == 42); // (5)
});

t1.join();
t2.join();
t3.join();
}

我们考虑t3要想退出首先flag要等于2,那么就要等到t2将flag设置为2,而flag设置为2又要等到t1将flag设置为1. 所以我们捋一下顺序 2->3->4

t1中操作2是release操作,以2为开始,其他线程(t2)的读改写在release操作之后,我们称之为release sequence, t3要读取release sequence写入的值,所以我们称t1的release操作 “synchronizes with “ t3的 acquire 操作。

memory_order_consume

memory_order_consume 其实是 acquire-release 模型的一部分, 但是它比较特殊, 它涉及到数据间相互依赖的关系. 就是前文我们提及的 carries dependencydependency-ordered before.

我们复习一下

如果操作 a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries a dependency into” b. 一般来说, 如果 a 的值用作 b 的一个操作数, 或者 b 读取到了 a 写入的值, 都可以称为 b 依赖于 a

1
2
3
p++;   // (1)
i++; // (2)
p[i] // (3)

(1) “sequenced-before” (2), (2) “sequenced-before” (3), 而(1)和(2)的值作为(3)的下表运算符[]的操作数。

我们可以称(1) “carries a dependency into “ (3), (2) “carries a dependency into “ (3), 但是(1)和(2)不是依赖关系。

memory_order_consume 可以用于 load 操作. 使用 memory_order_consume 的 load 称为 consume 操作. 如果一个 consume 操作在同一个原子变量上读到了一个 release 操作写入的值, 或以其为首的 release sequence 写入的值, 则这个 release 操作 “dependency-ordered before” 这个 consume 操作.

看下面这个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ConsumeDependency() {
std::atomic<std::string*> ptr;
int data;

std::thread t1([&]() {
std::string* p = new std::string("Hello World"); // (1)
data = 42; // (2)
ptr.store(p, std::memory_order_release); // (3)
});

std::thread t2([&]() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // (4)
assert(*p2 == "Hello World"); // (5)
assert(data == 42); // (6)
});

t1.join();
t2.join();
}

t2执行到(4)处时,需要等到ptr非空才能退出循环,这就依赖t1执行完(3)操作。

因此(3) “dependency-ordered before” (4), 根据前文我们介绍了dependency等同于synchronizes ,所以(3) “inter-thread happens-before”. (4)

因为(2) “sequenced before” (3), 所以(2) “happens-before “ (4)

因为(4) “sequenced before” (5), 所以(2) “happens-before “ (5)

因为(5) “sequenced before” (6), 所以(2) “happens-before “ (6)

所以(6)处断言不会触发,同样的道理(5)处断言也不会触发。

单例模式改良

还记得我们之前用智能指针双重检测方式实现的单例模式吗?我当时说过是存在线程安全问题的,看看下面这段单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//利用智能指针解决释放问题
class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
// 1 处
if (single != nullptr)
{
return single;
}
// 2 处
s_mutex.lock();
// 3 处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

我们写一段代码测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;

void TestSingle() {
std::thread t1([]() {
std::cout << "thread t1 singleton address is 0X: " << SingleAuto::GetInst() << std::endl;
});

std::thread t2([]() {
std::cout << "thread t2 singleton address is 0X: " << SingleAuto::GetInst() << std::endl;
});

t2.join();
t1.join();
}

虽然可以正常输出两次的地址都是同一个,但是我们的单例会存在安全隐患。
1处和4处代码存在线程安全问题,因为4处代码在之前的文章中我谈过,new一个对象再赋值给变量时会存在多个指令顺序

第一种情况

1
2
3
1 为对象allocate一块内存空间
2 调用construct构造对象
3 将构造到的对象地址返回

第二种情况

1
2
3
1 为对象allocate一块内存空间
2 先将开辟的空间地址返回
3 调用construct构造对象

如果是第二种情况,在4处还未构造对象就将地址返回赋值给single,而此时有线程运行至1处判断single不为空直接返回单例实例,如果该线程调用这个单例的成员函数就会崩溃。

为了解决这个问题,我们可以通过内存模型来解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//利用智能指针解决释放问题
class SingleMemoryModel
{
private:
SingleMemoryModel()
{
}
SingleMemoryModel(const SingleMemoryModel&) = delete;
SingleMemoryModel& operator=(const SingleMemoryModel&) = delete;
public:
~SingleMemoryModel()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleMemoryModel> GetInst()
{
// 1 处
if (_b_init.load(std::memory_order_acquire))
{
return single;
}
// 2 处
s_mutex.lock();
// 3 处
if (_b_init.load(std::memory_order_relaxed))
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleMemoryModel>(new SingleMemoryModel);
_b_init.store(true, std::memory_order_release);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleMemoryModel> single;
static std::mutex s_mutex;
static std::atomic<bool> _b_init ;
};

std::shared_ptr<SingleMemoryModel> SingleMemoryModel::single = nullptr;
std::mutex SingleMemoryModel::s_mutex;
std::atomic<bool> SingleMemoryModel::_b_init = false;

然后我们测试

1
2
3
4
5
6
7
8
9
10
11
12
void TestSingleMemory() {
std::thread t1([]() {
std::cout << "thread t1 singleton address is 0x: " << SingleMemoryModel::GetInst() << std::endl;
});

std::thread t2([]() {
std::cout << "thread t2 singleton address is 0x: " << SingleMemoryModel::GetInst() << std::endl;
});

t2.join();
t1.join();
}

也可以看到输出的地址一致,但是我们这个改进的版本防止了线程安全问题。

总结

本文介绍了如何通过内存顺序实现内存模型,以及优化了单例模式。

源码链接

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day11-AcquireRelease

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290