func add (a *int) *int {
*a++ // 线程不安全
return a
}
这是一段很典型的线程不安全的代码示例, 在并发场景下, a 的结果是不确定的, 大概率会小于 1000, 原因是 a++
并非原子操作, 会存在同时有两个协程读取到 a 的值是相同的情况, 执行 a++
之后再重新回写时, a的值也是相同的, 想要变为线程安全, 就需要在操作临界资源之前加锁;
Mutex
在操作共享资源之前加锁, 然后操作完临界资源之后释放锁, 保证同时只有一个协程操作临界资源;
var mu sync.Mutex
func addSafe(a *int) *int {
mu.Lock() // 加锁
defer mu.Unlock() // 释放锁
*a++
return a
}
锁在多线程或多进程环境中实现资源的互斥访问。当一个线程或进程想要访问某个共享资源(如数据结构、文件等)时,它必须首先尝试获取该资源对应的锁。如果锁未被其他线程或进程占用,那么请求的线程或进程将获得锁并继续执行;否则,它将等待,直到锁被释放。
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
type Mutex struct {
state int32 // state 表示当前互斥锁的状态
sema uint32 // sema 是用于控制锁状态的信号量
}
在 golang
的实现中, 如果通过 CompareAndSwapInt32
也就是 CAS
能获取到锁, 表明协程已经能拿到锁了, 此时直接返回;
如果通过 CAS
无法拿到锁, 说明有其他协程拿到锁还未释放, 这时候的选择就会很多了, 可以原地重试, 也就是自旋的特性, 也可以进入队列等待, 等待被唤醒;
一般来说, 编程语言的实现中, 会优先选择自旋, 因为此时还是在用户态, 但是并发量很大时, 自旋反而会降低性能.
// lockSlow() 代码片段
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
...
}
当 CAS
和自旋之后依然没有获取到锁, 为了防止大批量协程自旋, lockSlow()
中, runtime_SemacquireMutex(&m.sema, queueLifo, 1)
给出了一种新的方式去获取锁, 从入参来看, 传入了一个信号量, 是否需要 LIFO
的队列, 和一个值, 实现思路大概是如果没获取到锁, 就进入等待队列, 等待被唤醒;
// func runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int32)
TEXT runtime·runtime_SemacquireMutex(SB), NOSPLIT, $0-12
MOVL addr+0(FP), BP // 将互斥锁的地址加载到 BP 寄存器
MOVL lifo+4(FP), AX // 将 lifo 参数加载到 AX 寄存器
MOVL skipframes+8(FP), CX // 将 skipframes 参数加载到 CX 寄存器
// 尝试原子地获取互斥锁
acquire:
MOVL $1, SI // 将 SI 寄存器设置为 1
XCHGL SI, 0(BP) // 使用 XCHGL 指令原子地交换 SI 和互斥锁的值
TESTL SI, SI // 检查 SI 寄存器的值(即原互斥锁的值)
JZ acquired // 如果 SI 寄存器的值为 0,表示互斥锁已被获取,跳转到 acquired 标签
// 互斥锁已被占用,阻塞当前协程(goroutine)
MOVL BP, 0(SP) // 将互斥锁的地址存储到栈上
MOVL AX, 4(SP) // 将 lifo 参数存储到栈上
MOVL CX, 8(SP) // 将 skipframes 参数存储到栈上
CALL runtime·semacquire1(SB) // 调用 semacquire1 函数阻塞当前协程
JMP acquire // 当协程被唤醒时,跳回到 acquire 标签尝试再次获取互斥锁
acquired:
// 互斥锁已被成功获取,执行后续代码
// ...
RET
CAS
Mutex
加锁为什么会影响性能呢? 基于Mutex
的原理: 当一个线程尝试获取已被其他线程持有的锁时,它将被阻塞。阻塞的线程会进入睡眠状态,等待锁被释放, 当锁被释放时,操作系统会唤醒等待锁的线程, 使其重新尝试获取锁. 一般来说, 等待的线程会放到队列中等待, 以保障后续唤醒的顺序。
这里耗时的主要原因是线程被阻塞进入睡眠状态, 而后又被唤醒, 这些操作都需要切换到内核态 ,如果说线程在没获取到锁时不进入睡眠状态, 而是原地重试, 那是不是就能提高效率呢? 这就是 CAS
的思路, 先来看看 golang
中对 CAS
的相关实现:
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
// Consider using the more ergonomic and less error-prone [Int32.CompareAndSwap] instead.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
这种只有函数声明,没有函数实现的,通常意味着函数的实现在golang
汇编中, 也就是还是需要借助系统的能力实现 CAS
, 从汇编指令上看, 对于 addr 来说, 如果 addr == old , 就把 addr 设置为 new;
在 X86 架构下:
// func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
TEXT runtime·CompareAndSwapInt32(SB), NOSPLIT, $0-13
MOVL addr+0(FP), BP // 将目标地址加载到 BP 寄存器
MOVL old+4(FP), AX // 将 old 参数加载到 AX 寄存器
MOVL new+8(FP), CX // 将 new 参数加载到 CX 寄存器
// 使用 CMPXCHGL 指令原子地比较和交换值
MOVL AX, 0(BP) // 将 AX 寄存器的值存储到目标地址
LOCK CMPXCHGL CX, 0(BP) // 原子地比较和交换目标地址和 CX 寄存器的值
// 设置返回值
SETE AL // 如果比较和交换成功,将 AL 寄存器设置为 1(true),否则设置为 0(false)
MOVB AL, swapped+12(FP) // 将 AL 寄存器的值存储到返回值中
RET
这段汇编中, 最重要的一点就是 LOCK CMPXCHGL
, LOCK
前缀用于确保指令在执行过程中不会被其他处理器或线程中断。当 LOCK 前缀与 CMPXCHGL 指令结合使用时,确保了在比较和交换操作数的过程中,内存位置的值不会被其他处理器或线程修改,从而实现了原子性。
从 CompareAndSwapInt32
函数的返回结果来看, 执行函数可能会失败, 也就是没获取到锁, 往往这种情况会进行重试, 直到执行完成, 也就是自旋的特性; 当然这种自旋是在用户态, 无需切换到内核态, 因此性能上相对更好;
比如在 java
语言中对于 CAS
的使用:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
ABA 问题
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
在 CAS 中, 对于 addr 来说, 如果 addr == old , 就把 addr 设置为 new, 在此过程中, 依然存在问题;
- 当线程 T1 进入 CAS 之前, 读取了一个 old 值, 假设为 A;
- 线程 T1 时间片耗尽, 线程 T2 开始执行;
- T2 执行 CAS, 把 addr 设置为 B, 然后又执行 CAS , 把 addr 修改回 A;
- 当 T1 继续执行时, 发现 addr 依然为 old A, T1 会认为 addr 没有被修改过;
如果要解决 ABA 问题, 一个可行的思路就是给数据加上一个版本号, 不仅是对比数据的值, 还需要对比数据的版本号, 这个思路其实就是从最底层的资源做了隔离, 在并发的场景中, 如果对数据正确性要求很高, 临界资源兜底策略很重要;
大批量线程自旋问题
基于 CAS 的特性, 没有修改成功可以进行重试, 也就是自旋, 如果在线程数量很多的场景下, 反而会影响性能, 因为会有大量的线程不断自旋, 而且随着线程数量, 自旋的影响会持续放大, 因此在 Java 中, synchronized
会随着并发量的增加, 会有锁升级的机制, 在golang 的 Mutex 实现中, 也是从 CAS 逐渐升级到最后阻塞线程.
总结
-
从编程语言的锁的实现上来看, 从用户态很难实现锁的机制, 因为线程调度是由内核完成的, 不确定当前线程何时耗尽时间片, 如果读到临界资源还没来得及写入, 此时中断, 其他线程获取到 CPU 时间片后, 会读取到相同的临界资源, 当临界资源被修改后, 之前线程拿到的临界资源还是历史值, 此时再修改就是基于错误的值进行修改;
-
当借助系统调用时, 会进入内核态, 无论是
XCHGL
还是CMPXCHGL
, 这些指令都是原子操作; 从操作系统的角度而言, 锁的本质就是一个内存中的标识, 锁的竞争就是通过原子指令去修改这个标识位, 修改成功则表现为获取到了锁, 修改失败则可以自行决定, 自旋或者阻塞, 表现为不同的锁的形式, 自旋锁, 排他锁;
参考
https://zh.wikipedia.org/wiki/%E4%BA%92%E6%96%A5%E9%94%81
https://www.infoq.com/presentations/go-locks/
https://time.geekbang.org/column/article/377913
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#mutex