深入探索并发编程系列(四)-实现递归锁

当你要为多CPU核优化代码时,有时需要去实现一种新的同步原语。
虽然我不鼓励这么做,但这样的情况却时常发生。就在你打算开始这么做的时候,可能会先找一些例子来参考。这种过程无疑是搬起石头砸自己的脚,但却能减少你再次犯这种错误的次数,这样起码还能给你留下几个脚趾头继续走路。

上一篇文章中,教大家在Win32平台下用C++实现了Benaphore同步原语。Benaphore不是无锁(lock-free)的(自己本身就是锁)注1,但Benaphore的实现可以作为在用户空间实现同步原语的案例,非常简单而又具有指导性意义。当不存在锁竞争时,它也的确能带来非常低的开销。

实现中的不足就是它不是可递归(non recursive)的。这意味着如果同一个线程试图两次获取同一把锁,就会发生死锁注2。本文将其实现扩展以支持可递归锁。

当你有个模块想通过自身提供的公共接口调用自己的时候,可递归锁就显得很有用处了。举个例子,在内存管理中,你可能会遇到这样的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void* MemoryManager::Realloc(void* ptr, size_t size)
{
AUTO_LOCK_MACRO(m_lock);

if (ptr == NULL)
{
return Alloc(size);
}
else if (size == 0)
{
Free(size);
return NULL;
}
else
...
}

void* MemoryManager::Alloc(size_t size)
{
AUTO_LOCK_MACRO(m_lock);

...
}

显然,AUTO_LOCK_MACRO也是C++中的一个宏,用来获取锁并在我们想退出函数域的时候自动释放锁注3

可以看出,如果把NULL传给Realloc,Realloc函数会第一次获取锁,当调用到Alloc函数的时候,会第二次(递归)获取锁。显然,在这个特殊的例子中,要避免锁的递归使用,修改起来很容易,但在一些大型多线程的项目中,你很有可能还会遇到其它的例子。

可以按照以下方法来将Benaphore的实现扩展以支持可递归锁。我在Benaphore类中增加了两个新成员:m_owner,用来保存当前拥有者的线程ID(TID),以及m_recursion,用来保存递归计数器。

有心的读者可能会注意到这份代码中没有采用新的C++ 11原子库标准。这么做是为了从长远来看让这份代码仍然兼容。然而,在2000年代中期的游戏业中,我们就采用了这种方式。下面的代码可以用任意的Microsoft编译器编译,并且所有与Windows相关的调用在其它平台下都有相对应的调用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Define this to {} in a retail build:
#define LIGHT_ASSERT(x) { if (!(x)) DebugBreak(); }

class RecursiveBenaphore
{
private:
LONG m_counter;
DWORD m_owner;
DWORD m_recursion;
HANDLE m_semaphore;

public:
RecursiveBenaphore::RecursiveBenaphore()
{
m_counter = 0;
m_owner = 0; // an invalid thread ID
m_recursion = 0;
m_semaphore = CreateSemaphore(NULL, 0, 1, NULL);
}

RecursiveBenaphore::~RecursiveBenaphore()
{
CloseHandle(m_semaphore);
}

void Lock()
{

DWORD tid = GetCurrentThreadId();
if (_InterlockedIncrement(&m_counter) > 1) // x86/64 guarantees acquire semantics
{
if (tid != m_owner)
WaitForSingleObject(m_semaphore, INFINITE);
}
//--- We are now inside the Lock ---
m_owner = tid;
m_recursion++;
}

void Unlock()
{

DWORD tid = GetCurrentThreadId();
LIGHT_ASSERT(tid == m_owner);
DWORD recur = --m_recursion;
if (recur == 0)
m_owner = 0;
DWORD result = _InterlockedDecrement(&m_counter); // x86/64 guarantees release semantics
if (result > 0)
{
if (recur == 0)
ReleaseSemaphore(m_semaphore, 1, NULL);
}
//--- We are now outside the Lock ---
}
};

原始Benaphore类中,第一个调用Lock的线程会获取拥有权而不必跳转到开销昂贵的内核调用中。同时也会做一些记录:将m_owner设为自己的TID,m_recursion设为1。如果同一个线程再次调用Lockm_counterm_recursion的值都会加1.

