单链表在查询、随机位置插入等操作时的时间复杂度为O(n),为了进一步优化时间效率,Redis设计了跳跃表来弥补单链表在查询操作上的不足之处。

内容较多,具体安排如下:

# 跳跃表

# 跳跃表的结构

跳跃表实质上是一个多层的有序链表,基础结构示意图如下:

如图所示,链表有许多层,最底层(第0层)是一个单向的有序链表,而越往上走,链表中的节点数就越少,但要满足在高层出现的节点一定会在底层出现。这样假如我们要寻找值为51的节点,先在最高层(第2层)寻找,51 > 21,转移至第1层,继续寻找,41 < 51 < 61,因此从41节点往下到第0层,最终找到51节点。跳跃表与SDS(什么是SDS?)一样,通过用空间换取时间的方法,将查询节点的时间复杂度降至O(logN),提高程序运行效率。

Redis中跳跃表并不简单是由多层的单链表拼合组成,为了满足日常需要,还加入了其他辅助属性:

跳表的数据结构如下(代码在server.h里),对照上图示例不难理解各个属性的意义(见代码注释):

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;			// 储存数据的SDS
    double score;	// 用于排序的分数
    struct zskiplistNode *backward;			// 反向指针
    struct zskiplistLevel {							// 不同的单链表层
        struct zskiplistNode *forward;	// 指向下一个节点
        unsigned long span;							// 当前层的跨度(即到下一个节点间时会跳过多少个节点,span越大跳过的节点越多)
    } level[];
} zskiplistNode;
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;	// 头尾指针
    unsigned long length;									// 链表长度(元素个数,不包含头节点)
    int level;														// 层高
} zskiplist;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

需要注意的是,跳表的反向指针实际上是独立于各层的,也可以说,其仅在第0层存在,不是每一层都有反向指针;另外,第一个节点(非头节点)的反向指针指向空,而不是头节点。

跳跃表存储的是有序数据,而其包含的首尾指针、长度属性可以帮助我们在O(1)的时间内获取到链表长度,最大/小值等。

# 跳跃表的操作

# 1. 构建跳跃表

跳表的第0层一定存在,而最大层高由#define ZSKIPLIST_MAXLEVEL 32定义。由于跳跃表高层中出现的节点一定在低层中出现,因此越高的层数其出现频率应该越低,否则跳跃表就失去了意义。

Redis中节点的层数通过随机概率#define ZSKIPLIST_P 0.25定义,随机生成一个值,当值小于p*0xFFFF时则增加一层,不断循环,最终返回level和ZSKIPLIST_MAXLEVEL的最小值:

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
1
2
3
4
5
6
7
8
9
10

这里可以计算一下每个节点的期望

当 p = 0.25 时, E = 1.33

注意!Redis跳表是有头节点的,而头节点必须生成所有的level

# 2. 跳跃表的增删改查

链表的增删改都需要先定位到相应的位置,上文已经简单描述过跳表的查询过程,从高层向低层逐渐查询。当确定了相应位置,那么插入只需遵循普通链表的插入方式即可。在增删的过程中,由于每一层的节点跨度不一,为了准确获取每一层的插入位置,需要用两个辅助数组update[]rank[],这里以插入为例通过代码分析跳表的插入过程(具体解析见代码注释):












 


 











 



















 







 





















