本篇将会探讨锁的实现,篇幅相比之前的笔记有点长,大家见谅。

锁的基本概念

锁其实一个变量,我们在里面保存了一些重要的信息(包括锁的状态),比如说当前是哪个线程拥有了这把锁,或者是一个想请求这把锁的队列,但这些信息是存在 lock_t 里的。(熟悉 C 语言的盆友可以猜出,这是个结构体)

锁提供给程序员最小化调度机制。

道理就是,具体选择哪个进程、线程执行代码是有操作系统控制的,但是我们可以控制当一个我们不想要执行的进程、线程占用了 CPU 的时候,执行那些我们不想它运行的代码

这就是锁的意义。

Pthread Locks

POSIX 库对锁使用的名字是 mutex,含义是在线程间提供 互斥性(mututal exclusion)

通常,加锁范围不宜太大,这有助于提高并发,粗粒度锁(coarse-grained)和细粒度锁(fine-grained)的概念

设计一把锁

我们又回来了,再次需要硬件老朋友的帮助,不过,这本书中,不会介绍硬件是如何帮助操作系统的,不会去介绍那些指令,只会介绍在使用过程中,操作系统是如何介入,帮助我们构建一个复杂的 locking library。

评估一把锁

首先,设计出来的锁要完成它既有的使命,那就是提供 mutual exclusion。

其次,是公平性,没有竞争这把锁的线程都能够得到公平的机会去争取么?

最后,才是性能的衡量。

控制 Interrupts

这里的 Interrupts 其实我想翻译成 中断 的,但是又怕和 OS 的中断混淆,就复制了英文过来。

最早期实现锁的方式就是控制 Interrupts。

void lock() {
    DisableInterrupts();
}

void unlock() {
    EnableInterrupts();
}

显然,这种方式非常简单,但是缺点非常多,比如说交给用户进程的控制权太大、用户可以“玩弄”CPU,而且需要要切换到特权模式下,再者这种设计不能再多处理器下工作。

Test And Set (Atomic Exchange)

下面的代码是是一次尝试,

01

这种方法也叫做 test-and-set instruction,也叫做 atomic exchange

这个 idea 挺简单的,第一个进入 critical section 的线程会调用 lock(),这时,lock()函数里会尝试 test flag 当前的值是否等于 1(当前 case 下不等于),因此会把 flag 变量置为 1 表示着此时已经拿到了这个锁;而调用 unlock() 时候把 flag 变量置为 0,意味着释放了这个锁。

如果另外一个线程碰巧这时想要争夺这个锁,此时他会停留在 spin-wait 这段代码永久循环,直到操作系统把这个线程换出去,这段时间里 CPU 其实是空转的;或者是拥有锁的线程此时释放了锁,flag=0 了,这时争夺锁的线程就会获得锁。

然而,上面这段代码是有正确性问题的,它并不能保证 mutual exclusion。看下图的执行流:

02

两个线程同时争夺一个锁,此时两个线程的执行流都到了 lock() 函数的 while (flag == 1) 阶段,然而只有一个线程继续往下走,另外一个线程被 Interrupt 了,尽管它们都认为自己已经看到了能成功获取到这个锁的讯息:flag == 1。

根本原因其实还是因为 while (flag == 1) 这段代码不是原子的

设计一把 Worked 的 Spin Lock

Spin Lock 我这里翻译成什么好呢?自旋锁?——以前接触过这个名词,但是不确定是否是同一个概念,因此就留着英文原文了。

上面的那个例子是个好的尝试,但是没有硬件的帮助是步行的,硬件提供了一个叫做 test-and-set 的指令,它的逻辑类似下面的代码:

int TestAndSet(int * old_ptr, int new) {
    int old = *old_ptr; // fetch old value at old_ptr
    *old_ptr = new;     // store ’new’ into old_ptr
    return old; // return the old value
}

在 SPARC 上,这个指令叫做 load/store unsigned byte instruction(ldstub),在 x86 机器上这个指令叫做 atomic exchange instruction(xchg)。

