算法介绍
《Skip Lists: A Probabilistic Alternative to Balanced Trees》 论文标题翻译就是 跳表: 平衡树的概率性替代方案; 跳表是一种可以用来代替平衡树的数据结构。跳表使用概率平衡而不是严格强制的平衡,因此跳跃列表中的插入和删除算法比平衡树的等效算法要简单得多并且速度明显更快。
从论文的标题和介绍, 基本上就能知道跳表是一种怎么样的数据结构, 为了解决平衡树实现的复杂性, 提供一种概率性平衡的数据结构,作为平衡树的平替数据结构, 查询和插入时间复杂度是 O(log n)
.
算法流程
基本原理
-
节点结构:跳表由多个层级组成,每个层级都是一个有序链表。每个节点包含一个值和多个指向下一层级节点的指针。
-
层级索引:跳表的最底层是一个普通的有序链表,每个节点都连接到下一个节点。而在更高的层级,节点以一定的概率连接到更远的节点,形成了一种“跳跃”的效果。这些连接被称为“跳跃指针”,它们允许我们在查找时可以快速地跳过一些节点。
-
查找操作:从跳表的顶层开始,我们沿着每个层级向右移动,直到找到目标值或找到一个大于目标值的节点。然后我们进入下一层级继续查找,直到最底层。这种方式可以在平均情况下实现快速的查找,时间复杂度为 O(log n)。
-
插入和删除操作:在插入新节点时,我们首先执行查找操作,找到合适的插入位置。然后我们在每个层级上插入新节点,并根据一定的概率决定是否要为该节点添加跳跃指针。删除操作类似,我们首先找到要删除的节点,然后将其从每个层级中移除。
查询
level
表示跳表的层级, 而 forward[i]
是每一个层级的链表.
Search(list, searchKey)
x := list→header
// 从跳表的顶层开始,遍历到第一层
for i := list→level downto 1 do
while x→forward[i]→key < searchKey do
x := x→forward[i]
// x→key < searchKey ≤ x→forward[1]→key
// 最终的结果从跳表最底层获取
x := x→forward[1]
if x→key = searchKey then
return x→value
else
return failure
写入
由跳表的定义得出, 跳表的上一层级相当于下一层级的索引, 如果需要构建多级的索引, 首先需要解决: 当前node是否应该索引到上一层级?
基于链表的有序性,首先就能联想到每隔 2 个元素往上建立索引, 但是这样也会带来新的问题: 每次更新元素, 有可能会导致索引的全局更新, 效率反而降低了;
论文这里采用了概率的做法, 只需要在宏观上, 上层的 node 是下层 node 数量的 1/2, 就可以认为上层的索引建立成功, 而不需要保证每隔 n 个 node 向上索引; 这样做有个很明显的优点, 插入 node 时, 不需要去重新更新每一个层级的索引, 效率大大提高; 当然索引的稀疏性得不到保证, 但是在大批量数据背景下, 误差可以忽略不计.
概率性
这里 randomLevel()
实现了 node 所在层级呈概率分布, 第二层概率: 1/2
; 第三层概率: 1/2 * 1/2 = 1/4
;
randomLevel()
lvl := 1
// random() that returns a random value in [0...1)
// p 默认是0.5
while random() < p and lvl < MaxLevel do
lvl := lvl + 1
return lvl
写入的前半部分是在查找元素, 如果找到相同 key, 直接更新, 如果没找到, 则需要插入 node;
在插入 node 之前, 需要先明确一下跳表在实现上是个怎么样的结构, 从基础的单链表出发, 每一个 node 的有 val
和 next
, val
是存储的具体的值 next
指向下一个 node; 从上图的跳表描述来看, 每一层都是一个单链表, 那怎么能直接从上一层快速跳转到下一层呢?
可以把 next
定义成一个数组, 这样通过数组下标的变化, 就能实现链表从上一层级跳转到下一层级. 论文里的 forward
就是 next
的含义.
Insert(list, searchKey, newValue)
local update[1..MaxLevel]
// 执行查找动作, 并且找到被更新节点的前一个节点 update[n]
x := list→header
for i := list→level downto 1 do
while x→forward[i]→key < searchKey do
x := x→forward[i]
-- x→key < searchKey ≤ x→forward[i]→key
update[i] := x
x := x→forward[1]
if x→key = searchKey then
// 如果找到直接更新 value
x→value := newValue
else // 没找到的情况, 需要新增节点
lvl := randomLevel()
if lvl > list→level then
// 如果新生成的层级比当前跳表层级还高的话, 需要更新跳表的层级 level, 并且新的层级update[i]需要指向header
for i := list→level + 1 to lvl do
update[i] := list→header
list→level := lvl
// 创建新节点
x := makeNode(lvl, searchKey, value)
for i := 1 to level do
// update[i] 指向的是新节点的前一个节点
x→forward[i] := update[i]→forward[i]
update[i]→forward[i] := x
删除
Delete(list, searchKey)
local update[1..MaxLevel]
// 执行查找动作, 并且找到被更新节点的前一个节点 update[n]
x := list→header
for i := list→level downto 1 do
while x→forward[i]→key < searchKey do
x := x→forward[i]
update[i] := x
// 找到需要删除的目标节点的最下层级元素
x := x→forward[1]
if x→key = searchKey then
for i := 1 to list→level do
// 判断update[n]的下一个节点是不是 x ; 如果是x,则删除x
if update[i]→forward[i] ≠ x then
break
update[i]→forward[i] := x→forward[i]
free(x)
// 判断最顶层是不是为空, 为空则跳表降级
while list→level > 1 and list→header→forward[list→level] = NIL do
list→level := list→level – 1
时间空间复杂度分析
从论文的思想来看, 很明显, 跳表这种数据结构, 就是以空间换时间的思路解决大数据量查找的问题, 通过多层级索引, 加速查询;
对于空间复杂度, 额外利用的空间就是所有上层的索引链表, 假设有 n 个元素, 元素索引到上层概率为 1/2; 额外的空间为: n/2 + n/4 + n/8 +...
; 根据等比数列求和公式, 额外空间为: n(1-1/2^k)
其中k是项数, 也就是跳表层级; 因此空间复杂度为 O(n)
.
对于时间复杂度, 假设有 n 个元素, 元素索引到上层的概率为 1/2; 对于某个层级而言, 每级索引都是两个结点抽出一个结点作为上一级索引的结点时,所以每一层最多遍历3个结点, 所以最后需要查询的时间就取决于跳表的层级; l=log n
, 所以查询时间复杂度为 O(log n)
.
具体实现
const maxLevel = 32
const pFactor = 0.25
type SkiplistNode struct {
val int
forward []*SkiplistNode
}
type Skiplist struct {
head *SkiplistNode
level int
}
func Constructor() Skiplist {
return Skiplist{&SkiplistNode{-1, make([]*SkiplistNode, maxLevel)}, 0}
}
func (Skiplist) randomLevel() int {
lv := 1
for lv < maxLevel && rand.Float64() < pFactor {
lv++
}
return lv
}
func (s *Skiplist) Search(target int) bool {
curr := s.head
for i := s.level - 1; i >= 0; i-- {
// 找到第 i 层小于且最接近 target 的元素
for curr.forward[i] != nil && curr.forward[i].val < target {
curr = curr.forward[i]
}
}
curr = curr.forward[0]
// 检测当前元素的值是否等于 target
return curr != nil && curr.val == target
}
func (s *Skiplist) Add(num int) {
update := make([]*SkiplistNode, maxLevel)
for i := range update {
update[i] = s.head
}
curr := s.head
for i := s.level - 1; i >= 0; i-- {
// 找到第 i 层小于且最接近 num 的元素
for curr.forward[i] != nil && curr.forward[i].val < num {
curr = curr.forward[i]
}
update[i] = curr
}
lv := s.randomLevel()
s.level = max(s.level, lv)
newNode := &SkiplistNode{num, make([]*SkiplistNode, lv)}
for i, node := range update[:lv] {
// 对第 i 层的状态进行更新,将当前元素的 forward 指向新的节点
newNode.forward[i] = node.forward[i]
node.forward[i] = newNode
}
}
func (s *Skiplist) Erase(num int) bool {
update := make([]*SkiplistNode, maxLevel)
curr := s.head
for i := s.level - 1; i >= 0; i-- {
// 找到第 i 层小于且最接近 num 的元素
for curr.forward[i] != nil && curr.forward[i].val < num {
curr = curr.forward[i]
}
update[i] = curr
}
curr = curr.forward[0]
// 如果值不存在则返回 false
if curr == nil || curr.val != num {
return false
}
for i := 0; i < s.level && update[i].forward[i] == curr; i++ {
// 对第 i 层的状态进行更新,将 forward 指向被删除节点的下一跳
update[i].forward[i] = curr.forward[i]
}
// 更新当前的 level
for s.level > 1 && s.head.forward[s.level-1] == nil {
s.level--
}
return true
}
func max(a, b int) int {
if b > a {
return b
}
return a
}
参考
https://15721.courses.cs.cmu.edu/spring2018/papers/08-oltpindexes1/pugh-skiplists-cacm1990.pdf