文章

linux内核笔记 自旋锁(spinlock)

linux内核笔记 自旋锁(spinlock)

自旋锁 spin_lock

spin_lock 是一种 忙等(busy waiting)锁。 当一个 CPU 想要进入临界区,但锁已被其他 CPU 持有时,它不会睡眠,而是反复循环检查锁状态,直到锁被释放。 它适合:临界区很短不可睡眠 的场景(比如中断上下文)。

锁的结构

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
        struct {
            u8 __padding[LOCK_PADSIZE];
            struct lockdep_map dep_map; // debug相关 锁依赖图
        };
#endif
    };
} spinlock_t;

raw_spinlock_t中的成员是一个架构相关的结构:

1
2
3
4
5
6
7
8
9
10
typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
    unsigned int magic, owner_cpu;
    void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

arch_spinlock_t 在大部分架构上 它是队列自旋锁 qspinlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typedef struct qspinlock {
    union {
        atomic_t val;

#ifdef __LITTLE_ENDIAN
        struct {
            u8  locked; 
            u8  pending;  // atomic_t是32位  通过union将32位的结构分散到各个字段上以快速访问
        };
        struct {
            u16 locked_pending;
            u16 tail;
        };
#else
        struct {
            u16 tail;
            u16 locked_pending;
        };
        struct {
            u8  reserved[2];
            u8  pending;
            u8  locked;
        };
#endif
    };
} arch_spinlock_t;

spinlock_structure spinlock的结构

自旋锁的函数实现

内核代码中最常出现的核心自旋锁内联函数:spin_lock(spinlock_t *lock)

1
2
3
4
5
6
7
8
9
10
11
12
#if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK)
# include <linux/spinlock_api_smp.h>    // 多处理器结构
#else
# include <linux/spinlock_api_up.h>
#endif

#define raw_spin_lock(lock) _raw_spin_lock(lock)

static __always_inline void spin_lock(spinlock_t *lock)
{
    raw_spin_lock(&lock->rlock);
}

重点关注_raw_spin_lock在多处理器(Symmetric Multi-Processing )结构中的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#ifdef CONFIG_INLINE_SPIN_LOCK
#define _raw_spin_lock(lock) __raw_spin_lock(lock)
#endif

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    preempt_disable();  // 禁用抢占

    // 宏 默认展开为:do { } while (0) 如果配置了CONFIG_LOCKDEP,则会检测死锁依赖
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    /* 
     * do_raw_spin_trylock() 是一个**尝试获取锁(不阻塞)**的函数;
     * do_raw_spin_lock() 是一个阻塞自旋等待锁释放的函数;
     * LOCK_CONTENDED() 先尝试一次非阻塞的 trylock,再在失败时进入阻塞路径并记录锁竞争情况 
     *     -> if (!try(_lock)) { lock(_lock);}
     */
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

回到spinlock.h,发现阻塞(do_raw_spin_lock)与非阻塞(do_raw_spin_trylock)的lock函数将逻辑交到了架构相关的arch_spin_lock(arch_spinlock_t)arch_spin_trylock(arch_spinlock_t)中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
    __acquire(lock);                // 同检测死锁依赖的切入口 在不配置的情况是个空
    arch_spin_lock(&lock->raw_lock);
    mmiowb_spin_lock();             // 确保在特殊架构上所有对 MMIO 寄存器的写操作在释放锁之前被提交到设备
}

static inline int do_raw_spin_trylock(raw_spinlock_t *lock)
{
    int ret = arch_spin_trylock(&(lock)->raw_lock);
    if (ret)
        mmiowb_spin_lock();
    return ret;
}

arch_spin_lockarch_spin_trylock 不同架构下的实现不同,在了解几种不同的锁实现之前,先来讨论下的同步锁的本质:内存顺序性

内存顺序性 (memory ordering)

锁的本质不仅是互斥,还有内存顺序保证

线程 A 在临界区写入的数据,如果没有内存顺序保证,线程 B 在获取锁后不一定能看到 A 的写入。这是非常致命的。

memory_ordering 并发中的内存顺序性

为防止多核环境下CPU将load与store的指令乱序执行,kernel使用 store-release / load-acquire语义来保证。

x86属于强内存模型x86-tso,仅允许store-load重排,但arm64允许所有类型的指令重排,在锁的场景下,x86指令原生支持,arm64需要通过内存屏障保证

