Appearance
Redis 跳表 (Skip List) 详解 
数据结构定义 
每一个skiplist的node的结构体如下:
c
typedef struct zskiplistNode {
    sds ele;                    // 元素值
    double score;               // 分数
    struct zskiplistNode *backward;  // 后向指针,指向前一个节点
    struct zskiplistLevel {
        struct zskiplistNode *forward;  // 前向指针,指向下一个节点
        unsigned long span;             // 跨度,表示到下一个节点在level0上跨越的节点数
    } level[];                  // 层级数组
} zskiplistNode;重要概念:
ele:存储的元素值score:节点的分数,用于排序backward:后向指针,只在level 0存在,用于反向遍历forward:前向指针,指向下一个节点span:跨度,表示从当前节点的某一层到下一个节点在level 0上跨越了多少个节点
跳表的主结构:
c
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;  // 头尾指针
    unsigned long length;                  // 节点数量
    int level;                            // 当前最高层级
} zskiplist;初始化过程 
c
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}初始化特点:
ZSKIPLIST_MAXLEVEL = 32,预分配32层- 创建一个虚拟头节点,拥有32层结构
 - 所有层的
forward = NULL,span = 0 - 这是一个空的跳表
 
初始状态:
Level 31: [HEADER] -> NULL
Level 30: [HEADER] -> NULL
   ...
Level 1:  [HEADER] -> NULL
Level 0:  [HEADER] -> NULL插入操作详解 
插入新节点使用以下函数:
会维护一个update数组,这个数组的作用是记录在每一层当中,新节点的前一个节点。也就是在插入的时候需要更新的节点,它需要调整自己的forward指针以及对应的span。以及一个rank数组,这个数组的作用是记录在每一层当中,到达插入位置时累计跨越的节点数量。
核心概念:
update[i]:记录在第i层中,新节点的前一个节点(需要更新其forward指针的节点)rank[i]:记录在第i层中,到达插入位置时累计的排名(跨越的节点数)
核心流程:
- 从最高层开始向下搜索,找到每一层中新节点应该插入的位置
 - 在每一层中,找到第一个score大于插入score的节点,或score相等但ele大于插入ele的节点
 - 记录每一层的前驱节点到update数组,记录累计排名到rank数组
 - 生成新节点的随机层高
 - 更新所有相关的forward指针和span值
 
c
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}实例演示 
span 和 rank 的核心概念 
- span(跨度):表示从当前节点的某一层到下一个节点在 level 0 上跨越了多少个节点
 - rank(排名):表示在查找过程中累计经过的节点数量
 
完整插入过程示例 
第一步:插入节点 B(score=2, level=2) 
插入前:
Level 1: [HEADER] -> NULL
Level 0: [HEADER] -> NULL查找过程:
rank[1] = 0, rank[0] = 0update[1] = HEADER, update[0] = HEADER
插入后:
Level 1: [HEADER] -> [B] -> NULL     (HEADER的span=1)
Level 0: [HEADER] -> [B] -> NULL     (HEADER的span=1)第二步:插入节点 C(score=3, level=1) 
查找过程:
- Level 1: HEADER -> B(B.score=2 < 3,继续) -> NULL,
rank[1] = 1, update[1] = B - Level 0: B -> NULL(到达末尾),
rank[0] = 1, update[0] = B 
插入后:
Level 1: [HEADER] -> [B] -----------> NULL     (HEADER的span=1, B的span=1)
Level 0: [HEADER] -> [B] -> [C] -> NULL       (HEADER的span=1, B的span=1)插入C后,Level 1中B的span需要更新为2,因为从B到下一个节点(NULL)在level 0上跨越了2个节点(B和C):
Level 1: [HEADER] -> [B] -----------> NULL     (HEADER的span=1, B的span=2)
Level 0: [HEADER] -> [B] -> [C] -> NULL       (HEADER的span=1, B的span=1)第三步:插入节点 D(score=4, level=1) 
查找过程:
- Level 1: HEADER -> B(B.score=2 < 4,继续) -> NULL,
rank[1] = 1, update[1] = B - Level 0: B -> C(C.score=3 < 4,继续) -> NULL,
rank[0] = 2, update[0] = C 
span 更新计算:
- C的新span = (rank[0] - rank[0]) + 1 = (2 - 2) + 1 = 1
 - D的span = C原span - (rank[0] - rank[0]) = 1 - (2 - 2) = 1
 - B的span需要+1(因为底层多了一个节点):2 + 1 = 3
 
插入后:
Level 1: [HEADER] -> [B] -----------------> NULL     (HEADER的span=1, B的span=3)
Level 0: [HEADER] -> [B] -> [C] -> [D] -> NULL       (所有span都=1)第四步:插入节点 E(score=5, level=3) 
查找过程:
- Level 2: 不存在,从Level 1开始
 - Level 1: HEADER -> B(B.score=2 < 5,继续) -> NULL,
