funcmain() { // Create a new channel with `make(chan val-type)`. // Channels are typed by the values they convey. messages := make(chanstring) // Send a value into a channel using the `channel <-` // syntax. Here we send `"ping"` to the `messages` // channel we made above, from a new goroutine. gofunc() { messages <- "ping" }() // The `<-channel` syntax receives a value from the // channel. Here we'll receive the `"ping"` message // we sent above and print it out. msg := <-messages fmt.Println(msg) }
channel的功能点:
队列
阻塞
当一端阻塞,可以被另一个端唤醒
我们围绕这3点功能展开,讲讲具体的实现。
channel结构
注释标注了几个重要的变量,从功能上大致可以分为两个功能单元,一个是 ring buffer,用于存数据; 一个是存放 goroutine 的队列。
type hchan struct { qcount uint// 当前队列中的元素个数 dataqsiz uint// 缓冲队列的固定大小 buf unsafe.Pointer // 缓冲数组 elemsize uint16 closed uint32 elemtype *_type // element type sendx uint// 下一次发送的 index recvx uint// 下一次接收的 index recvq waitq // 接受者队列 sendq waitq // 发送者队列
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this. g *g selectdone *uint32// CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently. // waitlink is only accessed by g.
acquiretime int64 releasetime int64 ticket uint32 waitlink *sudog // g.waiting list c *hchan // channel }
funcchansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool { // 检查工作
// 如果能从 chennel 的 recvq 弹出 sudog, 那么直接send if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }) returntrue }
// buffer有空余空间,返回; 阻塞操作 }
funcsend(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { // 处理 index
funcsendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack.
// Once we read sg.elem out of sg, it will no longer // be updated if the destination's stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
// memmove copies n bytes from "from" to "to". // in memmove_*.s //go:noescape funcmemmove(to, from unsafe.Pointer, n uintptr)
loop: // pass 1 - look for something already waiting var dfl *scase var cas *scase for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c
switch cas.kind { // 接受数据 case caseRecv: sg = c.sendq.dequeue() // 如果有 sender 在等待 if sg != nil { goto recv } // 当前buffer中有数据 if c.qcount > 0 { goto bufrecv } // 关闭的channel if c.closed != 0 { goto rclose } case caseSend: if raceenabled { racereadpc(unsafe.Pointer(c), cas.pc, chansendpc) } // 关闭 if c.closed != 0 { goto sclose } // 有 receiver 正在等待 sg = c.recvq.dequeue() if sg != nil { goto send } // 有空间接受 if c.qcount < c.dataqsiz { goto bufsend } // 走default case caseDefault: dfl = cas } }
if dfl != nil { selunlock(scases, lockorder) cas = dfl goto retc }