文章

linux内核数据结构-XArray

linux内核数据结构-XArray

Guide

XArray 是一种抽象数据类型,其行为就像一个非常大的指针数组。它满足许多与哈希或传统的可调整大小的数组相同的需求。与散列不同,它允许您以高效缓存的方式明智地转到下一个或上一个条目。与可调整大小的数组相比,无需复制数据或更改 MMU 映射即可增长数组。与双向链表相比,它具有更高的内存效率、可并行性和缓存友好性。它利用 RCU 来执行查找而无需锁定。

XArray与树、哈希表比较

特性维度 XArray (基树) 平衡树 (红黑树/AVL) 哈希表
键类型 整数(通常连续) 任意可比较类型 任意可哈希类型
平均查找 O(k) O(log n) O(1)
最坏查找 O(k) O(log n) O(n)(冲突时)
范围查询 优秀(天然支持) 良好(有序性) 很差(无序)
插入速度 中等(需再平衡) 很快(桶定位)
删除速度 中等(需再平衡) 快(但可能需rehash)
内存开销 低(稀疏存储) 中等(平衡信息) 较高(桶数组+冲突链)
缓存局部性 优秀(节点内连续) 中等(指针跳转) 差(随机访问)
锁粒度 细(支持节点级锁) 中等(子树锁) 粗(全表锁)
顺序遍历 中等(需层级遍历) 优秀(中序遍历) 很差(无序)
动态扩展 自动(按需增长) 自动(自平衡) 需rehash(成本高)
实现复杂度 中等 高(旋转逻辑) 低到中等
内存碎片 中等 高(冲突链)

Xarray 的核心是一个灵活的数组,通过 index 索引 entry (默认为 NULL)。它维护着 Index 到 entry 的映射关系,类似于一个 Radix Tree。

Xarray 中的 entry 存储单位称为 Slot,可存放特定范围的整数值、字节对齐的指针或 NULL。Xarray 会利用 entry 指针地址的末尾比特位存储类型信息,以便区分是数值、指针还是空值,并支持 Tag 等特性

由于指针是经过内存对齐的(任何从kmalloc()和 alloc_page()返回的指针来说都是如此),所以低位的比特都为0,可以用这些位存储entry类型信息

下图通过将索引值4295与内存地址0xFFFFCAFEBABE的映射的查找来简单描述xarray的存储机制

xarray xarray

内核内存地址对齐

内存对齐是为了换取性能正确性

硬件的限制与效率

尽管我们常把内存看作一个连续的字节数组,但 CPU 并不是按字节(Byte)来读取内存的。现代 CPU 通常以“字(Word)”或“缓存行(Cache Line)”为单位读取数据(例如 32 位系统一次读 4 字节,64 位系统一次读 8 字节)。

对齐访问:如果一个 4 字节的整数存储在地址 0x00,CPU 只需一次读取周期。

跨页/跨行访问:如果该整数存储在 0x03,它就会跨越两个读取周期。CPU 必须分别读取两个字,然后进行移位和拼接,这会显著降低性能。

原子性(Atomicity)

在多核编程中,原子操作至关重要。如果一个数据跨越了缓存行,CPU 就无法保证在一次操作中完成读写。这可能导致在一个核读取数据时,另一个核只更新了该数据的一半,从而引发严重的并发错误。

特定架构的强制要求

某些架构(如老款 ARM 或 MIPS)不支持非对齐访问。如果你尝试访问一个未对齐的地址,硬件会直接触发 Alignment Fault 异常。

XArray 的末位标记

xarray_addr_align 地址对齐

XArray 的实现中,它使用指针的低位来区分不同类型的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* 末位是1 则代表是值  否则是指针*/
static inline bool xa_is_value(const void *entry)
{
  return (unsigned long)entry & 1;
}
/* 右移一位返回条目的原值 */
static inline unsigned long xa_to_value(const void *entry)
{
  return (unsigned long)entry >> 1;
}
/* 是否内部条目 末位是10 */
static inline bool xa_is_internal(const void *entry)
{
  return ((unsigned long)entry & 3) == 2;
}
/* 右移两位 返回内部条目的原值 */
static inline unsigned long xa_to_internal(const void *entry)
{
  return (unsigned long)entry >> 2;
}
/* 判断节点 节点的内存地址大于4k*/
static inline bool xa_is_node(const void *entry)
{
  return xa_is_internal(entry) && (unsigned long)entry > 4096;
}
/* 内部节点指针直接将末位的****10通过-2换为****00即可 */
static inline struct xa_node *xa_to_node(const void *entry)
{
  return (struct xa_node *)((unsigned long)entry - 2);
}

