Codis的Slots-Rebalance算法

一、简述

Codis中所有的key都被hash到1024个slots上,在每个slots分配均匀的前提下,如果一个分片中的slots过多,该分片中存储的key的数量也就越多,该分片对应的负载也就越大,在扩缩容之后为了保证集群中各分片的负载均衡,需要调整分片的slots的数量。

1.1、使用场景

Codis的集群初始为2个分片,当业务增长需要扩容到4个分片的时候,我们可以手动指定slots指挥Codis进行数据迁移,也可以使用AutoRebalanceCodis自动的进行Slots数据迁移。

1.2、迁移原则

  • 尽可能的均匀分配Slots;

  • 尽量减少迁移的Slots的数量;

二、Rebalance算法

2.1、Slots分配方案

  • 统计当前迁移中Slots的结果,用于当前迁移方案的基础数据;

  • 按照每个Group可分配Slots的最大限制,统计Group中需要迁入/出的Slots信息;

  • 依据现有的GroupSlots的数量构建红黑树,统计分配Slots

  • 审核并存储迁移方案;

2.2、代码实现

func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return nil, err
}

/* 获取所有group的id,每一个group必须拥有redis实例,
* 依据id从小到大排序group,其中group的id最小值为1
*/
var groupIds []int
for _, g := range ctx.group {
if len(g.Servers) != 0 {
groupIds = append(groupIds, g.Id)
}
}
sort.Ints(groupIds)

if len(groupIds) == 0 {
return nil, errors.Errorf("no valid group could be found")
}

/* 每一个分片(组)都拥有3个属性:
* assigned: 需要给当前的group分配的slots的数量
* pendings: 当前group需要移出的slots信息,其中key为group的id,value为slots的数组
* moveout: 当前group需要移出/入(为负数时代表移入)的slots数量,其中key为group的id,value为slots的数量
* docking为需要最终操作的slots的列表
*/
var (
assigned = make(map[int]int)
pendings = make(map[int][]int)
moveout = make(map[int]int)
docking []int
)

/* 获取group的当前的slots的数量 */
var groupSize = func(gid int) int {
return assigned[gid] + len(pendings[gid]) - moveout[gid]
}

/* 遍历slots,获取正在迁移中的slots的迁移结果并该结果计入本次的迁移统计 */
for _, m := range ctx.slots {
if m.Action.State != models.ActionNothing {
assigned[m.Action.TargetId]++
}
}

/* 按照平均值计算每个group可以分到的slots的数量(总量为1024) */
var lowerBound = MaxSlotNum / len(groupIds)

/* 遍历slots,统计需要迁移的slots信息 */
for _, m := range ctx.slots {
/* 对于处于迁移状态中的slots不执行任何操作 */
if m.Action.State != models.ActionNothing {
continue
}
/* 当前的slots属于集群中的一个group */
if m.GroupId != 0 {
/* slot所归属group中的slots的数量小于group的平均值,则需要往这个group中分配新的slot */
if groupSize(m.GroupId) < lowerBound {
assigned[m.GroupId]++
} else {
/* slot所归属group中的slots的数量大于group的平均值,则需要将这个slot移出它所归属的group */
pendings[m.GroupId] = append(pendings[m.GroupId], m.Id)
}
}
}

/* 创建一个自定义比较器的红黑树,这棵树代表着需要进行slots迁移的所有group
* key是group的id,slots最少的在左面,slots最多的在右面,key是group的id
*/
var tree = rbtree.NewWith(func(x, y interface{}) int {
var gid1 = x.(int)
var gid2 = y.(int)
if gid1 != gid2 {
if d := groupSize(gid1) - groupSize(gid2); d != 0 {
return d
}
return gid1 - gid2
}
return 0
})
for _, gid := range groupIds {
tree.Put(gid, nil)
}

/* 遍历所有的slots */
for _, m := range ctx.slots {
/* 对于处于迁移状态中的slots不执行任何操作 */
if m.Action.State != models.ActionNothing {
continue
}
if m.GroupId != 0 {
continue
}

/* 有一些slots不属于任何group,需要将这些slots分配给slots最少的group,也就是红黑树左面的最小的group */
dest := tree.Left().Key.(int)
tree.Remove(dest)

docking = append(docking, m.Id)
moveout[dest]--

tree.Put(dest, nil)
}

/* 每一个group能够获取slots的数量的上限,其实约等于 lowerBound + 1 */
var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds)

/* 树中需要迁移的group大于等于2则需要进行rebalance,只有一个group就不需要了 */
for tree.Size() >= 2 {
from := tree.Right().Key.(int)
tree.Remove(from)

/* 当前group已经把所有需要移出的slots迁移出完毕 */
if len(pendings[from]) == moveout[from] {
continue
}
dest := tree.Left().Key.(int)
tree.Remove(dest)

var (
fromSize = groupSize(from)
destSize = groupSize(dest)
)
/* 右面的group中slots的数量小于等于每个group的平均值,则表示该group迁移完成,不需要再次加入tree中 */
if fromSize <= lowerBound {
break
}
/* 左面的group中slots的数量大于等于每个group的最大值,则表示该group也迁移完成,不需要再次加入tree中 */
if destSize >= upperBound {
break
}
/* 左右group中的slots的数量相差小于等于1,则表示这个两个group也不需要再次加入tree中了 */
if d := fromSize - destSize; d <= 1 {
break
}

/* 右面的group移出了一个,左面的group加入了一个 */
moveout[from]++
moveout[dest]--

/* 还需要继续迁移,将这两个group继续加入树中 */
tree.Put(from, nil)
tree.Put(dest, nil)
}

