Skip Lists 阅读笔记

2023/10/01

Tags: Algorithm Paper Skip Lists

算法介绍

《Skip Lists: A Probabilistic Alternative to Balanced Trees》 论文标题翻译就是 跳表: 平衡树的概率性替代方案; 跳表是一种可以用来代替平衡树的数据结构。跳表使用概率平衡而不是严格强制的平衡,因此跳跃列表中的插入和删除算法比平衡树的等效算法要简单得多并且速度明显更快。

从论文的标题和介绍, 基本上就能知道跳表是一种怎么样的数据结构, 为了解决平衡树实现的复杂性, 提供一种概率性平衡的数据结构,作为平衡树的平替数据结构, 查询和插入时间复杂度是 O(log n).

算法流程

基本原理

skiplist-1

查询

level 表示跳表的层级, 而 forward[i] 是每一个层级的链表.

Search(list, searchKey)
    x := list→header
    // 从跳表的顶层开始,遍历到第一层
    for i := list→level downto 1 do
        while x→forward[i]→key < searchKey do
            x := x→forward[i]
    // x→key < searchKey ≤ x→forward[1]→key
    // 最终的结果从跳表最底层获取
    x := x→forward[1]
    if x→key = searchKey then 
        return x→value
    else 
        return failure

写入

由跳表的定义得出, 跳表的上一层级相当于下一层级的索引, 如果需要构建多级的索引, 首先需要解决: 当前node是否应该索引到上一层级?

基于链表的有序性,首先就能联想到每隔 2 个元素往上建立索引, 但是这样也会带来新的问题: 每次更新元素, 有可能会导致索引的全局更新, 效率反而降低了;

论文这里采用了概率的做法, 只需要在宏观上, 上层的 node 是下层 node 数量的 1/2, 就可以认为上层的索引建立成功, 而不需要保证每隔 n 个 node 向上索引; 这样做有个很明显的优点, 插入 node 时, 不需要去重新更新每一个层级的索引, 效率大大提高; 当然索引的稀疏性得不到保证, 但是在大批量数据背景下, 误差可以忽略不计.

概率性

这里 randomLevel() 实现了 node 所在层级呈概率分布, 第二层概率: 1/2; 第三层概率: 1/2 * 1/2 = 1/4;

randomLevel()
    lvl := 1
    // random() that returns a random value in [0...1)
    // p 默认是0.5
    while random() < p and lvl < MaxLevel do
        lvl := lvl + 1
    return lvl

写入的前半部分是在查找元素, 如果找到相同 key, 直接更新, 如果没找到, 则需要插入 node;

在插入 node 之前, 需要先明确一下跳表在实现上是个怎么样的结构, 从基础的单链表出发, 每一个 node 的有 valnext, val 是存储的具体的值 next 指向下一个 node; 从上图的跳表描述来看, 每一层都是一个单链表, 那怎么能直接从上一层快速跳转到下一层呢?

可以把 next 定义成一个数组, 这样通过数组下标的变化, 就能实现链表从上一层级跳转到下一层级. 论文里的 forward 就是 next 的含义.

Insert(list, searchKey, newValue)
    local update[1..MaxLevel]
    // 执行查找动作, 并且找到被更新节点的前一个节点 update[n]
    x := list→header
    for i := list→level downto 1 do
        while x→forward[i]→key < searchKey do
            x := x→forward[i]
        -- x→key < searchKey ≤ x→forward[i]→key
        update[i] := x
    x := x→forward[1]
    if x→key = searchKey then 
        // 如果找到直接更新 value
        x→value := newValue
    else // 没找到的情况, 需要新增节点
        lvl := randomLevel()
        if lvl > list→level then
            // 如果新生成的层级比当前跳表层级还高的话, 需要更新跳表的层级 level, 并且新的层级update[i]需要指向header
            for i := list→level + 1 to lvl do
                update[i] := list→header
            list→level := lvl
        // 创建新节点
        x := makeNode(lvl, searchKey, value)
        for i := 1 to level do
            // update[i] 指向的是新节点的前一个节点
            x→forward[i] := update[i]→forward[i]
            update[i]→forward[i] := x

删除

