0%

【Redis数据结构】skiplist的实现

跳表(skiplist)是一种基于有序链表的用于方便快速查找/插入/删除的数据结构, 查询效率(平均O(logN), 最差O(N)) 堪比红黑树, 然而实现与维护却要比红黑树要简单的多。 跳表的每个节点都维护了多个指向其他节点的指针,所以在查找、插入、删除操作时可以跳过一些节点,快速找到操作需要的节点。

跳表是有序集合(sorted_set)的底层实现之一

sorted_set 在元素数量小于 zset-max-ziplist-entries(默认128, 64个有序集合节点) 且 每个元素的长度小于 zset-max-ziplist-value(默认64) 时使用 ziplist 存储, 否则使用跳表。

跳表的实现

跳表的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/* ZSETs use a specialized version of Skiplists */
// 跳表节点
typedef struct zskiplistNode {

// 字符串类型的集合元素(element)
sds ele;

// 集合元素的分值
double score;

// 前一个节点
struct zskiplistNode *backward;

// 柔性结构体数组level,用于存储跳表节点每层的信息
struct zskiplistLevel {

// level[i].forward 此节点第i层的下一个节点的指针
struct zskiplistNode *forward;

// level[i].span 此节点第 i 层到下一个第 i 层的跳表节点的距离, 距离越大,跳过的节点越多
unsigned long span;
} level[];

} zskiplistNode;

// 跳表
typedef struct zskiplist {

// 跳表的头结点与尾结点
struct zskiplistNode *header, *tail;

// 跳表节点的数量
unsigned long length;

// 整个跳表的最高层, 所有跳表节点中最幸运的那个随机得到的层数大小
int level;
} zskiplist;

跳表的创建(相当于是python的构造函数(init))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

// 跳表节点的最高层数
#define ZSKIPLIST_MAXLEVEL 64

/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call.
* 创建一个层数为 level 的调表节点
* level 由 zslRandomLevel 函数随机生成, 值在1~ZSKIPLIST_MAXLEVE之间
*/
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;

// 头结点是一个特殊的跳表节点 层数为64, 值为null, score为0
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);

// 初始化头节点每层的前进指针为null, 到下一个节点的距离为0
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
// 头结点的下一个节点初始化指向null
zsl->header->backward = NULL;
zsl->tail = NULL;

// 返回跳表的指针
return zsl;
}

跳表插入新节点

跳表插入一个新元素的主要步骤如下:

  1. 找到要需要插入的位置的上一个节点 (时间复杂度: log(N))
  2. 随机生成一个层数, 层数符合幂次定律, 数值越大,生成的几率越小
  3. 为每层的新节点与新节点的前一个节点设置span值(到下一个节点的距离)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {

// update[i] 存储的是第i的层前一个节点的指针
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

// rank[i] 存储的 第i层 头结点的到新节点的前一个节点 的距离
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

// 确保score是一个数字, nun: not a number
serverAssert(!isnan(score));

x = zsl->header;

// 寻找小于 score 的最大的跳表节点的位置, 也就是新节点需要插入的位置
// 调用 zslInsert 函数前确保了不会出现完全相同ele, score相同的情况下根据ascii字母对比
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

// 下一个节点不存在或下一个节点比score小, 层数往下移动
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0)))
{
// 记录从头结点到当前节点沿途跳跃了多少节点, 是计算span[i]的基础
rank[i] += x->level[i].span;

// 跳到本层的下一个节点
x = x->level[i].forward;
}
// 找到了新节点要插入的位置, 此时的x存的是新节点在第i层的前一个节点, 赋值给update[i]
update[i] = x;
}

// 随机生成一个层数
level = zslRandomLevel();

// 生成的节点层数比当前的最高层还高的情况, 需要更新层数以及头结点的forward指针
if (level > zsl->level) {
// 对未使用的层(原先的level到新的level) 做初始化
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}

x = zslCreateNode(level,score,ele);

for (i = 0; i < level; i++) {

// 常规的链表插入操作
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
// 设置新节点第i层到下一节点的距离 (为前一个节点之前保存的距离-前一个节点到新节点的距离)
// rank[0]是的头结点新节点第0层前一个节点的距离, rank[i]是头结点到第i层新节点的前一个节点的距离
// 这里需要一张图...
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

// 前一个节点到新节点的距离, +1是因为需要加上新的节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
// 新节点最高层上面的前一个节点的span值需要+1,上个for循环中未设置到
update[i]->level[i].span++;
}

// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;

// 跳表长度+1
zsl->length++;

// 返回新节点的指针
return x;
}

跳表删除节点

步骤如下, 与插入节点类似

  1. 找到待删除的节点x (时间复杂度 O(log(n)))
  2. 修改x前一个节点每层的前进指针与span值 (O(1))
  3. 修改后一个节点每层的后退指针 (O(1))
  4. 修改跳表长度 O(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;

// 找到比要删除的节点的score值小的最大的节点并记录每层的上一个节点
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward; // 如果要删除的节点存在跳表中, x 就是需要删除的跳表节点
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
// 设置x节点前一个节点的每层的前进指针和span值
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}

// 设置x节点后一个节点每层的后退指针
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}

// x的节点是最高层节点, 层数一直往下撸, 知道头结点的前进指针不为null(指向第二高节点)
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;

// 设置跳表长度
zsl->length--;
}