The reason it is called “test and set” is that it enables you to “test” the old value (which is what is returned) while simultaneously “setting” the memory location to a new value.

这也是解决了之前代码面临的问题。

下面锁的实现是可行的:

typedef struct __lock_t {
    int flag;
} lock_t;

void init(lock_t * lock) {
    // 0 indicates that lock is available, 1 that it is held
    lock->flag = 0;
}

void lock(lock_t * lock) {
    while (TestAndSet(&lock->flag, 1) == 1)
        ; // spin-wait (do nothing)
}

void unlock(lock_t * lock) {
    lock->flag = 0;
}

之前设计的锁的代码锁面临的问题已经不见了,大家可以想象一下这个锁的正确性的场景。因为 TestAndSet() 同时做了两件事,*test* 和 set,一旦 test 成功,立马 set,此时另外一个线程也想 test,但是很可惜,它这时拿到的值已经是 1 只能继续 spin wait,这就是之前设计的锁失败的原因,它的这两个功能是分步执行的。

这个锁叫做 spin lock,是因为如果拿不到锁它就只会 spin。

评估上面的锁的实现

前面我们已经验证过次设计是正确的,它能提供 mutual exclusion。

再次,它的公平性其实是有操作系统的调度策略保证。

性能方面,这个可能有点堪忧,在单核处理器上面,一个想抢锁却不得的线程将会使 CPU 白白浪费。但在多核处理器上这个情况能会好些(其实我觉得不会好,考虑下很多进程的场景)。

Compare-And-Swap

硬件提供的另外一个有帮助的指令叫做 compare-and-swap 指令(SPARC 的叫法),或者是 compare-and-exchange 指令(x86 的叫法)

下卖的代码显示了这个指令完成的逻辑:

int CompareAndSwap(int * ptr, int expected, int new) {
    int actual = *ptr;
    if (actual == expected)
        *ptr = new;
    return actual;
}

完成的功能其实和 test-and-set 指令差不多,lock() 函数也是。

void lock(lock_t * lock) {
    while (CompareAndSwap(&lock->flag, 0, 1) == 1)
        ;   // spin
}

Load-Linked and Store-Conditional

void lock(lock_t * lock) {
    while (1) {
        while (LoadLinked(&lock->flag) == 1)
            ;   // spin until it's zero
        if (StoreConditional(&lock->flag, 1) == 1)
            reuturn;    // if set-it-to-1 was a success: all done
                        // otherwise: try it all over again
    }
}

void unlock(lock_t * lock) {
    lock->flag = 0;
}

上面的代码可以并到一样:

void lock(lock_t * lock) {
    while (LoadLinked(&lock->flag)||!StoreConditional(&lock->flag, 1))
        ;   // spin
}

这也是一个实现方式吧,但是个人觉得和前面两种方式比较起来没有什么优势,也没有什么劣势,可能这种方法代码更多一些,不如前面的效率高吧,毕竟在内核这种资源很紧张的场景下,前两种方法更有优势。

Fetch-And-Add

最后一个要提起的硬件支持的原语是 fetch-and-add 指令,它会原子地对一个变量加一。

int FetchAndAdd(int * ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
}

此时锁的实现略有不同,也叫做 Ticket Locks。

typedef struct __lock_t {
    int ticket;
    int turn;
} lock_t;

void lock_init(lock_t * lock) {
    lock->ticket = 0;
    lock->turn = 0;
}

void lock(lock_t * lock) {
    int myturn = FetchAndAdd(&lock->ticket);
    while (lock->turn != myturn)
        ;   // turn
}

void unlock(lock_t * lock) {
    FetchAndAdd(&lock->turn);
}

Too Much Spinning

前面的多种锁的实现方法,在争抢锁失败的时候,都会 spin,而这种方式会比较浪费 CPU。下面的问题是想想有什么办法替换 spin 或者减少 spin 的时间。

一个简单的办法:Just Yield

那就是放弃 CPU,去休眠,等待操作系统 OS 的下一次调度

void init() {
    flag = 0;
}

void lock() {
    while (TestAndSet(&flag, 1) == 1)
        yield();
}