类似的,当同一个线程调用Unlock时,会将m_counterm_recursion的值减1。但当m_recursion的值被减到0时,只会调用ReleaseSemaphore一次。如果此时m_recursion的值仍大于0,意味着当前线程在外部作用域内仍然持有这把锁,这时候将拥有权交给其它线程是不安全的。

这时,你在网上随便一搜,就能找到许多有问题的无锁代码和同步原语。那你凭什么会相信本文中的代码有什么不一样呢?
首先,本文的代码经过了压力测试(stress tested)。在我看来,这是最有价值的东西。我写了个小的测试程序,其生成许许多多的线程,每个线程都获取锁N(随机)次,递归深度也是随机的。它会更新每把锁里面共享的数据并做一致性检查。可以在这里下载源码

recursivemutex

为了更好的测试,以下是TryLock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool TryLock()
{

DWORD tid = GetCurrentThreadId();
if (m_owner == tid)
{
// Already inside the lock
_InterlockedIncrement(&m_counter);
}
else
{
LONG result = _InterlockedCompareExchange(&m_counter, 1, 0);
if (result != 0)
return false;
//--- We are now inside the Lock ---
m_owner = tid;
}
m_recursion++;
return true;
}

对细节感兴趣的人,可以继续往下看:

  • RecursiveBenaphore::Unlock方法中,在调用_InterlockedDecrement之前将m_owner设回0是非常重要的,否则,有可能会发生数据破坏。举个例子,假设两个线程的TID分别为123与456。线程123刚完成了对Unlock的调用并将m_owner设为123。接下来有可能会发生下面的事情:

    1. 两个线程同时进入RecursiveBenaphore::Unlock
    2. 线程456执行_InterlockedIncrement,得到返回值1,因此会跳过WaitForSingleObject
    3. 线程123执行_InterlockedIncrement,得到返回值2
    4. 由于线程456还没来得及更新m_owner的值,线程123执行检查发现id == m_owner,也会跳过WaitForSingleObject

    不久之后,两个线程都从Lock中返回,每个线程都认为自己获取了锁。这时锁中保护的数据就有可能被破坏。实际上,如果你下载了测试源码,并将RecursiveBenaphore::Unlock这一部分删除,很快就会运行失败。

  • 另外,在RecursiveBenaphore::Unlock方法中,m_recurtion的值只有一次被拷贝到本地变量中,并从那时开始使用变量的值。在执行_InterlockedDecrement之后,不会再次去读取m_recurtion的值。那时,另一个线程可能已经改变它的值了。

  • 你可能会注意到,m_recurtion的值被修改的时候,不会用到任何原子操作。那是因为在调用Lock中的_InterlockedIncrementUnlock中的_InterlockedDecrement之间,获取锁的线程对m_ownerm_recurtion都独自占有访问权,拥有所有必要的aquire和release语义注4。在m_recurtion使用原子操作是没必要的,而且会很浪费。

最后一点又是如何保证的呢?在非竞争情况下是通过原子操作来保证的(fast path),而在有竞争的情况下,是通过信号量来保证的(slow path)。在X86和X64平台下,对_InterlockedIncrement注5的调用会生成lock xadd指令,其充当一个完整的memory barrier注6,以确保acquire和release语义。这是x86/x64独有的性质。如果你把这份代码放到多核的iOS设备上运行,比如说IPAD2,用OSAtomicIncrement32替代_InterlockedIncrement是远远不够的。你还必须要调用OSAtomicIncrement32Barrier才能得到类似的保证。就算在Xbox360上,能共享Win32的API,但是却是运行在PowerPC上,正确的函数调用实际上是InterlockedIncrementAcquire

读到这一章节,相信很多读者都已经懵逼了。希望这种懵逼是值得的。我会在以后的文章中更多的谈谈内存访问语义。

对于那些还没有钻研过同步原语的读者来说,RecursiveBenaphore类给大家展示了,可递归锁实现的代码是多么的微妙。 这里每个细节的存在都是有原因的,里面的执行顺序很关键,隐含的一些假设和保证都发挥着重要作用注7

译者注

注1:lock free并不是说不含锁的意思。不包含锁的英文一般是lockless。实际上,lock free考察的是由一组线程构成的一个系统。不管如何,至少能保证一个线程make progress因此保证系统是在make progress,那么这个算法或者系统或者说实现是lock free的。

