1 24
用Go实现内存分配与释放的优化

背景

动态的内存分配策略对我们并不陌生,Go中的slice是运行时动态申请内存, 所以我们一般操纵slice,往往都需要设置len和cap的属性。 但如果我们分配的场景是那些高度频繁而且处于高并发的环境下,普通的对象内存分配是否能满足性能上的要求呢?

站在内核的角度,内核中普通对象进行初始化所需要的时间超过了对其分配和释放所需要的时间,我们设计上其实不应该将内存释放回一个全局的 内存池,而是将内存保持为特定的初始化状态。例如:如果内存被分配为一个互斥锁,那么只需要在互斥锁首次分配内存进行一次初始化函数即可,后续 的内存分配不需要执行这个初始化函数。

站在编程人员的角度,处理数据的频繁分配和回收,第一反应基本都是利用空闲链表:包含可供使用并已经分配好的数据结构块。空闲链表在编程上,可以理解 为高速缓存,能快速存储频繁使用的对象类型, 也就是说当代码需要一个新的数据结构实例时,就可以从空闲链表中抓取一个,而 不需要分配内存,再把数据放进去;以后不再需要这个数据结构的时候,就把它放回空闲链表,而不是释放它。

Slab基本概念

空闲链表的做法有个问题:因为当可用内存变得紧缺时, 内核无法通知每个空闲链表,让其收缩缓存的大小以便释放出一些内存来,换句话说,Linux内核 根本就不知道存在任何的所谓空闲链表

有见及此,Linux出现了Slab层:slab由一个或多个物理上连续的页组成,把不同的对象划分为所谓的高速缓存组,这些缓存组存放不同的对象类型,每个高速 缓存可以由多个slab组成。

slab层所起的主要作用就像是一个专用的分配器一样,可以为具体的对象类型进行分配。

结构示意图:

slab-struct

每个缓存都包含了一个 slabs 列表,这是一段连续的内存块(通常都是页面)。

存在 3 种 slab:

  • slabs_full 完全分配的 slab

  • slabs_partial 部分分配的 slab

  • slabs_empty 空 slab,或者没有对象被分配

注意 slabs_empty 列表中的 slab 是进行回收(reaping)的主要备选对象。正是通过此过程,slab 所使用的内存被返回给操作系统供其他用户使用。

slab 列表中的每个 slab 都是一个连续的内存块(一个或多个连续页),它们被划分成一个个对象。 这些对象是从特定缓存中进行分配和释放的基本元素。注意 slab 是 slab 分配器进行操作的最小分配单位, 因此如果需要对 slab 进行扩展,这也就是所扩展的最小值。通常来说,每个 slab 被分配为多个对象。

由于对象是从 slab 中进行分配和释放的,因此单个 slab 可以在 slab 列表之间进行移动。例如,当一个 slab 中的所有对象都被使用完时,就从 slabs_partial 列表中移动到 slabs_full 列表中。当一个 slab 完全被分配并且有对象被释放后,就从 slabs_full 列表中移动到 slabs_partial 列表中。当所有对象都被释放之后,就从 slabs_partial 列表移动到 slabs_empty 列表中。

优点

与传统的内存管理模式相比, slab 缓存分配器提供了很多优点。首先,内核通常依赖于对小对象的分配,它们会在系统生命周期内进行无数次分配。slab 缓存分配器通过对类似大小的对象进行缓存而提供这种功能,从而避免了常见的碎片问题。slab 分配器还支持通用对象的初始化,从而避免了为同一目而对一个对象重复进行初始化。最后,slab 分配器还可以支持硬件缓存对齐和着色,这允许不同缓存中的对象占用相同的缓存行,从而提高缓存的利用率并获得更好的性能。

代码实现

上面是Linux层面的slab设计,利用slab的特点,其实我们在编码过程中,也能开发一个针对性的slab内存池工具(memcached也是这么做的)

之前,看达达fastway的时候,我看到他slab层的实现,那么我们以达达的funny/slab 作为参考,介绍下怎么使用Go写一个slab的内存池,以达到优化分配,减少内存碎片的目的。

我在网上找了一个memcached的slab层设计图,来表达下相关结构关系:

chunk-re

OK,我们先约定好内存操作层面的数据结构:

//无锁的内存池
type MemSlabPool struct {
    classes []class
    minSize int
    maxSize int
}

//对象块
type class struct {
    size      int
    page      []byte
    pageBegin uintptr
    pageEnd   uintptr
    chunks    []chunk
    head      uint64
}

//内存数据块
type chunk struct {
    mem  []byte
    aba  uint32
    next uint64
}

接着我们根据这些主要结构封装我们分配和释放等相关方法,如下:

