对码当歌,猿生几何?

redis跳跃表数据结构源码解析

跳跃表,是一种有序的数据结构,可以在一个节点中维护多个执行其他节点的指针,从而快速访问其他节点。大多数情况下跳跃表的效率可以跟平衡树相媲美,并且跳跃表的实现相对简单,redis使用跳跃表做有序集合的底层实现。

数据结构

redis的跳跃表包含头尾指针、跳跃表的长度以及维护的跳跃表中的最大层数。跳跃表的接点由key、score、前驱节点以及多个后继节点的指针组成。

1、跳跃表

typedef struct zskiplist {
    // 头节点和尾节点
    struct zskiplistNode *header, *tail;
    // 跳跃表的长度,即节点数量
    unsigned long length;
    // 层数最大节点的层数
    int level;
} zskiplist;

2、节点

typedef struct zskiplistNode {
    // 有序表的key
    sds ele;
    // key的分数
    double score;
    // 当前节点的前驱节点    
    struct zskiplistNode *backward;
    // 层,维护了多个节点的指针以及span
    struct zskiplistLevel {
        // 前进指针,指向后继节点
        struct zskiplistNode *forward;
        // 跨度,表示相邻节点的跨度,即相差了几个节点
        unsigned long span;
    } level[];
} zskiplistNode;

特点

1、跳跃表是有序集合的底层实现。

2、每个跳跃表节点的level都是 1 - 32的随机整数。

3、跳跃表的节点按照score大小有序排列,分值相同时,按照成员ele大小排列。

4、跳跃表中的每个成员ele必须是唯一的,但是score可以相同。

源码

1、初始化跳跃表

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

2、插入一个元素

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    
    serverAssert(!isnan(score));
    x = zsl->header;
    // 寻找新节点需要插入的位置
    for (i = zsl->level-1; i >= 0; i--) {
        // rank[i] 表示从最上层开始查找所需插入位置寻至该层所遍历的节点数,即span和
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // update[i]表示每一层需要更新的节点(forward和span)
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
         * scores, reinserting the same element should never happen since the
         * caller of zslInsert() should test in the hash table if the element is
         * already inside or not. */
    // 如果新生成的节点level层数,介于 1 - 32之间,如果大于当前跳跃表的最大层数,则将大于的层数初始化
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            // 将rank[i]置为0,表示在该层寻找插入的位置需要遍历的节点数为0
            rank[i] = 0;
            // 在该层需要更新的是头节点
            update[i] = zsl->header;
            // 头节点在该层的span为原来跳跃表的长度
            update[i]->level[i].span = zsl->length;
        }
        // 设置跳跃表的最大层数
        zsl->level = level;
    }
    
    // 创建新节点,并将新节点插入跳跃表中,并更新每一层的需要更新节点的forward和span 
    // 从第1层开始更新
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is inserted here */
        // rank[i]表示新节点的在i层待插入位置的所查找的跨度和
        // A = update[i], C = update[i]->level[i].forward,  B = x
        // A -> C 插入 B, 那么A = update[i], update[i]->level[i].span就是A到C的span, (rank[0] - rank[i])就是A到B的前一个节点span和
        // x->level[i].span 表示B到C的距离, 插入新节点后 AC = update[i]->level[i].span + 1, AB = rank[0] - rank[i] + 1
        // 下面的一行 <=> level[i].span = (update[i]->level[i].span + 1) - (rank[0] - rank[i] + 1); 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        // A到B的距离,(rank[0] - rank[i])就是A到B的前一个节点span和
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    
    /* increment span for untouched levels */
    // 如果新节点层数小于跳跃表的最大层数,则将大于新节点level每一层的待更新的节点的span + 1
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    
    // 设置新节点的前驱节点,如果为跳跃表为空,则为null, 否则前驱节点为最底层的待更新的节点
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    
    // 如果插入节点为跳跃表的中间点则重置新节点的后继节点的backward
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        // 如果插入节点为跳跃表的最后一个点则设置tail
        zsl->tail = x;
    // 跳跃表长度增加1
    zsl->length++;
    return x;
}

3、删除一个节点

/* Internal function used by zslDelete, zslDeleteRangeByScore and
 * zslDeleteRangeByRank. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        // 如果该层的后继节点是待删除的节点,则重置该节点的forward和span
        if (update[i]->level[i].forward == x) {
            // x->level[i].span - 1 是因为节点数少1
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            // 如果待更新节点的后继不是待删节点则span - 1
            update[i]->level[i].span -= 1;
        }
    }
    // 重置前驱节点
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        // 如果是叶节点则只需重置tail
        zsl->tail = x->backward;
    }
    // 重新设置跳跃表的最大level
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

4、删除一个元素

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
         * is to find the element with both the right score and object. */
    // 最后查找的 x 的后继则是我们要删除的节点
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        // 如果不需要返回待删除节点,则释放节点
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

5、清空跳跃表

/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;
    
    zfree(zsl->header);
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}