跳跃表,是一种有序的数据结构,可以在一个节点中维护多个执行其他节点的指针,从而快速访问其他节点。大多数情况下跳跃表的效率可以跟平衡树相媲美,并且跳跃表的实现相对简单,redis使用跳跃表做有序集合的底层实现。
数据结构
redis的跳跃表包含头尾指针、跳跃表的长度以及维护的跳跃表中的最大层数。跳跃表的接点由key、score、前驱节点以及多个后继节点的指针组成。
1、跳跃表
typedef struct zskiplist { // 头节点和尾节点 struct zskiplistNode *header, *tail; // 跳跃表的长度,即节点数量 unsigned long length; // 层数最大节点的层数 int level; } zskiplist;
2、节点
typedef struct zskiplistNode { // 有序表的key sds ele; // key的分数 double score; // 当前节点的前驱节点 struct zskiplistNode *backward; // 层,维护了多个节点的指针以及span struct zskiplistLevel { // 前进指针,指向后继节点 struct zskiplistNode *forward; // 跨度,表示相邻节点的跨度,即相差了几个节点 unsigned long span; } level[]; } zskiplistNode;
特点
1、跳跃表是有序集合的底层实现。
2、每个跳跃表节点的level都是 1 - 32的随机整数。
3、跳跃表的节点按照score大小有序排列,分值相同时,按照成员ele大小排列。
4、跳跃表中的每个成员ele必须是唯一的,但是score可以相同。
源码
1、初始化跳跃表
/* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; }
2、插入一个元素
/* Insert a new node in the skiplist. Assumes the element does not already * exist (up to the caller to enforce that). The skiplist takes ownership * of the passed SDS string 'ele'. */ zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; // 寻找新节点需要插入的位置 for (i = zsl->level-1; i >= 0; i--) { // rank[i] 表示从最上层开始查找所需插入位置寻至该层所遍历的节点数,即span和 /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } // update[i]表示每一层需要更新的节点(forward和span) update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ // 如果新生成的节点level层数,介于 1 - 32之间,如果大于当前跳跃表的最大层数,则将大于的层数初始化 level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { // 将rank[i]置为0,表示在该层寻找插入的位置需要遍历的节点数为0 rank[i] = 0; // 在该层需要更新的是头节点 update[i] = zsl->header; // 头节点在该层的span为原来跳跃表的长度 update[i]->level[i].span = zsl->length; } // 设置跳跃表的最大层数 zsl->level = level; } // 创建新节点,并将新节点插入跳跃表中,并更新每一层的需要更新节点的forward和span // 从第1层开始更新 x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ // rank[i]表示新节点的在i层待插入位置的所查找的跨度和 // A = update[i], C = update[i]->level[i].forward, B = x // A -> C 插入 B, 那么A = update[i], update[i]->level[i].span就是A到C的span, (rank[0] - rank[i])就是A到B的前一个节点span和 // x->level[i].span 表示B到C的距离, 插入新节点后 AC = update[i]->level[i].span + 1, AB = rank[0] - rank[i] + 1 // 下面的一行 <=> level[i].span = (update[i]->level[i].span + 1) - (rank[0] - rank[i] + 1); x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); // A到B的距离,(rank[0] - rank[i])就是A到B的前一个节点span和 update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ // 如果新节点层数小于跳跃表的最大层数,则将大于新节点level每一层的待更新的节点的span + 1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } // 设置新节点的前驱节点,如果为跳跃表为空,则为null, 否则前驱节点为最底层的待更新的节点 x->backward = (update[0] == zsl->header) ? NULL : update[0]; // 如果插入节点为跳跃表的中间点则重置新节点的后继节点的backward if (x->level[0].forward) x->level[0].forward->backward = x; else // 如果插入节点为跳跃表的最后一个点则设置tail zsl->tail = x; // 跳跃表长度增加1 zsl->length++; return x; }
3、删除一个节点
/* Internal function used by zslDelete, zslDeleteRangeByScore and * zslDeleteRangeByRank. */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { // 如果该层的后继节点是待删除的节点,则重置该节点的forward和span if (update[i]->level[i].forward == x) { // x->level[i].span - 1 是因为节点数少1 update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { // 如果待更新节点的后继不是待删节点则span - 1 update[i]->level[i].span -= 1; } } // 重置前驱节点 if (x->level[0].forward) { x->level[0].forward->backward = x->backward; } else { // 如果是叶节点则只需重置tail zsl->tail = x->backward; } // 重新设置跳跃表的最大level while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--; }
4、删除一个元素
/* Delete an element with matching score/element from the skiplist. * The function returns 1 if the node was found and deleted, otherwise * 0 is returned. * * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise * it is not freed (but just unlinked) and *node is set to the node pointer, * so that it is possible for the caller to reuse the node (including the * referenced SDS string at node->ele). */ int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { x = x->level[i].forward; } update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ // 最后查找的 x 的后继则是我们要删除的节点 x = x->level[0].forward; if (x && score == x->score && sdscmp(x->ele,ele) == 0) { zslDeleteNode(zsl, x, update); // 如果不需要返回待删除节点,则释放节点 if (!node) zslFreeNode(x); else *node = x; return 1; } return 0; /* not found */ }
5、清空跳跃表
/* Free a whole skiplist. */ void zslFree(zskiplist *zsl) { zskiplistNode *node = zsl->header->level[0].forward, *next; zfree(zsl->header); while(node) { next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl); }