//创建一个无锁的slab分配内存池
//minSize : 最小的chuck大小
//maxSize : 最大的chunk大小
//factor : 增长因子
//pageSize : 每个slab类的内存大小 (内存分配基本单位是页)
func NewMemSlabPool(minSize, maxSize, factor, pageSize int) *MemSlabPool {

    pool := &MemSlabPool{
        make([]class, 0, 10),
        minSize,
        maxSize}

    //内存池初始化
    for chunkSize := minSize; chunkSize <= maxSize && chunkSize <= pageSize; chunkSize *= factor {

        c := class{
            size:   chunkSize,
            page:   make([]byte, pageSize),
            chunks: make([]chunk, pageSize/chunkSize),
            head:   (1 << 32),
        }

        for i := 0; i < len(c.chunks); i++ {
            chk := &c.chunks[i]
            //为了防止cap由于过小,导致append触发内存申请, 这里锁死cap的大小
            chk.mem = c.page[i*chunkSize : (i+1)*chunkSize : (i+1)*chunkSize]
            if i < len(c.chunks)-1 {
                chk.next = uint64(i+1+1 /* index start from 1 */) << 32
            } else {
                c.pageBegin = uintptr(unsafe.Pointer(&c.page[0]))
                c.pageEnd = uintptr(unsafe.Pointer(&chk.mem[0]))
            }

        }
        //分配到缓存池中
        pool.classes = append(pool.classes, c)
    }
    return pool
}

//分配流程:
//先尝试从slab的class中获取chunk
//如果在slab里头没有空闲的chunk,会创建一个
func (pool *AtomPool) Alloc(size int) []byte {
    if size <= pool.maxSize {
        for i := 0; i < len(pool.classes); i++ {
            if pool.classes[i].size >= size {
                mem := pool.classes[i].Pop()
                if mem != nil {
                    return mem[:size]
                }
                break
            }
        }
    }
    return make([]byte, size)
}

func (pool *AtomPool) Free(mem []byte) {
    size := cap(mem)
    for i := 0; i < len(pool.classes); i++ {
        if pool.classes[i].size == size {
            pool.classes[i].Push(mem)
            break
        }
    }
}

func (c *class) Push(mem []byte) {
    ptr := (*reflect.SliceHeader)(unsafe.Pointer(&mem)).Data
    if c.pageBegin <= ptr && ptr <= c.pageEnd {
        i := (ptr - c.pageBegin) / uintptr(c.size) //哪个chunk
        chk := &c.chunks[i]
        if chk.next != 0 {
            panic("slab.class.push: Double Free")
        }
        chk.aba++
        //新的head指向的地址
        new := uint64(i+1)<<32 + uint64(chk.aba)
        for {
            old := atomic.LoadUint64(&c.head)
            atomic.StoreUint64(&chk.next, old)
            //地址替换
            if atomic.CompareAndSwapUint64(&c.head, old, new) {
                break
            }
            runtime.Gosched()
        }
    }
}

func (c *class) Pop() []byte {
    for {
        old := atomic.LoadUint64(&c.head) //获取地址
        //无效地址
        if old == 0 {
            return nil
        }
        chk := &c.chunks[old>>32-1]

        //整理head的指向的地址
        //获取下个chk的地址
        nxt := atomic.LoadUint64(&chk.next)
        //头指针指向下一个
        if atomic.CompareAndSwapUint64(&c.head, old, nxt) {
            //清除chk的next地址
            atomic.StoreUint64(&chk.next, 0)
            return chk.mem
        }
        runtime.Gosched()
    }
}

编译无误后,我们编写基准测试代码:

package memslabs

import (
    "testing"
)

//GOMAXPROCS=16 go test -v -bench=. -benchmem

//高并发申请128字节的内存,再释放
func Benchmark_AllocAndFree_128(b *testing.B) {
    pool := NewMemSlabPool(128, 1024, 2, 64*1024)
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            pool.Free(pool.Alloc(128))
        }
    })
}

//高并发申请256字节的内存,再释放
func Benchmark_AllocAndFree_256(b *testing.B) {
    pool := NewMemSlabPool(128, 1024, 2, 64*1024)
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            pool.Free(pool.Alloc(256))
        }
    })
}

//高并发申请512字节的内存,再释放
func Benchmark_AllocAndFree_512(b *testing.B) {
    pool := NewMemSlabPool(128, 1024, 2, 64*1024)
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            pool.Free(pool.Alloc(512))
        }
    })
}


最后,在命令行输入相关执行命令:

GOMAXPROCS=16 go test -v -bench=. -benchmem

可以看到相关执行结果:

Benchmark_AllocAndFree_128-16        10000000           127 ns/op           0 B/op           0 allocs/op
Benchmark_AllocAndFree_256-16        10000000           131 ns/op           0 B/op           0 allocs/op
Benchmark_AllocAndFree_512-16        10000000           134 ns/op           0 B/op           0 allocs/op

从基准测试结果我们看到,我们实现的内存池slab功能在每次操作中,基本能达到“零拷贝”的效果,这种方式在高频繁的对象分配和释放中对性能是 很有帮助的。

参考资料