我们知道,基于mutex的实现里,如果持有锁的线程被调度出去,那么其他线程都得等待该线程被重新调度并且等待它释放锁;假如它crash了,那么系统将无法make progress。lock free是non blocking的,一个线程被阻塞或者crash,不会影响到其他线程的正常推进。

注2:non recursive,不可递归的;non reentrant,不可重入的。说一把锁是non reentrant和non recursive是一个意思。

注3:一般说来,在工业级C++项目里不会出现这样的代码:

1
2
3
4
5
6
void f()
{

mutex.lock();
//do sth here
mutex.unlock();
}

一般都是通过C++的RAII机制,利用对象的构造函数和析构函数,来封装获取锁和释放锁。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MutexGuard
{
public:
MutexGuard(Mutex &mutex):mutex_(mutex)
{
mutex_.lock();
}
~MutexGuard()
{
mutex_.unlock();
}
private:
Mutex &mutex_;
};

void f()
{

MutexGuard guard(my_mutex);//构造函数里调用了lock
//do sth here...
//......
//guard 被析构,调用了unlock.
}

注4:后面会有文章专门介绍acquire 和release语义,这里做简单的描述。以临界区保护为例:

1
2
3
4
mutex.lock();
//critical section
a = 1;
mutex.unlock;

我们知道,cpu和编译器是可以对代码和指令进行乱序的。然而,这里a=1这行代码不允许被拉到lock操作之前执行,也不能被拉到unlock之后执行。否则,临界区的内容就可能被多个线程读写,造成数据败坏,也无所谓临界区了。我们说lock带有acquire语义,粗俗的说就是它提供了这样的保证:它之下的代码不能被拉到它之上执行;unlock带有release语义,粗俗的说就是它提供了这样的保证:它之上的代码不能被拉到它之下执行。显然,这得程序员和编译器、CPU达成协议和共识。

注5:gcc下可以使用__sync_add_and_fetch等内置的原子操作。

注6:memory barrier是用来提供和保证acquire 和release语义的基础设施,用来防止CPU和编译器乱序用的。后面也会有详细介绍。

注7:本文讨论的都是windows下的实现,我们一起来看看linux下的情形。

pthread_mutex_t默认是不可递归的,如果想让它可递归,有两种方法。

第一种方法非常简单,就是初始化的时候设置相应的属性即可。例如:

1
2
3
4
5
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);//设置可递归
pthread_mutex_init(&mutex, &attr);

第二种方法,就是利用mutex,我们也做一层封装。下面的代码仅供参考,仅适用于X86平台:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdint.h>
#include <pthread.h>
Class MyRecursiveLock
{
public:
MyRecursiveLock()
{
lock_holder_ = NULL;
hold_counter_ = 0;
pthread_mutex_init(&lock_, NULL);
}
int lock()
{

int ret = 0;
pthread_t curr_id = pthread_self();
if (lock_holder_ == curr_id) {
++hold_counter_;
} else {
pthread_mutex_lock(&lock_);
lock_holder_ = curr_id;
hold_counter_ = 1;
}
return ret;
}
int unlock()
{

int ret = 0;
pthread_t curr_id = pthread_self();
if (lock_holder_ != curr_id) {
ret = -1;
} else {
if (--hold_counter_ == 0) {
lock_holder_ = NULL;
pthread_mutex_unlock(&lock_);
}
}
return ret;
}
private:
pthread_mutex_t lock_;
pthread_t lock_holder_;
int64_t hold_counter_;
};

Acknowledgement

本文由睡眼惺忪的小叶先森Diting0x 共同完成,在原文的基础上添加了许多精华注释,帮助大家理解。

感谢好友小伙伴-小伙伴儿 skyline09_ 阅读了初稿,并给出宝贵的意见。

原文: http://preshing.com/20120305/implementing-a-recursive-mutex/

本文遵守Attribution-NonCommercial-NoDerivatives 4.0 International License (CC BY-NC-ND 4.0)
仅为学习使用,未经博主同意,请勿转载
本系列文章已经获得了原作者preshing的授权。版权归原作者和本网站共同所有

攒点碎银娶媳妇