rank[1] = 1, update[1] = B - Level 0: B -> C -> D(D.score=4 < 5,继续) -> NULL,
rank[0] = 3, update[0] = D 
span 更新计算:
对于 level 0(update[0] = D):
c
// E 的 span = D 原来的 span - (rank[0] - rank[0]) = 1 - 0 = 1
x->level[0].span = 1 - (3 - 3) = 1;
// D 的新 span = (rank[0] - rank[0]) + 1 = 1  
update[0]->level[0].span = (3 - 3) + 1 = 1;对于 level 1(update[1] = B):
c
// E 的 span = B 原来的 span - (rank[0] - rank[1]) = 3 - 2 = 1
x->level[1].span = 3 - (3 - 1) = 1;
// B 的新 span = (rank[0] - rank[1]) + 1 = 3
update[1]->level[1].span = (3 - 1) + 1 = 3;对于 level 2(新层,update[2] = HEADER):
c
// E 的 span = HEADER 原来的 span - (rank[0] - rank[2]) = 3 - 3 = 0,但实际是1
x->level[2].span = 3 - (3 - 0) = 0; // 这里需要特殊处理为1
// HEADER 的新 span = (rank[0] - rank[2]) + 1 = 4
update[2]->level[2].span = (3 - 0) + 1 = 4;最终结果:
Level 2: [HEADER] -----------------> [E] -> NULL    (HEADER的span=4)
Level 1: [HEADER] -> [B] -----------> [E] -> NULL    (HEADER的span=1, B的span=3, E的span=1)
Level 0: [HEADER] -> [B] -> [C] -> [D] -> [E] -> NULL (各span都=1)查找操作详解 
查找某个值的排名 
c
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }
        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && x->score == score && sdscmp(x->ele,ele) == 0) {
            return rank;
        }
    }
    return 0;
}查找排名过程详解:
使用之前构建的跳表结构查找C(score=3)的排名:
Level 2: [HEADER] -----------------> [E] -> NULL    (HEADER的span=4)
Level 1: [HEADER] -> [B] -----------> [E] -> NULL    (HEADER的span=1, B的span=3)
Level 0: [HEADER] -> [B] -> [C] -> [D] -> [E] -> NULL (各span都=1)查找过程:
Level 2: 从HEADER开始,rank=0
- E.score=5 > 3,不能前进,停在HEADER
 
Level 1: 继续从HEADER开始
- B.score=2 < 3,可以前进,rank += 1,移动到B
 - E.score=5 > 3,不能继续前进,停在B
 
Level 0: 继续从B开始
- C.score=3 <= 3(条件中是<=),可以前进,rank += 1,移动到C
 - 检查C是否就是目标(C.score=3且ele匹配),找到目标,返回rank=2
 
核心原理:
rank累计已经跨越的节点数量- 在每一层从当前位置继续向前查找,直到找不到满足条件的节点
 - 当找到目标节点时,当前的rank就是该节点的排名
 
查找某个排名的值 
c
zskiplistNode *zslGetElementByRankFromNode(zskiplistNode *start_node, int start_level, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;
    x = start_node;
    for (i = start_level; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    return zslGetElementByRankFromNode(zsl->header, zsl->level - 1, rank);
}查找排名过程详解:
使用之前构建的跳表结构查找排名为3的节点(即节点D):
Level 2: [HEADER] -----------------> [E] -> NULL    (HEADER的span=4)
Level 1: [HEADER] -> [B] -----------> [E] -> NULL    (HEADER的span=1, B的span=3)
Level 0: [HEADER] -> [B] -> [C] -> [D] -> [E] -> NULL (各span都=1)查找过程:
Level 2: 从HEADER开始,traversed=0
- HEADER->level[2].span = 4,traversed + 4 = 4 > 3,不能前进
 
Level 1: 继续从HEADER开始
- HEADER->level[1].span = 1,traversed + 1 = 1 <= 3,前进到B,traversed = 1
 - B->level[1].span = 3,traversed + 3 = 4 > 3,不能继续前进
 
Level 0: 继续从B开始
- B->level[0].span = 1,traversed + 1 = 2 <= 3,前进到C,traversed = 2
 - C->level[0].span = 1,traversed + 1 = 3 <= 3,前进到D,traversed = 3
 - traversed == rank(3),找到目标节点D
 
核心原理:
traversed累计已经跨越的节点数量- 在每一层找到最远可达的位置,但不超过目标排名
 - 当 
traversed == rank时,当前节点就是目标节点 