XArray 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#ifndef XA_CHUNK_SHIFT
/* 每个node下slot数量的位移值   */
#define XA_CHUNK_SHIFT  (IS_ENABLED(CONFIG_BASE_SMALL) ? 4 : 6)
#endif
/* 
 * 每个node下slot数量 通过上面的位移值计算
 * 默认是6的2^6 = 64个 如果小设备 可以配置为2^4 =16个
 */
#define XA_CHUNK_SIZE   (1UL << XA_CHUNK_SHIFT)
/* 
 * 掩码 位运算offset使用 可快速计算某个层级node上的index对应的offset
 * index >> shift & XA_CHUNK_MASK 
 */
#define XA_CHUNK_MASK   (XA_CHUNK_SIZE - 1)


struct xarray {
  spinlock_t  xa_lock;  // 自旋锁,保护并发访问
  gfp_t   xa_flags;     // 内存分配标志
  void __rcu *  xa_head;// 指向根节点的 RCU 指针
};

struct xa_node {
  unsigned char shift;                  // 该节点覆盖的 index 位数
  unsigned char offset;                 // 在父节点 slots[] 中的偏移
  unsigned char count;                  // slots[] 中非 NULL 的数量
  unsigned char nr_values;              // 值(非指针)槽位数
  struct xa_node __rcu *parent;         // 父节点
  struct xarray *array;                 // 所属 XArray
  void __rcu  *slots[XA_CHUNK_SIZE];    // 槽位数组 XA_CHUNK_SIZE = 64 (6 bits)
  ...
};

struct xa_state {
  struct xarray *xa;          // 目标 XArray
  unsigned long xa_index;     // 目标 index
  unsigned char xa_shift;     // 当前节点的位移
  unsigned char xa_sibs;      // 当前层级需要处理的兄弟节点数量
  unsigned char xa_offset;    // 当前槽位偏移
  unsigned char xa_pad;       // 编译器内存对齐填充
  struct xa_node *xa_node;    // 当前节点
  struct xa_node *xa_alloc;   // 新分配的节点
  xa_update_node_t xa_update; // 节点更新的回调函数
  struct list_lru *xa_lru;    // 内存回收lru链表
  ...
};

XArray 核心API

API Description
XARRAY_INIT(name, flags) 静态初始化xa
xa_init(struct xarray *xa) 动态初始化xa
void *xa_load(struct xarray *, unsigned long index) 获取条目
void *xa_store(struct xarray *, unsigned long index, void *entry, gfp_t) 存储条目
void *xa_erase(struct xarray *, unsigned long index) 清除条目
xa_insert(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp) 目标位置为空则存储 否则报错
xa_empty(const struct xarray *xa) 检测xa是否为空
xa_is_value(const void *entry) 检测是否值条目
xa_to_value(const void *entry) 将条目转换为整数值

XArray 相关结构的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * 无论是静态还是动态的初始化,都比较简单,只是初始了一下自旋锁和标识,根节点是 NULL
 * 静态初始化(编译期)
 */
#define XARRAY_INIT(name, flags) {                \
  .xa_lock = __SPIN_LOCK_UNLOCKED(name.xa_lock),  \
  .xa_flags = flags,                              \
  .xa_head = NULL,                                \
}

/**
 * 动态初始化(运行期)
 */
static inline void xa_init(struct xarray *xa)
{
  xa_init_flags(xa, 0);
}

static inline void xa_init_flags(struct xarray *xa, gfp_t flags)
{
  spin_lock_init(&xa->xa_lock);
  xa->xa_flags = flags;
  xa->xa_head = NULL;
}

/**
 * xa_state 的初始化宏
 * @name: 此次操作状态的名字(一般是xas)
 * @array: 被操作的 xarray 
 * @index: 初始化的 index
 */
#define XA_STATE(name, array, index)                    \
  struct xa_state name = __XA_STATE(array, index, 0, 0)
#define __XA_STATE(array, index, shift, sibs)  {  \
  .xa = array,                                    \
  .xa_index = index,                              \
  .xa_shift = shift,                              \
  .xa_sibs = sibs,                                \
  .xa_offset = 0,                                 \
  .xa_pad = 0,                                    \
  .xa_node = XAS_RESTART,                         \
  .xa_alloc = NULL,                               \
  .xa_update = NULL,                              \
  .xa_lru = NULL,                                 \
}