Delete(list, searchKey)
    local update[1..MaxLevel]
    // 执行查找动作, 并且找到被更新节点的前一个节点 update[n]
    x := list→header
    for i := list→level downto 1 do
        while x→forward[i]→key < searchKey do
            x := x→forward[i]
        update[i] := x
    // 找到需要删除的目标节点的最下层级元素
    x := x→forward[1]
    if x→key = searchKey then
        for i := 1 to list→level do
            // 判断update[n]的下一个节点是不是 x ; 如果是x,则删除x
            if update[i]→forward[i] ≠ x then 
                break
            update[i]→forward[i] := x→forward[i]
        free(x)
        // 判断最顶层是不是为空, 为空则跳表降级
        while list→level > 1 and list→header→forward[list→level] = NIL do
            list→level := list→level – 1

时间空间复杂度分析

从论文的思想来看, 很明显, 跳表这种数据结构, 就是以空间换时间的思路解决大数据量查找的问题, 通过多层级索引, 加速查询;

对于空间复杂度, 额外利用的空间就是所有上层的索引链表, 假设有 n 个元素, 元素索引到上层概率为 1/2; 额外的空间为: n/2 + n/4 + n/8 +...; 根据等比数列求和公式, 额外空间为: n(1-1/2^k) 其中k是项数, 也就是跳表层级; 因此空间复杂度为 O(n).

对于时间复杂度, 假设有 n 个元素, 元素索引到上层的概率为 1/2; 对于某个层级而言, 每级索引都是两个结点抽出一个结点作为上一级索引的结点时,所以每一层最多遍历3个结点, 所以最后需要查询的时间就取决于跳表的层级; l=log n, 所以查询时间复杂度为 O(log n).

具体实现

const maxLevel = 32
const pFactor = 0.25

type SkiplistNode struct {
    val     int
    forward []*SkiplistNode
}

type Skiplist struct {
    head  *SkiplistNode
    level int
}

func Constructor() Skiplist {
    return Skiplist{&SkiplistNode{-1, make([]*SkiplistNode, maxLevel)}, 0}
}

func (Skiplist) randomLevel() int {
    lv := 1
    for lv < maxLevel && rand.Float64() < pFactor {
        lv++
    }
    return lv
}

func (s *Skiplist) Search(target int) bool {
    curr := s.head
    for i := s.level - 1; i >= 0; i-- {
        // 找到第 i 层小于且最接近 target 的元素
        for curr.forward[i] != nil && curr.forward[i].val < target {
            curr = curr.forward[i]
        }
    }
    curr = curr.forward[0]
    // 检测当前元素的值是否等于 target
    return curr != nil && curr.val == target
}

func (s *Skiplist) Add(num int) {
    update := make([]*SkiplistNode, maxLevel)
    for i := range update {
        update[i] = s.head
    }
    curr := s.head
    for i := s.level - 1; i >= 0; i-- {
        // 找到第 i 层小于且最接近 num 的元素
        for curr.forward[i] != nil && curr.forward[i].val < num {
            curr = curr.forward[i]
        }
        update[i] = curr
    }
    lv := s.randomLevel()
    s.level = max(s.level, lv)
    newNode := &SkiplistNode{num, make([]*SkiplistNode, lv)}
    for i, node := range update[:lv] {
        // 对第 i 层的状态进行更新,将当前元素的 forward 指向新的节点
        newNode.forward[i] = node.forward[i]
        node.forward[i] = newNode
    }
}

func (s *Skiplist) Erase(num int) bool {
    update := make([]*SkiplistNode, maxLevel)
    curr := s.head
    for i := s.level - 1; i >= 0; i-- {
        // 找到第 i 层小于且最接近 num 的元素
        for curr.forward[i] != nil && curr.forward[i].val < num {
            curr = curr.forward[i]
        }
        update[i] = curr
    }
    curr = curr.forward[0]
    // 如果值不存在则返回 false
    if curr == nil || curr.val != num {
        return false
    }
    for i := 0; i < s.level && update[i].forward[i] == curr; i++ {
        // 对第 i 层的状态进行更新,将 forward 指向被删除节点的下一跳
        update[i].forward[i] = curr.forward[i]
    }
    // 更新当前的 level
    for s.level > 1 && s.head.forward[s.level-1] == nil {
        s.level--
    }
    return true
}

func max(a, b int) int {
    if b > a {
        return b
    }
    return a
}

参考

https://15721.courses.cs.cmu.edu/spring2018/papers/08-oltpindexes1/pugh-skiplists-cacm1990.pdf

https://leetcode.cn/problems/design-skiplist/solutions/1696545/she-ji-tiao-biao-by-leetcode-solution-e8yh/