关于锁的思考和总结(一)

2024/05/15

Tags: Lock Mutex CAS Go

func add (a *int) *int { 
  *a++ // 线程不安全
  return a
}

这是一段很典型的线程不安全的代码示例, 在并发场景下, a 的结果是不确定的, 大概率会小于 1000, 原因是 a++ 并非原子操作, 会存在同时有两个协程读取到 a 的值是相同的情况, 执行 a++之后再重新回写时, a的值也是相同的, 想要变为线程安全, 就需要在操作临界资源之前加锁;

Mutex

在操作共享资源之前加锁, 然后操作完临界资源之后释放锁, 保证同时只有一个协程操作临界资源;

var mu sync.Mutex
func addSafe(a *int) *int {
  mu.Lock() // 加锁
  defer mu.Unlock() // 释放锁
  *a++
  return a
}

锁在多线程或多进程环境中实现资源的互斥访问。当一个线程或进程想要访问某个共享资源(如数据结构、文件等)时,它必须首先尝试获取该资源对应的锁。如果锁未被其他线程或进程占用,那么请求的线程或进程将获得锁并继续执行;否则,它将等待,直到锁被释放。

// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
  // Fast path: grab unlocked mutex.
  if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
    }
    return
  }
  // Slow path (outlined so that the fast path can be inlined)
  m.lockSlow()
}

type Mutex struct {
  state int32 // state 表示当前互斥锁的状态
  sema  uint32 // sema 是用于控制锁状态的信号量
}

golang 的实现中, 如果通过 CompareAndSwapInt32 也就是 CAS 能获取到锁, 表明协程已经能拿到锁了, 此时直接返回;

如果通过 CAS 无法拿到锁, 说明有其他协程拿到锁还未释放, 这时候的选择就会很多了, 可以原地重试, 也就是自旋的特性, 也可以进入队列等待, 等待被唤醒;

一般来说, 编程语言的实现中, 会优先选择自旋, 因为此时还是在用户态, 但是并发量很大时, 自旋反而会降低性能.

// lockSlow() 代码片段
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
  if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
  }
  // If we were already waiting before, queue at the front of the queue.
  queueLifo := waitStartTime != 0
  if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
  }
  runtime_SemacquireMutex(&m.sema, queueLifo, 1)
 ...
}

CAS 和自旋之后依然没有获取到锁, 为了防止大批量协程自旋, lockSlow() 中, runtime_SemacquireMutex(&m.sema, queueLifo, 1) 给出了一种新的方式去获取锁, 从入参来看, 传入了一个信号量, 是否需要 LIFO 的队列, 和一个值, 实现思路大概是如果没获取到锁, 就进入等待队列, 等待被唤醒;

// func runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int32)
TEXT runtime·runtime_SemacquireMutex(SB), NOSPLIT, $0-12
    MOVL addr+0(FP), BP  // 将互斥锁的地址加载到 BP 寄存器
    MOVL lifo+4(FP), AX  // 将 lifo 参数加载到 AX 寄存器
    MOVL skipframes+8(FP), CX  // 将 skipframes 参数加载到 CX 寄存器

    // 尝试原子地获取互斥锁
acquire:
    MOVL $1, SI          // 将 SI 寄存器设置为 1
    XCHGL SI, 0(BP)      // 使用 XCHGL 指令原子地交换 SI 和互斥锁的值
    TESTL SI, SI         // 检查 SI 寄存器的值(即原互斥锁的值)
    JZ acquired          // 如果 SI 寄存器的值为 0,表示互斥锁已被获取,跳转到 acquired 标签

    // 互斥锁已被占用,阻塞当前协程(goroutine)
    MOVL BP, 0(SP)       // 将互斥锁的地址存储到栈上
    MOVL AX, 4(SP)       // 将 lifo 参数存储到栈上
    MOVL CX, 8(SP)       // 将 skipframes 参数存储到栈上
    CALL runtime·semacquire1(SB)  // 调用 semacquire1 函数阻塞当前协程
    JMP acquire          // 当协程被唤醒时,跳回到 acquire 标签尝试再次获取互斥锁

acquired:
    // 互斥锁已被成功获取,执行后续代码
    // ...
    RET

CAS