void unlock() {
    flag = 0;
}

这种方法有好有坏吧,下一次的被调度依赖于 OS 的调度策略是怎样的,如果是轮询的话,100 个线程争抢 一个锁,那么将会有 99 个线程,主动放弃 CPU,此时情况是非常糟糕的。

用一个队列吧

上面的线程直接放弃了 CPU 的执行权,把控制权直接交给了 OS,万一操作系统做了一个糟糕的觉得,就是上面提到的那样,那就很不划算。因此,我们简单起见,使用一个队列维持一个先进先出队列。

如果当前争抢锁的失败了,它会主要去休眠,并把自己加入当前锁维护的队列当中;等到当前锁的拥有者释放锁的时候,取出队首的线程ID并且唤醒它。

在 Solaris 系统中,park() 是把当前线程休眠的系统调用,unpark() 则相反。

typedef struct __lock_t {
    int flag;
    int guard;
    queue_t * q;
} lock_t;

void lock_init(lock_t * m) {
    m->flag = 0;
    m->guard = 0;
    queue_init(m->q);
}

void lock(lock_t * m) {
    while (TestAndSet(&m->guard, 1) == 1) {
        // spin
    }
    if (m->flag == 0) {
        m->flag = 1; // lock is acquired,这才是真正的锁的标志
        m->guard = 0;
    } else {
        queue_add(m->q, gettid());
        m->guard = 0;
        park();
    }
}

void unlock(lock_t * m) {
    while (TestAndSet(&m->guard, 1) == 1) {
        // spin
    }
    if (queue_empty(m->q)) {
        m->flag = 0; // let go of lock; no one wants it
    } else {
        unpark(queue_remove(m->q)); // hold lock (for next thread!)
    }
    m->guard = 0;
}

这里可以注意下,这里的 spin 时间可以大大缩短,因为有队列可以暂存起来嘛。当 m->flag = 1 时,就表示锁已经获取了,就需要 m->guard = 1 继续来守护了,此时可以释放。这时,如果有线程被唤醒或者是在 spin 阶段跟着 spinning 着的线程 继续往下走了(因为 m->guard 被释放出来了嘛),此时它们会走到 else 分支,把自己加入锁把持的队列当中。

不同的操作系统,不同的线程支持

目前为止我们仅仅介绍了一种类型的线程模型用来介绍我们的线程是什么。其实在不同的操作系统平台线程有不同的实现,大致相似,细节不同。

下面是 Linux 的 futex 锁实现,

void mutex_lock (int * mutex) {
    int v;
    / * Bit 31 was clear, we got the mutex (this is the fastpath) */
    if (atomic_bit_test_set (mutex, 31) == 0)
        return
    atomic_increment (mutex);
    while (1) {
        if (atomic_bit_test_set (mutex, 31) == 0) {
            atomic_decrement (mutex);
            return;
        }

        v = * mutex;
        if (v >= 0)
            continue;
        futex_wait (mutex, v);
    }
}

void mutex_unlock (int * mutex) {
    / * Adding 0x80000000 to the counter results in 0 if and only if there are not other interested threads * /

    if (atomic_add_zero (mutex, 0x80000000))
        return;

    /* There are other threads waiting for this mutex, wake one of them up. * /
    futex_wake (mutex);
}

两阶段锁

两阶段是哪两阶段呢?第一,spin 阶段;第二,入队列阶段。(如同我们前面介绍的那样)

Linux 的锁是两阶段锁的实现。

两阶段是有意义的,特别是第一阶段,在锁将要释放的阶段,而且我们编程应该遵循细粒度锁的设计,减少 critical section,这也是加速程序执行的一个方法,这样其实就意味着锁的时间尽可能少,因此不要那么快放弃 CPU,spin 一会儿是有意义的。

但不是绝对,只能说 Linux 操作系统的这么认为的。

另外,两阶段锁同样也是混合设计的一个例子(hybrid approach),这个设计在我们前面的文章中有提到,计算机世界里很常见。

引用

《Locks》

Comments
Write a Comment