7 22
go同步之条件变量

Go的标准库中有一个类型叫条件变量:sync.Cond。这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用:


// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

通过使用 NewCond 函数可以返回 *sync.Cond 类型的结果, *sync.Cond 我们主要操作其三个方法,分别是:

wait():等待通知

Signal():单播通知

Broadcast():广播通知

具体的函数说明如下:

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。

sync.Cond 主要实现一个条件变量,假如 goroutine A 执行前需要等待另外的goroutine B 的通知,那边处于等待的goroutine A 会保存在一个通知列表,也就是说需要某种变量状态的goroutine A 将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutine B 通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutine A, 这样便可首先一种“消息通知”的同步机制。

以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(), 入口代码如下:

if tlsConn, ok := c.rwc.(*tls.Conn); ok {
        if d := c.server.ReadTimeout; d != 0 {
            c.rwc.SetReadDeadline(time.Now().Add(d))
        }
        if d := c.server.WriteTimeout; d != 0 {
            c.rwc.SetWriteDeadline(time.Now().Add(d))
        }
        if err := tlsConn.Handshake(); err != nil {
            c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
            return
        }
        c.tlsState = new(tls.ConnectionState)
        *c.tlsState = tlsConn.ConnectionState()
        if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
            if fn := c.server.TLSNextProto[proto]; fn != nil {
                h := initNPNRequest{tlsConn, serverHandler{c.server}}
                fn(c.server, tlsConn, h)
            }
            return
        }
    }

其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:

func (c *Conn) Handshake() error {
    c.handshakeMutex.Lock()
    defer c.handshakeMutex.Unlock()

    for {
        if err := c.handshakeErr; err != nil {
            return err
        }
        if c.handshakeComplete {
            return nil
        }
        if c.handshakeCond == nil {
            break
        }

        c.handshakeCond.Wait()
    }

    c.handshakeCond = sync.NewCond(&c.handshakeMutex)
    c.handshakeMutex.Unlock()

    c.in.Lock()
    defer c.in.Unlock()

    c.handshakeMutex.Lock()

    if c.handshakeErr != nil || c.handshakeComplete {
        panic("handshake should not have been able to complete after handshakeCond was set")
    }

    if c.isClient {
        c.handshakeErr = c.clientHandshake()
    } else {
        c.handshakeErr = c.serverHandshake()
    }
    if c.handshakeErr == nil {
        c.handshakes++
    } else {
        c.flush()
    }

    if c.handshakeErr == nil && !c.handshakeComplete {
        panic("handshake should have had a result.")
    }

    c.handshakeCond.Broadcast()
    c.handshakeCond = nil

    return c.hand

我们也可以再通过一个例子熟悉sync.Cond的使用:

我们尝试实现一个读写同步的例子,需求是:我们有数个读取器和数个写入器,读取器必须依赖写入器对缓存区进行数据写入后,才可从缓存区中对数据进行读出。我们思考下,要实现类似的功能,除了使用channel,还能如何做?

写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.Cond 的 广播机制是不是很像? 有了这个广播机制,我们可以通过sync.Cond来实现这个例子了:

package main

import (
    "bytes"
    "fmt"
    "io"
    "sync"
    "time"
)

type MyDataBucket struct {
    br     *bytes.Buffer
    gmutex *sync.RWMutex
    rcond  *sync.Cond //读操作需要用到的条件变量
}

func NewDataBucket() *MyDataBucket {
    buf := make([]byte, 0)
    db := &MyDataBucket{
        br:     bytes.NewBuffer(buf),
        gmutex: new(sync.RWMutex),
    }
    db.rcond = sync.NewCond(db.gmutex.RLocker())
    return db
}

func (db *MyDataBucket) Read(i int) {
    db.gmutex.RLock()
    defer db.gmutex.RUnlock()
    var data []byte
    var d byte
    var err error
    for {
        //读取一个字节
        if d, err = db.br.ReadByte(); err != nil {
            if err == io.EOF {
                if string(data) != "" {
                    fmt.Printf("reader-%d: %s\n", i, data)
                }
                db.rcond.Wait()
                data = data[:0]
                continue
            }
        }
        data = append(data, d)
    }
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
    db.gmutex.Lock()
    defer db.gmutex.Unlock()
    //写入一个数据块
    n, err := db.br.Write(d)
    db.rcond.Broadcast()
    return n, err
}

func main() {
    db := NewDataBucket()

    go db.Read(1)

    go db.Read(2)

    for i := 0; i < 10; i++ {
        go func(i int) {
            d := fmt.Sprintf("data-%d", i)
            db.Put([]byte(d))
        }(i)
        time.Sleep(100 * time.Millisecond)
    }
}

当使用sync.Cond的时候有两点移动要注意的:

  • 一定要在调用cond.Wait方法前,锁定与之关联的读写锁

  • 一定不要忘记在cond.Wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。

如下面 Wait() 的源码所示,Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。