XArray获取 xa_load

xa_load xa_load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
 * 从index处加载XArray的条目
 */
void *xa_load(struct xarray *xa, unsigned long index)
{
  XA_STATE(xas, xa, index);
  void *entry;
  rcu_read_lock();                    // rcu读锁
  do {
    entry = xa_zero_to_null(xas_load(&xas));
  } while (xas_retry(&xas, entry));   // 检查是否有重试标识
  rcu_read_unlock();

  return entry;
}
/*
 * 根据当前操作状态(xas)从 XArray 中加载条目。它只负责寻找现有的条目,绝不会扩展树结构。
 * 通常返回 XArray 中的数据条目,特殊情况下返回 NULL 或异常标记。
 */
void *xas_load(struct xa_state *xas)
{
  void *entry = xas_start(xas);

  while (xa_is_node(entry)) {
    struct xa_node *node = xa_to_node(entry);

    if (xas->xa_shift > node->shift)
      break;
    entry = xas_descend(xas, node);   // 往下走树
    if (node->shift == 0)
      break;
  }
  return entry;
}
EXPORT_SYMBOL_GPL(xas_load);
/* XArray 游标(xa_state)的初始化/重置 */
static void *xas_start(struct xa_state *xas)
{
  void *entry;

  /* 
   * 如果 xas->xa_node 的低 2 位都是 0(表示有效状态)
   * 说明游标已经在树上的某个位置 直接 reload 当前位置的 entry
   */
  if (xas_valid(xas))
    return xas_reload(xas);
  if (xas_error(xas))
    return NULL;

  entry = xa_head(xas->xa);   // 初始化根
  if (!xa_is_node(entry)) {
    /* 根是普通 entry(不是 node),只能在 index 0 存东西 */
    if (xas->xa_index)
      return set_bounds(xas);   // index != 0 但树只有根,越界错误
  } else {
    /* 根是 node,检查 index 是否超出树的寻址范围 */
    if ((xas->xa_index >> xa_to_node(entry)->shift) > XA_CHUNK_MASK)
      return set_bounds(xas);   // 越界错误
  }

  xas->xa_node = NULL;
  return entry;
}

/* 树节点下探 */
static __always_inline void *xas_descend(struct xa_state *xas,
            struct xa_node *node)
{
  /* 根据index 算出在node处的offset */
  unsigned int offset = get_offset(xas->xa_index, node);
  void *entry = xa_entry(xas->xa, node, offset);

  xas->xa_node = node;
  while (xa_is_sibling(entry)) {
    /*
     * 背景:multi-index entry(多索引条目)
     * XArray 支持一个 entry 同时占据多个连续的 index 槽位,这就是 multi-index entry。
     * 当 XA_STATE_ORDER(xas, xa, index, order) 初始化时:
     * xas->xa_shift = order - (order % XA_CHUNK_SHIFT);
     * xas->xa_sibs  = (1 << (order % XA_CHUNK_SHIFT)) - 1;
     * xa_sibs 表示兄弟槽位数量(siblings),即除主槽外还需要额外占用几个槽。
     * 例如 order=2,xa_sibs=3,意味着要占用 4 个连续槽。
     */
    offset = xa_to_sibling(entry);
    entry = xa_entry(xas->xa, node, offset);
    if (node->shift && xa_is_node(entry))
      entry = XA_RETRY_ENTRY;
  }

  xas->xa_offset = offset;
  return entry;
}
static unsigned int get_offset(unsigned long index, struct xa_node *node)
{
  /*
   * (index >> node->shift) & XA_CHUNK_MASK
   * 索引右移shift 位与 掩码 111111 得出index在node中的offset
   */
  return (index >> node->shift) & XA_CHUNK_MASK;
}

XArray存储 xa_store

xa_store xa_store

xa_store - 在 XArray 的 index 存放 entry (entry为NULL时视为删除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
void *xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
  void *curr;
  xa_lock(xa);                              // spin_lock(&(xa)->xa_lock)
  curr = __xa_store(xa, index, entry, gfp);
  xa_unlock(xa);                            // spin_unlock(&(xa)->xa_lock)

  return curr;
}
EXPORT_SYMBOL(xa_store);

