设计原理
Go 提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
- 共享内存方式:多个协程共享同一块内存,但是多个协程中读写变量是操作同一块内存,会产生多线程问题的并发问题,所以需要使用互斥锁来实现临界区的互斥访问,会大大影响效率
- 通信方式(go语言使用):channel通道当做通信的中间件队列,发送方 向channel
先入先出
channel收/发操作都遵循了先进先出的设计,它一共使用了3个队列来实现:
- 发操作:先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;(使用写队列
hchan.sendq
)- 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满会重新陷入休眠;
- 读操作:先从 Channel 读取数据的 Goroutine 会先接收到数据;(使用读队列
hchan.recvq
)- 使用读队列:发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到会重新陷入休眠;
无锁channel(结构体内还是有锁,好像暂未实现)
并发控制可由2种方式实现:
- 乐观锁:CAS(compare and swap)就是一种乐观锁,默认没有其他线程在修改,当本线程保存数据到内存时判断数据和修改前的原数据是否相同。
- 悲观锁:redis setnx就是一种悲观锁,默认有其他线程在修改,所以在其他线程拿数据前就阻塞,等待锁释放才能继续操作
乐观锁并没有锁这个变量,而是对原数据进行比较,所以乐观锁只是一种思想。无锁channel是使用了乐观锁思想实现的。
数据结构
runtime.hchan
结构体
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
- qcount:channel里的元素个数
- dataqsiz:Channel 中的循环队列的容量
- buf: Channel 的缓冲区数据指针
- elemsize:元素占内存的大小
- closed:channel的关闭状态
- elemtype:元素的类型元数据
- sendx: Channel 的发送操作处理到的位置
- recvx:Channel 的接收操作处理到的位置;
- recvq:接受队列(读队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
- sendq:发送队列(写队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
- lock:操作通道的锁,同一个时刻只有一个协程可以操作这个chan
队列中存的结构是runtime.sudog
:
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
- g:等待channel的goroutine指针
- channel:等待的哪个channel
- elem:等待发送/接收的缓冲区地址下标
channel类型
有缓冲区channel
hchan.buff
指向一个数组地址,能存放数据,尽量避免了所有协程有阻塞
- 写操作:
- 如果缓冲区内有空间
hchan.qcount<hchan.dataqsiz
,将数据放入缓冲区,hchan.sendx
指向下一个数组下标,唤醒读队列hchan.recvq
头部的协程。当前协程不阻塞,继续向下执行代码。 - 如果缓冲区内没有空间
hchan.qcount>=hchan.dataqsiz
,当前goroutine阻塞(被挂起_GWating
),新创建一个sudog
,sudog.g
指向当前goroutine,sudog
变量塞进写队列hchan.sendq
- 如果缓冲区内有空间
- 读操作:和写操作差不多,只是操作
hchan.recvq
和hchan.recvx
无缓冲区channel
hchan.buff
是个nil值,没有数据存储的区域,肯定会出现阻塞现象
- 写操作:去
hchan.recvq
队列中去获取一个正阻塞的协程sudog
结构变量- 如果
hchan.recvq
有数据,则根据sudog.g
变量去唤醒协程,并向这个协程发送数据 - 如果
hchan.recvq
有数据,则创建一个sudog
结构体变量,sudog.g
变量指向当前协程,放到hchan.sendq
队列,当前goroutine阻塞(被挂起_GWating
)
- 如果
- 读操作:和写操作差不多,只是操作
hchan.recvq
唤醒阻塞协程对channel做操作,都是由当前协程通知g0协程做调度
多路select
问题:为什么多个case被阻塞,说明当前g被加到了多个hchan.recvq
或者hchan.sendq
中,为什么只会执行一个case。
func SendBlock2() {
c1 := make(chan int)
c2 := make(chan int)
// go不能放select后,因为执行顺序的问题,如果放后面在select就挂起了协程,导致没有创建这个协程,也就不可能唤醒当前协程,从而导致死锁
go func() {
time.Sleep(3 * time.Second)
a := <-c1
fmt.Println(a)
}()
select {
case c1 <- 2:
fmt.Println("case1")
case c2 <- 3:
fmt.Println("case2")
}
}
上例的现象是:打印了case1或者case2,并不会两个都打印。
执行步骤:
- 执行case1时给c1加锁,执行case2时给c2加锁
- select乱序轮询
- g被加到c1和c2的
hchan.sendq
中,c1和c2解锁允许其他协程操作这个channel,g被挂起等待 - 子协程命中唤醒主协程,命中case1,执行case1操作
- 再次对所有case的channel加锁(原因是下一步)
- 去c1,c2的
recvq
和sendq
遍历删除绑定了当前协程的sudog
,因为删除了队列中的等待g,所以g不会被重新唤醒,case2就再也不命中。 - c1,c2再次解锁
- select结束
使用语法
创建channel
// 方法1,没有分配地址,无法读写chan
var c chan Type
// 方法2,分配了地址,设置了size就是有缓冲channel,反之是无缓冲地址
c := make(chan Type [, size])
写channel
c := make(chan Type [, size])
// 方法1
c <- val
// 方法2
select {
case c<-2:
// 下一个
case c<-2:
//业务逻辑
default:
//可以避免阻塞
}
读channel
c := make(chan Type [, size])
// 方法1
t := <-c
// 方法2
t,ok := <-c // ok==false表明,chan被关闭
// 方法3
select {
case <-c:
//业务逻辑
default:
//可以避免阻塞
}
// 方法4
for element := range c {
fmt.Println("chan element:", element)
}
关闭channel
close(c)
具体案例
有缓冲区channel
func SendBlock1() {
// 创建缓冲区容量是3的通道
c := make(chan int, 3)
defer close(c)
// 创建4个协程往通道里写,会有一个协程阻塞等待
for i := 0; i < 4; i++ {
go func(i int) {
c <- i
fmt.Printf("i=%d成功插入chan\n", i)
}(i) // 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
}
time.Sleep(3 * time.Second)
//打印,2协程阻塞等待
//i=3成功插入chan
//i=0成功插入chan
//i=1成功插入chan
}
无缓冲区channel
func SendBlock() {
c := make(chan int)
defer close(c)
for i := 0; i < 4; i++ {
// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
go func(i int) {
c <- i
fmt.Printf("i=%d成功插入chan\n", i)
}(i)
}
time.Sleep(3 * time.Second)
//没有任何打印,因为hchan.recvq没有协程可以唤醒
}
使用注意
- 对一个关闭的channel发送值 panic
- 对一个关闭的channel接收值,会一直读取成功,直到管道内数据为空
- 对一个关闭的并且没有值的管道执行接收操作,会得到对应类型的空值
- 关闭一个已关闭的通道会导致panic
- 关闭一个chan,会向所有正在监听这个chan的协程都发送一个空元素(元素类型取决于你的chan类型)
死锁:
func f1(channel chan int) {
time.Sleep(6 * time.Second)
channel <- 20
//close(channel)
}
func main() {
channel := make(chan int)
//例1 不会死锁,因为读写都只进行了一次之后就结算了
go func() {
time.Sleep(6 * time.Second)
channel <- 20
}()
fmt.Println(<-channel) // 主协程会阻塞等待管道进入数据
//例2 不会死锁,因为读写都只进行了一次之后就结算了
go f1(channel)
fmt.Println(<-channel)
//例3 不会死锁,因为读写都只进行了一次之后就结算了
go func(channel chan int) {
channel <- 20
}(channel)
fmt.Println(<-channel)
//例4 会死锁,主协程会一直等待子进程写入,无法退出,此时需要在子协程加入close(channel),表明自己不会在对协程做操作了
go func(channel chan int) {
channel <- 20
// close(channel) 加上则不会死锁
}(channel)
for element := range channel {
fmt.Println(element)
}
//例子5,结果是等待3秒后,其中一个消费协程会被死锁,因为他一直在等待channel的数据进入
channel := make(chan int, 3)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
//fmt.Println("子协程1")
fmt.Println("子协程1抢到的" + strconv.Itoa(<-channel))
}()
go func() {
defer wg.Done()
//fmt.Println("子协程2")
fmt.Println("子协程2抢到的" + strconv.Itoa(<-channel))
}()
go func() {
defer wg.Done()
channel <- 20
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
}
}()
//channel <- 21
wg.Wait()
}