跳表

Redis的SortedSet

SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
zset底层实现使用了两个数据结构,第一个是hash,第二个是跳跃列表,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。跳跃列表的目的在于给元素value排序,根据score的范围获取元素列表。
https://juejin.im/post/5b53ee7e5188251aaa2d2e16

这是一个基本的跳表数据结构,简单的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include<stdio.h>
#include<stdlib.h>
//跳表的最大层级为32级
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25
#include<math.h>

//跳表节点
typedef struct zskiplistNode {
int key;
int value;
struct zskiplistLevel {
struct zskiplistNode *forward;
} level[1];
} zskiplistNode;

//跳表
typedef struct zskiplist {
struct zskiplistNode *header;
int level;
} zskiplist;

//创建跳表的节点
zskiplistNode *zslCreateNode(int level, int key, int value) {
zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(zn->level));
zn->key = key;
zn->value = value;
return zn;
}

//初始化跳表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = (zskiplist *) malloc(sizeof(*zsl));
zsl->level = 1;//将层级设置为1
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,NULL,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
}
return zsl;
}

//向跳表中插入元素时,通过随机函数获取一个层级,表示插入在哪一层,ZSKIPLIST_P为何设置为0.25?
int zslRandomLevel(void) {
int level = 1;
while ((rand()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

//向跳表中插入元素
zskiplistNode *zslInsert(zskiplist *zsl, int key, int value) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i, level;
x = zsl->header;
//在跳表中寻找合适的位置并插入元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->key < key ||
(x->level[i].forward->key == key &&
x->level[i].forward->value < value))) {
x = x->level[i].forward;
}
update[i] = x;
}
//获取元素所在的随机层数
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
update[i] = zsl->header;
}
zsl->level = level;
}
//为新创建的元素创建数据节点
x = zslCreateNode(level,key,value);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
}
return x;
}

//跳表中删除节点的操作
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].forward = x->level[i].forward;
}
}
//如果层数变了,相应的将层数进行减1操作
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
}

//从跳表中删除元素
int zslDelete(zskiplist *zsl, int key, int value) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
//寻找待删除元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->key < key ||
(x->level[i].forward->key == key &&
x->level[i].forward->value < value))) {
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
if (x && key == x->key && x->value == value) {
zslDeleteNode(zsl, x, update);
//别忘了释放节点所占用的存储空间
free(x);
return 1;
} else {
//未找到相应的元素
return 0;
}
return 0;
}

//将链表中的元素打印出来
void printZslList(zskiplist *zsl) {
zskiplistNode *x;
x = zsl->header;
for (int i = zsl->level-1; i >= 0; i--) {
zskiplistNode *p = x->level[i].forward;
while (p) {
printf(" %d|%d ",p->key,p->value);
p = p->level[i].forward;
}
printf("\n");
}
}

int main() {
zskiplist *list = zslCreate();
zslInsert(list,1,2);
zslInsert(list,4,5);
zslInsert(list,2,2);
zslInsert(list,7,2);
zslInsert(list,7,3);
zslInsert(list,7,3);
printZslList(list);
zslDelete(list,7,2);
printZslList(list);
}
/*
4|5
1|2 2|2 4|5 7|2 7|3 7|3
4|5
1|2 2|2 4|5 7|3 7|3

*/

我们都知道redis中zset有序集,内部实现用了跳表,那么又有哪些优化呢?

Redis中跳表的基本数据结构定义如下,与基本跳表数据结构相比,在Redis中实现的跳表其特点是不仅有前向指针,也存在后向指针,而且在前向指针的结构中存在span跨度字段,这个跨度字段的出现有助于快速计算元素在整个集合中的排名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//定义跳表的基本数据节点
typedef struct zskiplistNode {
robj *obj; // zset value
double score;// zset score
struct zskiplistNode *backward;//后向指针
struct zskiplistLevel {//前向指针
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;

typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;

//有序集数据结构
typedef struct zset {
dict *dict;//字典存放value,以value为key
zskiplist *zsl;
} zset;

将如上数据结构转化成更形式化的图形表示,如下图所示
image

  • 可以看到header指针指向的是一个具有固定层级(32层)的表头节点,为什么定义成32,是因为定义成32层理论上对于2^32-1个元素的查询最优,而2^32=4294967296个元素,对于绝大多数的应用来说,已经足够了,所以就定义成了32层,到于为什么查询最优,你可以将其想像成一个32层的完全二叉排序树,算算这个树中节点的数量。

  • Redis中有序集另一个值得注意的地方就是当Score相同的时候,是如何存储的,当集合中两个值的Score相同,这时在跳表中存储会比较这两个值,对这两个值按字典排序存储在跳表结构中

下面来看看Redis对zskiplist/zskiplistNode的相关操作,源码如下所示(源码均出自t_zset.c)

创建跳表结构的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//分配内存
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;//默认层级为1
zsl->length = 0;//跳表长度设置为0
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
//因为没有任何元素,将表头节点的前向指针均设置为0
zsl->header->level[j].forward = NULL;
//将表头节点前向指针结构中的跨度字段均设为0
zsl->header->level[j].span = 0;
}
//表头后向指针设置成0
zsl->header->backward = NULL;
//表尾节点设置成NULL
zsl->tail = NULL;
return zsl;
}

在上述代码中调用了zslCreateNode这个函数,函数的源码如下所示。

1
2
3
4
5
6
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}

执行完上述代码之后会创建如下图所示的跳表结构

image

创建了跳表的基本结构,下面就是插入操作了,Redis中源码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update[32]
unsigned int rank[ZSKIPLIST_MAXLEVEL];//rank[32]
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
//寻找元素插入的位置
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score || //以下是得分相同的情况下,比较value的字典排序
(x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
//产生随机层数
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
//记录最大层数
zsl->level = level;
}
//产生跳表节点
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
//更新跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
//此种情况只会出现在随机出来的层数小于最大层数时
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

上述源码中,有一个产生随机层数的函数,源代码如下所示:

1
2
3
4
5
6
7
8
int zslRandomLevel(void) {
int level = 1;
//#define ZSKIPLIST_P 0.25
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
//#ZSKIPLIST_MAXLEVEL 32
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

图形化的形式描述如下图所示:
image

参考

https://www.cnblogs.com/WJ5888/p/4595306.html

-------------本文结束感谢您的阅读-------------