/**
 * @xa: XArray.
 * @index: 索引位置
 * @entry: 条目内容
 * @gfp: 内存分配标识
 *
 * 在调用此函数时必须持有xa_lock,如果需要分配内存,此方法将:
 * 先释放锁->再分配内存->再锁定 (避免死锁)
 * 返回 index 的老entry 或 错误码
 */
void *__xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
  XA_STATE(xas, xa, index);               // 调用宏初始化xa_state操作结构
  void *curr;

  if (WARN_ON_ONCE(xa_is_advanced(entry)))
    return XA_ERROR(-EINVAL);

  if (xa_track_free(xa) && !entry)        // 如需跟踪空闲位,返回特定值
    entry = XA_ZERO_ENTRY;

  do {
    curr = xas_store(&xas, entry);        // 利用xa_state结构存储条目
    if (xa_track_free(xa))
      xas_clear_mark(&xas, XA_FREE_MARK);
    /*
     * __xas_nomem
     * 当 XArray 在插入元素时发现需要新节点,但内存不足时,
     * xas->xa 里会被设置成一个特殊值:XA_ERROR(-ENOMEM)。
     * 这时,__xas_nomem() 就会被调用来“补救”。 
     * 例如临时释放锁、去分配新的节点内存,然后再重新拿锁恢复操作。
     */
  } while (__xas_nomem(&xas, gfp));      

  return xas_result(&xas, xa_zero_to_null(curr)); //将旧条目转换返回
}
EXPORT_SYMBOL(__xa_store);
/**
 * __xas_nomem() - 按需分配内存.
 * @xas: XArray 操作状态.
 * @gfp: 内存分配标记.
 *
 * Return: 如果需要并成功分配了内存 返回 true.
 */
static bool __xas_nomem(struct xa_state *xas, gfp_t gfp)
  __must_hold(xas->xa->xa_lock)
{
  unsigned int lock_type = xa_lock_type(xas->xa);

  if (xas->xa_node != XA_ERROR(-ENOMEM)) {
    xas_destroy(xas);
    return false;
  }
  if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
    gfp |= __GFP_ACCOUNT;
  if (gfpflags_allow_blocking(gfp)) {
    xas_unlock_type(xas, lock_type);
    xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
    xas_lock_type(xas, lock_type);
  } else {
    xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
  }
  if (!xas->xa_alloc)
    return false;
  xas->xa_alloc->parent = NULL;
  XA_NODE_BUG_ON(xas->xa_alloc, !list_empty(&xas->xa_alloc->private_list));
  xas->xa_node = XAS_RESTART;
  return true;
}

xas_store - 将 entry 存入到 XArray 的核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
 /**
 * xas_store() - 将entry在xarray中存储 如果 entry 为 NULL,就是删除;
 * 同时处理多 slot(siblings)、释放多余节点、维护计数和标记,并保证 RCU 安全。
 * @xas: XArray 操作状态
 * @entry: 新entry.
 *
 * 返回: 索引的旧条目.
 */