acquire_release store-release与load-acquire`

store-release 之前的所有写一定在 load-acquire之后的所有读 之前对其他CPU可见。

票据自旋锁 ticket_spinlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#define arch_spin_lock(l)       ticket_spin_lock(l)
#define arch_spin_trylock(l)    ticket_spin_trylock(l)
#define arch_spin_unlock(l)     ticket_spin_unlock(l)

static __always_inline void ticket_spin_lock(arch_spinlock_t *lock)
{
    /* 
     * 32位 前16位为当前锁值  后16位为拿到的票据,当相等时 视为拿到锁
     * 将lock->val原子+1 后16位视为当前线程拿到的票据
     */
    u32 val = atomic_fetch_add(1<<16, &lock->val);
    u16 ticket = val >> 16;

    if (ticket == (u16)val)
        return;

    /*
     * 反复(自旋)读取 变量的值,直到指定的条件表达式 (c) 为真为止。
     * acquire语义确保读取操作的内存顺序性,避免在多 CPU 环境下出现数据竞争
     * 内存顺序性 rcpc (针对不同的val内存地址的指令可重排序)
     */
    atomic_cond_read_acquire(&lock->val, ticket == (u16)VAL);
    smp_mb();
}

static __always_inline bool ticket_spin_trylock(arch_spinlock_t *lock)
{
    u32 old = atomic_read(&lock->val);
    if ((old >> 16) != (old & 0xffff))
        return false;
    /**
     * 内存顺序性 rcsc 内存指令全局一致性
     * 比较 lock->val 和 old  如果相等 更新为v 返回true 否则返回false
     */  
    return atomic_try_cmpxchg(&lock->val, &old, old + (1<<16));
}

static __always_inline void ticket_spin_unlock(arch_spinlock_t *lock)
{
    u16 *ptr = (u16 *)lock + IS_ENABLED(CONFIG_CPU_BIG_ENDIAN);
    u32 val = atomic_read(&lock->val);
    smp_store_release(ptr, (u16)val + 1); // store_release语义  将锁的票据+1
}

atomic_cond_read_acquire宏的实现比较简单:循环读取变量值,如果不满足条件则调用cpu_relax降低功耗,因此票据自旋锁是不公平的

1
2
3
4
5
6
7
8
9
10
11
#define smp_cond_load_relaxed(ptr, cond_expr) ({        \
    typeof(ptr) __PTR = (ptr);                          \
    __unqual_scalar_typeof(*ptr) VAL;                   \
    for (;;) {                                          \
        VAL = READ_ONCE(*__PTR);                        \
        if (cond_expr)                                  \
            break;                                      \
        cpu_relax();                                    \
    }                                                   \
    (typeof(*ptr))VAL;                                  \
})

队列自旋锁 qspinlock

队列自旋锁结构

qspinlock_bits qspinlock的32位结构

图中可以看出,qspinlock结构中的32-bit字段val被分为3个部分:

MCS队列位 pending位 锁位

队列自旋锁是kernel实现的一种兼具轻度自旋➕公平排队的一种自旋锁实现,并解决了缓存行颠簸(Cacheline Bouncing)问题,也是现代内核中自旋锁的默认实现。

缓存行颠簸: 在传统的自旋锁中,当一个锁被多个 CPU 竞争时,所有等待锁的 CPU 都会在同一个共享内存地址(即锁变量本身)上进行自旋(不断尝试写入或读取)。每次一个 CPU 尝试写入该共享地址时,都会使其他所有 CPU 缓存中该地址所在的缓存行(Cacheline)失效(Invalidate)。这导致缓存行在不同 CPU 的 L1/L2 缓存中频繁地来回传输,即缓存行颠簸,严重浪费总线带宽,极大地降低系统性能。

MCS锁的解决方案: 将自旋变量分散到每个等待线程的本地内存中,等待者只在自己的本地内存地址上自旋(只读)。当锁释放时,释放者会点对点(Point-to-Point)地通知队列中的下一个等待者。

队列自旋锁这里的设计思路是: 最多允许一个线程(pending)抢锁,抢不到就老老实实排 MCS 队列。

  • 0 个等待者 → 直接 cmpxchg 拿锁(快路径)
  • 1 个等待者 → 用 pending 位优化(省一次 cacheline 传递)
  • ≥2 个等待者 → 退化成经典 MCS 队列(公平、可扩展)
MCS队列结构 (Mellor-Crummey and Scott Lock)
1
2
3
4
5
struct mcs_spinlock {
    struct mcs_spinlock *next;
    int locked; /* 1 表示当前已获取锁 */
    int count;  /* CPU队列次数统计 */
};
队列自旋锁的位掩码与偏移
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#define _Q_LOCKED_OFFSET    0
#define _Q_LOCKED_BITS      8
#define _Q_LOCKED_MASK      _Q_SET_MASK(LOCKED)

#define _Q_PENDING_OFFSET   (_Q_LOCKED_OFFSET + _Q_LOCKED_BITS)
#define _Q_PENDING_BITS     8
#define _Q_PENDING_MASK     _Q_SET_MASK(PENDING)

#define _Q_TAIL_IDX_OFFSET  (_Q_PENDING_OFFSET + _Q_PENDING_BITS)
#define _Q_TAIL_IDX_BITS    2
#define _Q_TAIL_IDX_MASK    _Q_SET_MASK(TAIL_IDX)

#define _Q_TAIL_CPU_OFFSET (_Q_TAIL_IDX_OFFSET + _Q_TAIL_IDX_BITS)
#define _Q_TAIL_CPU_BITS (32 - _Q_TAIL_CPU_OFFSET)
#define _Q_TAIL_CPU_MASK _Q_SET_MASK(TAIL_CPU)

#define _Q_TAIL_OFFSET      _Q_TAIL_IDX_OFFSET
#define _Q_TAIL_MASK        (_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)

#define _Q_LOCKED_VAL       (1U << _Q_LOCKED_OFFSET)
#define _Q_PENDING_VAL      (1U << _Q_PENDING_OFFSET)

队列自旋锁的核心接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#define arch_spin_lock(l)           queued_spin_lock(l)
#define arch_spin_trylock(l)        queued_spin_trylock(l)
#define arch_spin_unlock(l)         queued_spin_unlock(l)

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
    int val = 0;
    /*
     * 直接 cmpxchg 拿锁(快路径)
     */
    if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
        return;
    queued_spin_lock_slowpath(lock, val);
}
static __always_inline int queued_spin_trylock(struct qspinlock *lock)
{   
    int val = atomic_read(&lock->val);  // 原子读lock->val 
    if (unlikely(val))  // 如果已经被锁
        return 0;
    /*
     * 如果刚出来的数没有变化   尝试将lock->val的值设为已锁的值(默认是1)
     * 注意:atomic_try_cmpxchg_acquire 与票据锁不同,使用了acquire语义,
     * 相对票据锁的全局顺序一致的atomic_try_cmpxchg() 对指令的重排序限制稍微宽松一些
     */
    return likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL));
}

static __always_inline void queued_spin_unlock(struct qspinlock *lock)
{
    /**
     * 解锁 这里使用了store_release(写不能重排到此之后)语义
     */
    smp_store_release(&lock->locked, 0);
}

等待队列的核心结构与辅助函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct qnode {  // 每个cpu一个qnode 即mcs队列
    struct mcs_spinlock mcs;
};

/**
 * 计算当前排队编号,将cpu编号与cpu的竞争排序idx 合并在一起
 * 并编码进 lock->val 的高位区域
 * 31                       0
 * +------------------------+
 * | tail | pending | lock  |
 * +------------------------+
 */
static inline __pure u32 encode_tail(int cpu, int idx)
{
    u32 tail;
    tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET; 
    tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */

    return tail;
}

慢路径queued_spin_lock_slowpath的核心实现;所有pv相关的函数是指在半虚拟化下的逻辑 Paravirtualization,实现在qspinlock_paravirt.h;lockevent相关是做锁性能优化的数据收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/* 每个CPU分配一个长度为4的qnode数组 */
static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[_Q_MAX_NODES]);

void __lockfunc queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
    struct mcs_spinlock *prev, *next, *node;
    u32 old, tail;
    int idx;

    BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

    if (pv_enabled()) // 半虚拟化环境,直接走性能更高的hypervisor路径
        goto pv_queue;

    if (virt_spin_lock(lock)) // 旧虚拟化机制
        return;

    /*
     * 等待pending位与上任持锁位交接
     * 0,1,0 -> 0,0,1
     */
    if (val == _Q_PENDING_VAL) {
        int cnt = _Q_PENDING_LOOPS;
        val = atomic_cond_read_relaxed(&lock->val,
                            (VAL != _Q_PENDING_VAL) || !cnt--);
    }

    if (val & ~_Q_LOCKED_MASK)  // 有pending或队列 去排队
        goto queue;

    /*
     * 使用汇编指令bts 尝试锁定pending位 返回的是锁定之前的值
     */
    val = queued_fetch_set_pending_acquire(lock);

    if (unlikely(val & ~_Q_LOCKED_MASK)) {
        /*
         * 如果已经有队列了,且由于之前的val没有pending位,需要清除pending
         */
        if (!(val & _Q_PENDING_MASK))
            clear_pending(lock);    // 将lock的pending位置0

        goto queue; // 去排队
    }

    /*
     * 抢占到pending位后,等待锁持有者释放锁位
     *
     * 0,1,1 -> *,1,0
     *
     * smp_cond_load_acquire 带有禁止指令重排序(读指令不能重排到此之前)的内存屏障
     */
    if (val & _Q_LOCKED_MASK)
        smp_cond_load_acquire(&lock->locked, !VAL);

    /*
     * 拿到锁,清除pending位
     */
    clear_pending_set_locked(lock);
    lockevent_inc(lock_pending);
    return;

    /*
     * 结束pending位的乐观自旋  开始进入mcs队列
     */
queue:
    lockevent_inc(lock_slowpath);
pv_queue:
    node = this_cpu_ptr(&qnodes[0].mcs); // 获取当前cpu第一个mcs结构
    idx = node->count++;    // 将当前cpu的重入次数+1
    // 将当前u与次数合并  得到排队号
    tail = encode_tail(smp_processor_id(), idx);
    // 性能跟踪函数 忽略
    trace_contention_begin(lock, LCB_F_SPIN);

    /* 当重入次数超出限制,不再参与排队,直接自旋等待锁释放*/
    if (unlikely(idx >= _Q_MAX_NODES)) {
        lockevent_inc(lock_no_node);
        while (!queued_spin_trylock(lock))
            cpu_relax();
        goto release;
    }
    node = grab_mcs_node(node, idx); // 将node设置为统计次数对应的qnodes[idx]

    lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
    barrier();
    /* 
     * 初始化 MCS node 成为(locked=0, next=NULL)
     * barrier()防止上面的`node->count++`被**部分编译器**重排到实际node的初始化之前
     */
    node->locked = 0;
    node->next = NULL;
    pv_init_node(node); // 半虚拟化环境下的处理 参考 qspinlock_paravirt.h

    /*
     * 排队(将节点连接到 MCS 队列尾部、进入本地自旋)是一个相对复杂的、涉及原子操作的过程。
     * 当前的per-cpu中的node已经触发一次冷缓存
     * 如果能通过简单的 trylock 避免排队,就能节省大量 CPU 周期。
     */
    if (queued_spin_trylock(lock))
        goto release;

    /* Symmetric Multiprocessing(对称多处理),Write Memory Barrier(写内存屏障)
     * 在通过 xchg_tail() 发布更新后的尾节点之前,必须确保 @node 的初始化已完成,
     * 且后续通过 WRITE_ONCE(prev->next, node) 将 @node 链接到等待队列时不会引发冲突。
     */
    smp_wmb();

    /*
     * 将排队号发布到队列尾部
     * p,*,* -> n,*,*
     */
    old = xchg_tail(lock, tail);
    next = NULL;

    if (old & _Q_TAIL_MASK) {
        prev = decode_tail(old, qnodes);  // 找到老tail的qnode

        WRITE_ONCE(prev->next, node);   // 将 @node 链接到等待队列
        pv_wait_node(node, prev);       // 半虚拟化环境下的处理 

        /*
         * 架构相关,通过宽松语义自旋读取locked属性,满足条件后增加内存读屏障获取新值
         */
        arch_mcs_spin_lock_contended(&node->locked); 

        /*
         * 等待锁释放时,队列后面可能有新的线程进来排队 
         * 预读下个node缓存行以提升后面解锁时的延时
         */
        next = READ_ONCE(node->next);
        if (next)
            prefetchw(next);
    }

    /*
     * 线程已经成功排队到 qspinlock 队列的头部,现在它需要等待锁被释放。lock和pending都重置
     *
     * *,x,y -> *,0,0
     *
     * 循环等待必须使用 load-acquire (读不能重排到此之前)以匹配解锁和设置锁位时使用的
     * store-release (写不能重排到此之后) set_locked() 设置锁位函数中并没有全内存屏障 
     *
     * 如果 PV(半虚拟化)机制处于活跃状态,函数 pv_wait_head_or_lock 将会尝试获取锁并
     * 返回一个非零值。 因此,我们必须跳过 atomic_cond_read_acquire() 的调用
     * (等待交由了给半虚拟化下的处理)。
     */
    if ((val = pv_wait_head_or_lock(lock, node)))
        goto locked;

    val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
    /*
     * n,0,0 -> 0,0,1 : lock, 无竞争
     * *,*,0 -> *,*,1 : lock, 有竞争
     *
     * 如果自己是队列中唯一的等待者 (lock value 的tail == tail) tail即是当前节点的排队号码
     * 如果没有竞争,直接通过比较更新持锁.
     * 否则需要抢锁并唤醒等待队列
     */

    if ((val & _Q_TAIL_MASK) == tail) {
        if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
            goto release;    // 没有竞争 
    }

    set_locked(lock);
    /*
     * 竞争路径,要么后面有排队,_Q_PENDING_VAL被设置,
     * 这将检测队列后面的尾部@next  并确保可见性
     */
    if (!next)
        next = smp_cond_load_relaxed(&node->next, (VAL));
    /*
     * store_release 语义设置下个等候线程的locked标识
     */
    arch_mcs_spin_unlock_contended(&next->locked);
    pv_kick_node(lock, next);       // 半虚拟化下处理 忽略

release:
    trace_contention_end(lock, 0);

    __this_cpu_dec(qnodes[0].mcs.count);    // 恢复per_cpu mcs计数
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);
本文由作者按照 CC BY 4.0 进行授权