/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    serverAssert(!isnan(score));
    x = zsl->header;
  	/* 确定每一层的插入位置 */
    for (i = zsl->level-1; i >= 0; i--) {		// 从zsl的最高层开始,逐层向下
        /* store rank that is crossed to reach the insert position */
        // rank 是一个 int 数组, rank[i]用于确定第i层的插入位置(每一层每个节点的span都不同,rank相当于插入时前方的span值)
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
      	// 条件很好理解,分数比目标小或者分数一样时元素的排序也小,则一直乡下找
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
      	// update 是 node 的数组,update[i] 代表第i层插入位置的前一个节点
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
  	// 随机生成当前节点层数
    level = zslRandomLevel();
  	// 层数比整个表都高的部分,直接和头节点连接
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
  	
    x = zslCreateNode(level,score,ele);
  	// 对每一层进行插入
    for (i = 0; i < level; i++) {
      	// 当前节点指向前驱节点(update[i])的下一个
        x->level[i].forward = update[i]->level[i].forward;
      	// 前驱节点指向自己
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is inserted here */
      	// 更新自己的span = 前驱节点span - (自己绝对位置 - 当前层的前驱累计) = 两个节点之间的跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
      	// 更新前驱节点的span
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    /* increment span for untouched levels */
  	// 对于没有更新的层(上层部分),span都会增加一个
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
  	// 尾指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

删除同理,不过在删除时候不需要确定每个层的插入位置,因此不需要rank[]进行辅助,具体细节不再赘述。

跳跃表将插入、删除、查询的平均时间复杂度都降到了O(logN)级别。

# 压缩列表

在一定条件下,redis会将数据储存在压缩列表中,当有序集合或散列表的元素个数比较少,且元素都是短字符串时,Redis便使用压缩列表作为其底层数据存储结构。

# 压缩列表的结构

压缩表的结构相对简单,在内存中会占用连续的地址空间:

zlbytes: 压缩列表的总字节长度,uint32_t ,故最长为 2^32 -1 字节;

zltail: 列表尾元素相对于压缩列表起始地址的偏移量,uint32_t,占4个字节;

zllen:压缩列表中的元素个数,uint16_t,2个字节,最大数量 2^8 - 1 = 65535;

extryX:压缩列表中元素的编码结构

zlend压缩列表的结尾,占1个字节,恒为0xFF。

# 压缩列表元素

压缩列表的一个特点是其中的元素是经过编码储存的,压缩列表元素的编码结构如下:

previous_entry_length:前一个entry的字节长度,占1个或5个字节;

encoding:当前元素的编码类型,即content字段存储的数据类型(整数或者字节数组),数据内容存储在content字段。编码的具体类型不再拓展,可查阅其他相关资料;

content:储存数据内容。

对编码数据进行解码后,会存在一个数据结构中,具体内容如下:

/* We use this function to receive information about a ziplist entry.
 * Note that this is not how the data is actually encoded, is just what we
 * get filled by a function in order to operate more easily. */
typedef struct zlentry {
    unsigned int prevrawlensize; // previous_entry_length 的长度
    unsigned int prevrawlen;     // previous_entry_length 的内容
    unsigned int lensize;        // encoding 的长度
    unsigned int len;            // 数据长度
    unsigned int headersize;     // previous_entry_length 与 encoding 长度和
    unsigned char encoding;      // 数据类型
    unsigned char *p;            // 指向元素首地址
} zlentry;
1
2
3
4
5
6
7
8
9
10
11
12

# 压缩表的相关操作

压缩表的增删改查操作需要O(N)的复杂度,当空间不够时会重新分配空间,需要注意的是,当进行增加、删除操作时,**新空间大小不一定是原空间大小+新元素大小,**因为当新元素entryX插入/删除后,entryX+1的头指针会发生改变,而头指针的大小是不确定的,如果元素的大小大于头指针1字节要求,则需要增加头部大小,因此整个空间大小是不确定的。

另外,压缩表的遍历也有不同。从后向前遍历可以根据每个元素的头部直接计算出下一个节点的位置,但是从前向后遍历则需要将每个元素进行解码,才能计算出下一个节点的位置。

# 压缩表与跳表的应用

  1. Redis的有序集合、散列和列表都直接或者间接使用了压缩列表。当有序集合或散列表的元素个数比较少,且元素都是短字符串时,Redis便使用压缩列表作为其底层数据存储结构;列表使用快速链表(quicklist)数据结构存储,而快速链表就是双向链表与压缩列表的组合。

  2. 在Redis中,跳跃表主要应用于有序集合的底层实现:

    Redis的配置文件中关于有序集合底层实现的两个配置:

    1)zset-max-ziplist-entries 128:zset采用压缩列表时,元素个数最大值。默认值为128;

    2)zset-max-ziplist-value 64:zset采用压缩列表时,每个元素的字符串长度最大值。默认值为64。

    zset插入第一个元素时,会判断下面两种条件:

    ❏ zset-max-ziplist-entries的值是否等于0;

    ❏ zset-max-ziplist-value小于要插入元素的字符串长度。

    满足任一条件Redis就会采用跳跃表作为底层实现,否则采用压缩列表作为底层实现方式。


参考资料: 《Redis 5 设计与源码分析》