void *xas_store(struct xa_state *xas, void *entry)
{
  struct xa_node *node;
  void __rcu **slot = &xas->xa->xa_head;
  unsigned int offset, max;
  int count = 0;
  int values = 0;
  void *first, *next;
  bool value = xa_is_value(entry);        // (unsigned long)entry & 1

  if (entry) {
    /*
     * allow_root 判断 XArray 的根节点,是否可以合法地承载这个 entry
     * (既不是node 也不是零条目 即 指针 或者value)
     * 注意:零条目和NULL并不相同:
     * 零条目在不存储实际数据的情况下,占用一个索引位置并影响树的结构
     * 可通过 xa_reserve() 预留零条目
     */
    bool allow_root = !xa_is_node(entry) && !xa_is_zero(entry);
    first = xas_create(xas, allow_root);  // 在指定位置创建slot见下节
  } else {
    first = xas_load(xas);                // 删除操作,只读取当前值
  }
  /* 
   * (unsigned long)xas->xa_node & 3;
   * 内核对象的指针地址是4/8字节对齐的。这意味着地址的最后两位永远是0
   * XArray利用了这一点 将xa_node的指针,在出错或特殊情况下塞入最后两位
   */
  if (xas_invalid(xas))
    return first;
  node = xas->xa_node;
  if (node && (xas->xa_shift < node->shift))
    xas->xa_sibs = 0;
  if ((first == entry) && !xas->xa_sibs)  // 新老条目相同 且不需处理兄弟槽
    return first;

  next = first;
  
  offset = xas->xa_offset;               
  max = xas->xa_offset + xas->xa_sibs;   // 最后一个兄弟槽位置
  if (node) {
    slot = &node->slots[offset];         // 主槽位置(真实 entry 存这里)
    if (xas->xa_sibs)
      xas_squash_marks(xas);             // 将兄弟槽的标志合并到主条目上(第一个)
  }
  if (!entry)
    xas_init_marks(xas);                 // 删除操作 清除标志

  for (;;) {
    rcu_assign_pointer(*slot, entry);
    /* 原 entry 是 node  新 entry 不是 node 需要释放原 entry 的子树*/
    if (xa_is_node(next) && (!node || node->shift))
      xas_free_nodes(xas, xa_to_node(next));
    if (!node)
      break;
    /*
     * 维护 xa 的 count 与 values 统计
     * 计算 slot 上变化导致的 entry 数量变化
     * slot 上的 entry 从空更新为非空 count += 1
     * slot 上的 entry 从非空更新为空 count += -1
     * values 同理
     */
    count += !next - !entry;
    values += !xa_is_value(first) - !value;
    if (entry) {
      if (offset == max)                      // 到达最后一个兄弟槽,停止
        break;
      if (!xa_is_sibling(entry))
        entry = xa_mk_sibling(xas->xa_offset);// 后续槽写 sibling 指针
    } else {
      if (offset == XA_CHUNK_MASK)            // 删除时走到 chunk 边界停止
        break;
    }
    /* 从 offset 到 max 遍历其他的兄弟槽  */
    next = xa_entry_locked(xas->xa, node, ++offset);
    if (!xa_is_sibling(next)) {
      if (!entry && (offset > max))
        break;
      first = next;
    }
    slot++;
  }
  update_node(xas, node, count, values);    // 更新/清理数据 
  return first;
}
EXPORT_SYMBOL_GPL(xas_store);

static void update_node(struct xa_state *xas, struct xa_node *node,
      int count, int values)
{
  if (!node || (!count && !values))
    return;
  /* 更新 node 的统计数据 */
  node->count += count;
  node->nr_values += values;
  XA_NODE_BUG_ON(node, node->count > XA_CHUNK_SIZE);
  XA_NODE_BUG_ON(node, node->nr_values > XA_CHUNK_SIZE);
  xas_update(xas, node);    // 回调节点更新事件
  if (count < 0)
    xas_delete_node(xas);   // 清理空节点 (entry == null 时)
}

xas_create - 创建存储 entry 的 slot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/*
 * xas_create() - 创建用来存储 entry 的 slot
 * @allow_root: 是否能在根节点存放 entry 
 *
 * Return: 如果 slot 存在 返回其内容,如果 slot 是新创建的,返回%NULL
 * 如果创建slot失败 将在 xas 存储错误
 */
static void *xas_create(struct xa_state *xas, bool allow_root)
{
  struct xarray *xa = xas->xa;
  void *entry;
  void __rcu **slot;
  struct xa_node *node = xas->xa_node;
  int shift;
  unsigned int order = xas->xa_shift;
  
  if (xas_top(node)) {                    // 默认从xas_store过来是根节点进入此分支
    entry = xa_head_locked(xa);           // rcu引用xa->head 在博客标签RCU中了解更多
    xas->xa_node = NULL;
    if (!entry && xa_zero_busy(xa))       // entry为空 且开启了零条目占用
      entry = XA_ZERO_ENTRY;
    /* xas_expand将新节点添加到树的头部,直到它达到足够的高度,能够包含@xas->xa_index  */
    shift = xas_expand(xas, entry);       // 见下节
    if (shift < 0)
      return NULL;
    if (!shift && !allow_root)            // 没有shift 也不能在根节点 让root变为node
      shift = XA_CHUNK_SHIFT;
    entry = xa_head_locked(xa);           // rcu 引用 xa->head 
    slot = &xa->xa_head;
  } else if (xas_error(xas)) {
    return NULL;
  } else if (node) {                      // 非 root 起步
    unsigned int offset = xas->xa_offset;

    shift = node->shift;
    entry = xa_entry_locked(xa, node, offset);
    slot = &node->slots[offset];
  } else {                                // 极小树 
    shift = 0;
    entry = xa_head_locked(xa);
    slot = &xa->xa_head;
  }
  /* 将树的根node 与xas的index所在的node中间层串连(创建)起来 */
  while (shift > order) {                 // 从当前的shift 触达到entry存放的目标层级 order
    shift -= XA_CHUNK_SHIFT;
    if (!entry) {                         // 当前层级没有entry 初始化新node
      node = xas_alloc(xas, shift);
      if (!node)
        break;
      if (xa_track_free(xa))
        node_mark_all(node, XA_FREE_MARK);
      /* 把新node指到slot上 */
      rcu_assign_pointer(*slot, xa_mk_node(node));
    } else if (xa_is_node(entry)) {
      node = xa_to_node(entry);           // 复用当前层级的node
    } else {
      break;
    }
    entry = xas_descend(xas, node);       // 进入刚创建/复用的 node,取下一层 entry
    slot = &node->slots[xas->xa_offset];  // offset在xas_descend中计算过了
  }

  return entry;                           // 返回老entry
}