for gid, n := range moveout {
if n < 0 {
continue
}

/* 当前group需要移出n个slots */
if n > 0 {
/* 倒序遍历需要移出的slots的列表,将需要迁移的slots加入到docking中 */
sids := pendings[gid]
sort.Sort(sort.Reverse(sort.IntSlice(sids)))

docking = append(docking, sids[0:n]...)
pendings[gid] = sids[n:]
}
delete(moveout, gid)
}
/* 排序需要操作的slots列表 */
sort.Ints(docking)

var plans = make(map[int]int)

/* 遍历group,获取每一个group需要迁入多少个slots并将docking中的slots分配给对应的group,
* plans就是最终的分配方案,将某一个slot分配给某一个group
*/
for _, gid := range groupIds {
var in = -moveout[gid]
for i := 0; i < in && len(docking) != 0; i++ {
plans[docking[0]] = gid
docking = docking[1:]
}
}

/* 审批该方案 */
if !confirm {
return plans, nil
}

/* 存储slots与group的分配方案后续执行 */
var slotIds []int
for sid, _ := range plans {
slotIds = append(slotIds, sid)
}
sort.Ints(slotIds)

for _, sid := range slotIds {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return nil, err
}
defer s.dirtySlotsCache(m.Id)

m.Action.State = models.ActionPending
m.Action.Index = ctx.maxSlotActionIndex() + 1
m.Action.TargetId = plans[sid]
if err := s.storeUpdateSlotMapping(m); err != nil {
return nil, err
}
}
return plans, nil
}

2.3、本地测试代码

package main

import (
"fmt"
"sort"

rbtree "github.com/emirpasic/gods/trees/redblacktree"
)

// Slot slot
type Slot struct {
ID int
GroupID int
}

// MaxSlotNum max
var MaxSlotNum = 64

// SlotsRebalance slots rebalance
func SlotsRebalance(groupIds []int, slots []Slot) (map[int]int, error) {
// 排序group
sort.Ints(groupIds)

var (
assigned = make(map[int]int)
pendings = make(map[int][]int)
moveout = make(map[int]int)
docking []int
)
var groupSize = func(gid int) int {
return assigned[gid] + len(pendings[gid]) - moveout[gid]
}
var lowerBound = MaxSlotNum / len(groupIds)

// 按照每个Group可分配Slots的最大限制,统计Group中需要迁入/出的Slots信息
for _, m := range slots {
if m.GroupID != 0 {
if groupSize(m.GroupID) < lowerBound {
assigned[m.GroupID]++
} else {
pendings[m.GroupID] = append(pendings[m.GroupID], m.ID)
}
}
}

// 依据现有的Group中Slots的数量构建红黑树
var tree = rbtree.NewWith(func(x, y interface{}) int {
var gid1 = x.(int)
var gid2 = y.(int)
if gid1 != gid2 {
if d := groupSize(gid1) - groupSize(gid2); d != 0 {
return d
}
return gid1 - gid2
}
return 0
})
for _, gid := range groupIds {
tree.Put(gid, nil)
}
// fmt.Println("rbtree is ", tree.String())

// 统计无主的slots
for _, m := range slots {
if m.GroupID != 0 {
continue
}
dest := tree.Left().Key.(int)
tree.Remove(dest)

docking = append(docking, m.ID)
moveout[dest]--
tree.Put(dest, nil)
}

// 每个group最大的slots数量
var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds)

// 统计分配Slots
for tree.Size() >= 2 {
from := tree.Right().Key.(int)
tree.Remove(from)

if len(pendings[from]) == moveout[from] {
continue
}
dest := tree.Left().Key.(int)
tree.Remove(dest)

var (
fromSize = groupSize(from)
destSize = groupSize(dest)
)
if fromSize <= lowerBound {
break
}
if destSize >= upperBound {
break
}
if d := fromSize - destSize; d <= 1 {
break
}
moveout[from]++
moveout[dest]--

tree.Put(from, nil)
tree.Put(dest, nil)
}

for gid, n := range moveout {
if n < 0 {
continue
}
if n > 0 {
sids := pendings[gid]
sort.Sort(sort.Reverse(sort.IntSlice(sids)))
docking = append(docking, sids[0:n]...)
pendings[gid] = sids[n:]
}
delete(moveout, gid)
}
sort.Ints(docking)

// 构建迁移方案
var plans = make(map[int]int)
for _, gid := range groupIds {
var in = -moveout[gid]
for i := 0; i < in && len(docking) != 0; i++ {
plans[docking[0]] = gid
docking = docking[1:]
}
}

return plans, nil
}

func main() {
groupIds := []int{1, 2, 3}
slots := make([]Slot, MaxSlotNum)
for i := range slots {
slots[i].ID = i
}

plans, _ := SlotsRebalance(groupIds, slots[:10])
for k, v := range plans {
slots[k].GroupID = v
}

groupIds = append(groupIds, 4)
plans, _ = SlotsRebalance(groupIds, slots)
fmt.Println(plans)
}
Author: bugwz
Link: https://bugwz.com/2020/05/21/codis-slots-rebalance/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.