Mutex 加锁为什么会影响性能呢? 基于Mutex的原理: 当一个线程尝试获取已被其他线程持有的锁时,它将被阻塞。阻塞的线程会进入睡眠状态,等待锁被释放, 当锁被释放时,操作系统会唤醒等待锁的线程, 使其重新尝试获取锁. 一般来说, 等待的线程会放到队列中等待, 以保障后续唤醒的顺序。

这里耗时的主要原因是线程被阻塞进入睡眠状态, 而后又被唤醒, 这些操作都需要切换到内核态 ,如果说线程在没获取到锁时不进入睡眠状态, 而是原地重试, 那是不是就能提高效率呢? 这就是 CAS 的思路, 先来看看 golang 中对 CAS 的相关实现:

// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
// Consider using the more ergonomic and less error-prone [Int32.CompareAndSwap] instead.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

这种只有函数声明,没有函数实现的,通常意味着函数的实现在golang汇编中, 也就是还是需要借助系统的能力实现 CAS, 从汇编指令上看, 对于 addr 来说, 如果 addr == old , 就把 addr 设置为 new;

在 X86 架构下:

// func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
TEXT runtime·CompareAndSwapInt32(SB), NOSPLIT, $0-13
    MOVL addr+0(FP), BP  // 将目标地址加载到 BP 寄存器
    MOVL old+4(FP), AX   // 将 old 参数加载到 AX 寄存器
    MOVL new+8(FP), CX   // 将 new 参数加载到 CX 寄存器

    // 使用 CMPXCHGL 指令原子地比较和交换值
    MOVL AX, 0(BP)       // 将 AX 寄存器的值存储到目标地址
    LOCK CMPXCHGL CX, 0(BP)  // 原子地比较和交换目标地址和 CX 寄存器的值

    // 设置返回值
    SETE AL                // 如果比较和交换成功,将 AL 寄存器设置为 1(true),否则设置为 0(false)
    MOVB AL, swapped+12(FP)  // 将 AL 寄存器的值存储到返回值中
    RET

这段汇编中, 最重要的一点就是 LOCK CMPXCHGL , LOCK 前缀用于确保指令在执行过程中不会被其他处理器或线程中断。当 LOCK 前缀与 CMPXCHGL 指令结合使用时,确保了在比较和交换操作数的过程中,内存位置的值不会被其他处理器或线程修改,从而实现了原子性。

CompareAndSwapInt32 函数的返回结果来看, 执行函数可能会失败, 也就是没获取到锁, 往往这种情况会进行重试, 直到执行完成, 也就是自旋的特性; 当然这种自旋是在用户态, 无需切换到内核态, 因此性能上相对更好;

比如在 java 语言中对于 CAS 的使用:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

ABA 问题

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

在 CAS 中, 对于 addr 来说, 如果 addr == old , 就把 addr 设置为 new, 在此过程中, 依然存在问题;

  1. 当线程 T1 进入 CAS 之前, 读取了一个 old 值, 假设为 A;
  2. 线程 T1 时间片耗尽, 线程 T2 开始执行;
  3. T2 执行 CAS, 把 addr 设置为 B, 然后又执行 CAS , 把 addr 修改回 A;
  4. 当 T1 继续执行时, 发现 addr 依然为 old A, T1 会认为 addr 没有被修改过;

如果要解决 ABA 问题, 一个可行的思路就是给数据加上一个版本号, 不仅是对比数据的值, 还需要对比数据的版本号, 这个思路其实就是从最底层的资源做了隔离, 在并发的场景中, 如果对数据正确性要求很高, 临界资源兜底策略很重要;

大批量线程自旋问题

基于 CAS 的特性, 没有修改成功可以进行重试, 也就是自旋, 如果在线程数量很多的场景下, 反而会影响性能, 因为会有大量的线程不断自旋, 而且随着线程数量, 自旋的影响会持续放大, 因此在 Java 中, synchronized 会随着并发量的增加, 会有锁升级的机制, 在golang 的 Mutex 实现中, 也是从 CAS 逐渐升级到最后阻塞线程.

总结

参考

https://zh.wikipedia.org/wiki/%E4%BA%92%E6%96%A5%E9%94%81

https://www.infoq.com/presentations/go-locks/

https://time.geekbang.org/column/article/377913

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#mutex