xas_expand - 树按需膨胀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
 * xas_expand将节点添加到树的头部,直到它达到足够的高度,能够包含@xas->xa_index
 */
static int xas_expand(struct xa_state *xas, void *head)
{
  struct xarray *xa = xas->xa;
  struct xa_node *node = NULL;
  unsigned int shift = 0;
  unsigned long max = xas_max(xas);             // 操作单条目的情况下 xas->xa_index

  if (!head) {
    if (max == 0)
      return 0;                                 // 只存0(第一个位置) 直接在head上存条目即可
    while ((max >> shift) >= XA_CHUNK_SIZE)     // 算出够用的shift值  max向右移n位可以放到slot里
      shift += XA_CHUNK_SHIFT;
    return shift + XA_CHUNK_SHIFT;              // 右移的位数 + 基础的shift
   /*
    * index=100, max=100
    * while ((100 >> shift) >= 64) :
    *  shift=0:  (100 >> 0) = 100 >= 64  ✓  shift += 6 → shift=6
    *  shift=6:  (100 >> 6) = 1 < 64     ✗  退出
    * 需要的树高 = 6 + 6 = 12,即二层树
    *  第1层(shift=6):覆盖 index 0-63
    *  第2层(shift=0):覆盖 index = 64-127
    */
  } else if (xa_is_node(head)) {                // 树非空,在当前head的shift基础上长新根
    node = xa_to_node(head);                    
    shift = node->shift + XA_CHUNK_SHIFT;
  }
  xas->xa_node = NULL;                          // 目前在root位置 还未下探任何node
  /*
   * max_index 算出 xarray节点的最大存储索引范围
   * if (!xa_is_node(entry))
   *  return 0;
   * return (XA_CHUNK_SIZE << xa_to_node(entry)->shift) - 1;
   */
  while (max > max_index(head)) {               // 只要当前head的范围没到max 就延伸树
    xa_mark_t mark = 0;

    XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
    node = xas_alloc(xas, shift);                     // 分配新node
    if (!node)
      return -ENOMEM;

    node->count = 1;                                  // 默认新节点的存储数量是1
    if (xa_is_value(head))
      node->nr_values = 1;                            // 原先节点直接是个值类型 值数量初始化1
    RCU_INIT_POINTER(node->slots[0], head);           // 将新节点的第一个槽设为原节点的值
    /*
     * 扩展前:
     * xa->xa_head ─→ old_head (shift=6, 包含 index 0-63 的内容)
     * 扩展后:
     * xa->xa_head ─→ new_node (shift=12)
     *                ├─ slots[0] ─→ old_head (shift=6)
     *                ├─ slots[1] ─→ NULL
     *                └─ slots[63] ─→ NULL
     *
     */
    /* 原 head 可能已经有 mark 新节点必须“继承”这些状态 */
    for (;;) {
      if (xa_track_free(xa) && mark == XA_FREE_MARK) {
        node_mark_all(node, XA_FREE_MARK);
        if (!xa_marked(xa, XA_FREE_MARK)) {
          node_clear_mark(node, 0, XA_FREE_MARK);
          xa_mark_set(xa, XA_FREE_MARK);
        }
      } else if (xa_marked(xa, mark)) {
        node_set_mark(node, 0, mark);
      }
      if (mark == XA_MARK_MAX)
        break;
      mark_inc(mark);
    }

    /* 新节点初始化完成,将其添加到XArray中 */
    if (xa_is_node(head)) {
      xa_to_node(head)->offset = 0;         // 旧树根在新根中的偏移是 0
      /* 老head的parent指向新节点(新节点的index范围更大 树延伸了) */
      rcu_assign_pointer(xa_to_node(head)->parent, node);
    }
    head = xa_mk_node(node);
    rcu_assign_pointer(xa->xa_head, head);  // 将更大范围的节点设为XArray的根节点
    xas_update(xas, node);                  // 回调更新函数

    shift += XA_CHUNK_SHIFT;
  }

  xas->xa_node = node;                      // 将此节点设为操作状态的节点
  return shift;
}
/**
 * 比较简单的逻辑 主要是申请内存、初始化xas->xa_alloc
 */
static void *xas_alloc(struct xa_state *xas, unsigned int shift)
{
  struct xa_node *parent = xas->xa_node;
  struct xa_node *node = xas->xa_alloc;

  if (xas_invalid(xas))
    return NULL;

  if (node) {
    xas->xa_alloc = NULL;                     // 已经预分配过内存了
  } else {
    gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
    if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
      gfp |= __GFP_ACCOUNT;
    /* 申请内存新建node 并将内存关联到xas的lru链上 */
    node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
    if (!node) {
      xas_set_err(xas, -ENOMEM);              // 内存不够 设置错误位 外层会按需重试
      return NULL;
    }
  }

  if (parent) {
    node->offset = xas->xa_offset;
    parent->count++;
    XA_NODE_BUG_ON(node, parent->count > XA_CHUNK_SIZE);
    xas_update(xas, parent);                  // 触发节点回调函数 xas->xa_update(node)
  }
  XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
  XA_NODE_BUG_ON(node, !list_empty(&node->private_list));
  node->shift = shift;
  node->count = 0;
  node->nr_values = 0;
  RCU_INIT_POINTER(node->parent, xas->xa_node);
  node->array = xas->xa;
  return node;
}

清理逻辑:释放旧node、 entry为空时的树压缩

第一步 xas_init_marks - 清理旧node的mark
1
2
3
4
5
6
7
8
9
10
11
void *xas_store(struct xa_state *xas, void *entry)
{
  ……
  /*
   * 必须在将条目设置为 NULL 之前清除标记,
   * 否则 xas_for_each_marked 可能会找到 NULL 条目并提前终止
   */
  if (!entry)
    xas_init_marks(xas);  // 遍历所有 mark 位,逐一 clear 
  ……
}
第二步:xas_free_nodes — 释放被替换的子树
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
void *xas_store(struct xa_state *xas, void *entry)
{
  ……
  rcu_assign_pointer(*slot, entry);           // 先写 NULL,断开引用
  if (xa_is_node(next) && (!node || node->shift))
      xas_free_nodes(xas, xa_to_node(next));  // 再释放旧node
  ……
}

/**
 * 释放此节点及其引用的所有节点
 * @xas: XArray 操作状态
 * @top: 被释放的 node
 *
 * 此节点已从树中删除。我们现在必须释放它及其所有子节点。
 * 可能有RCU引用到树中,因此我们必须用重试标记替换所有条目。
 */
static void xas_free_nodes(struct xa_state *xas, struct xa_node *top)
{
  unsigned int offset = 0;
  struct xa_node *node = top;

  for (;;) {
    void *entry = xa_entry_locked(xas->xa, node, offset);   // rcu引用保护

    if (node->shift && xa_is_node(entry)) {
      node = xa_to_node(entry);             // 往节点子树下层钻
      offset = 0;
      continue;
    }
    /* 
     * 到叶子层:把非 NULL 的 entry 替换成 XA_RETRY_ENTRY 
     * 为什么要写 XA_RETRY_ENTRY 而不是直接 free?
     * RCU 读者可能正在无锁遍历这棵子树。直接 free 会导致 use-after-free。
     * 写入 XA_RETRY_ENTRY 后RCU 读者看到 retry标记会重新从根开始查找,
     * 此时子树已经从主树断开,读者自然找不到旧路径,安全退出。
     */
    if (entry)
      RCU_INIT_POINTER(node->slots[offset], XA_RETRY_ENTRY);
    
    offset++;
    while (offset == XA_CHUNK_SIZE) {
      /* 当前节点处理完,继续处理上层的 parent->slots[offset +1] */
      struct xa_node *parent;

      parent = xa_parent_locked(xas->xa, node);
      offset = node->offset + 1;
      node->count = 0;
      node->nr_values = 0;
      xas_update(xas, node);
      xa_node_free(node);
      if (node == top)
        return;
      node = parent;
    }
  }
}

static void xa_node_free(struct xa_node *node)
{
  XA_NODE_BUG_ON(node, !list_empty(&node->private_list));
  
  node->array = XA_RCU_FREE;          // 毒化指针

  /* 
   * 注册一个 RCU 回调函数 radix_tree_node_rcu_free,
   * 等所有当前 RCU 读者退出临界区之后,才真正执行释放。
   *
   * XArray 支持无锁读(xa_load 用 rcu_dereference)。
   * writer 持锁删除节点时,可能有 reader 正在无锁遍历这个节点。
   * 如果立即 kmem_cache_free,读者就会 use-after-free。
   * call_rcu 保证:宽限期(grace period)结束后,
   * 所有在删除时刻之前开始的 RCU 读者都已经退出,此时再 free 才安全。
   */
  call_rcu(&node->rcu_head, radix_tree_node_rcu_free);
}

1
2
3
4
5
6
7
8
9
10
11
void radix_tree_node_rcu_free(struct rcu_head *head)
{
  struct radix_tree_node *node =
      container_of(head, struct radix_tree_node, rcu_head);

  memset(node->slots, 0, sizeof(node->slots));    // 清空 slots
  memset(node->tags, 0, sizeof(node->tags));      // 清空 tags
  INIT_LIST_HEAD(&node->private_list);            // 重置链表头

  kmem_cache_free(radix_tree_node_cachep, node);  // 还给 slab   
}
第三步 xas_delete_node — 向上收缩空节点

xas_storeupdate_node 中 count<0时触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static void update_node(struct xa_state *xas, struct xa_node *node,
    int count, int values)
{
  if (!node || (!count && !values))
    return;

  node->count += count;
  node->nr_values += values;
  xas_update(xas, node); 
  if (count < 0)
    xas_delete_node(xas);
}

static void xas_delete_node(struct xa_state *xas)
{
  struct xa_node *node = xas->xa_node;

  for (;;) {
    struct xa_node *parent;

    if (node->count)
      break;

    parent = xa_parent_locked(xas->xa, node);
    xas->xa_node = parent;
    xas->xa_offset = node->offset;
    xa_node_free(node);                     // 释放空节点

    if (!parent) {                          // 树彻底空了
      xas->xa->xa_head = NULL;
      xas->xa_node = XAS_BOUNDS;
      return;
    }

    parent->slots[xas->xa_offset] = NULL;
    parent->count--;
    node = parent;                          // 继续向上检查
    xas_update(xas, node);
  }

  if (!node->parent)
    xas_shrink(xas);                        // 根节点 尝试压缩树
}
第四步 xas_shrink — 尝试压缩树高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void xas_shrink(struct xa_state *xas)
{
  struct xarray *xa = xas->xa;
  struct xa_node *node = xas->xa_node;

  for (;;) {
    void *entry;

    if (node->count != 1)                 // 根节点有多个 entry,不压缩
      break;
    entry = xa_entry_locked(xa, node, 0);
    if (!entry)                           // slot[0] 是空的,不压缩
      break;
    /* 下层是叶子节点  但本层非最底子树 防止出现叶子节点对应的idx不精确的场景 */
    if (!xa_is_node(entry) && node->shift)
      break;
    if (xa_zero_busy(xa))
      entry = xa_zero_to_null(entry);
    xas->xa_node = XAS_BOUNDS;
    /* 把 slot[0] 的内容提升为新的 xa_head */
    RCU_INIT_POINTER(xa->xa_head, entry);
    if (xa_track_free(xa) && !node_get_mark(node, 0, XA_FREE_MARK))
      xa_mark_clear(xa, XA_FREE_MARK);

    node->count = 0;
    node->nr_values = 0;
    if (!xa_is_node(entry))
      RCU_INIT_POINTER(node->slots[0], XA_RETRY_ENTRY);
    xas_update(xas, node);
    xa_node_free(node);           // 释放旧根节点
    if (!xa_is_node(entry))       // 新 head 是叶子,结束
      break;
    node = xa_to_node(entry);
    node->parent = NULL;          // 新根节点,parent 置 NULL
  }
}
本文由作者按照 CC BY 4.0 进行授权