go-channel

2023-09-15 15:17:33

设计原理

Go 提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

  • 共享内存方式:多个协程共享同一块内存,但是多个协程中读写变量是操作同一块内存,会产生多线程问题的并发问题,所以需要使用互斥锁来实现临界区的互斥访问,会大大影响效率
  • 通信方式(go语言使用):channel通道当做通信的中间件队列,发送方 向channel

先入先出

channel收/发操作都遵循了先进先出的设计,它一共使用了3个队列来实现:

  • 发操作:先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;(使用写队列hchan.sendq)
    • 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满会重新陷入休眠;
  • 读操作:先从 Channel 读取数据的 Goroutine 会先接收到数据;(使用读队列hchan.recvq)
    • 使用读队列:发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到会重新陷入休眠;

无锁channel(结构体内还是有锁,好像暂未实现)

并发控制可由2种方式实现:

  • 乐观锁:CAS(compare and swap)就是一种乐观锁,默认没有其他线程在修改,当本线程保存数据到内存时判断数据和修改前的原数据是否相同。
  • 悲观锁:redis setnx就是一种悲观锁,默认有其他线程在修改,所以在其他线程拿数据前就阻塞,等待锁释放才能继续操作

乐观锁并没有锁这个变量,而是对原数据进行比较,所以乐观锁只是一种思想无锁channel是使用了乐观锁思想实现的。

数据结构

runtime.hchan结构体

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq

	lock mutex
}
  • qcount:channel里的元素个数
  • dataqsiz:Channel 中的循环队列的容量
  • buf: Channel 的缓冲区数据指针
  • elemsize:元素占内存的大小
  • closed:channel的关闭状态
  • elemtype:元素的类型元数据
  • sendx: Channel 的发送操作处理到的位置
  • recvx:Channel 的接收操作处理到的位置;
  • recvq:接受队列(读队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
  • sendq:发送队列(写队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
  • lock:操作通道的锁,同一个时刻只有一个协程可以操作这个chan

队列中存的结构是runtime.sudog

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}
  • g:等待channel的goroutine指针
  • channel:等待的哪个channel
  • elem:等待发送/接收的缓冲区地址下标

channel类型

有缓冲区channel

hchan.buff指向一个数组地址,能存放数据,尽量避免了所有协程有阻塞

  • 写操作:
    • 如果缓冲区内有空间hchan.qcount<hchan.dataqsiz,将数据放入缓冲区,hchan.sendx指向下一个数组下标,唤醒读队列hchan.recvq头部的协程。当前协程不阻塞,继续向下执行代码。
    • 如果缓冲区内没有空间hchan.qcount>=hchan.dataqsiz,当前goroutine阻塞(被挂起_GWating),新创建一个sudogsudog.g指向当前goroutine,sudog变量塞进写队列hchan.sendq
  • 读操作:和写操作差不多,只是操作hchan.recvqhchan.recvx

无缓冲区channel

hchan.buff是个nil值,没有数据存储的区域,肯定会出现阻塞现象

  • 写操作:去hchan.recvq队列中去获取一个正阻塞的协程sudog结构变量
    • 如果hchan.recvq有数据,则根据sudog.g变量去唤醒协程,并向这个协程发送数据
    • 如果hchan.recvq有数据,则创建一个sudog结构体变量,sudog.g变量指向当前协程,放到hchan.sendq队列,当前goroutine阻塞(被挂起_GWating)
  • 读操作:和写操作差不多,只是操作hchan.recvq

唤醒阻塞协程对channel做操作,都是由当前协程通知g0协程做调度

多路select

问题:为什么多个case被阻塞,说明当前g被加到了多个hchan.recvq或者hchan.sendq中,为什么只会执行一个case。

func SendBlock2() {
	c1 := make(chan int)
    c2 := make(chan int)
    // go不能放select后,因为执行顺序的问题,如果放后面在select就挂起了协程,导致没有创建这个协程,也就不可能唤醒当前协程,从而导致死锁
	go func() {
		time.Sleep(3 * time.Second)
		a := <-c1
		fmt.Println(a)
	}()
    
	select {
	case c1 <- 2:
		fmt.Println("case1")
	case c2 <- 3:
		fmt.Println("case2")
	}
}

上例的现象是:打印了case1或者case2,并不会两个都打印。

执行步骤:

  1. 执行case1时给c1加锁,执行case2时给c2加锁
  2. select乱序轮询
  3. g被加到c1和c2的 hchan.sendq中,c1和c2解锁允许其他协程操作这个channel,g被挂起等待
  4. 子协程命中唤醒主协程,命中case1,执行case1操作
  5. 再次对所有case的channel加锁(原因是下一步)
  6. 去c1,c2的recvqsendq遍历删除绑定了当前协程的sudog,因为删除了队列中的等待g,所以g不会被重新唤醒,case2就再也不命中。
  7. c1,c2再次解锁
  8. select结束

使用语法

创建channel

// 方法1,没有分配地址,无法读写chan
var c chan Type

// 方法2,分配了地址,设置了size就是有缓冲channel,反之是无缓冲地址
c := make(chan Type [, size])

写channel

c := make(chan Type [, size])

// 方法1
c <- val

// 方法2
select {
    case c<-2:
    // 下一个
    case c<-2:
    //业务逻辑
    default:
    //可以避免阻塞
}

读channel

c := make(chan Type [, size])

// 方法1
t := <-c

// 方法2
t,ok := <-c 	// ok==false表明,chan被关闭

// 方法3
select {
    case <-c:
    //业务逻辑
    default:
    //可以避免阻塞
}

// 方法4
for element := range c {
    fmt.Println("chan element:", element)
}

关闭channel

close(c)

具体案例

有缓冲区channel

func SendBlock1() {
    // 创建缓冲区容量是3的通道
	c := make(chan int, 3)
	defer close(c)
    // 创建4个协程往通道里写,会有一个协程阻塞等待
	for i := 0; i < 4; i++ {
		
		go func(i int) {
			c <- i
			fmt.Printf("i=%d成功插入chan\n", i)
		}(i)	// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
	}
	time.Sleep(3 * time.Second)

	//打印,2协程阻塞等待
	//i=3成功插入chan
	//i=0成功插入chan
	//i=1成功插入chan
}

无缓冲区channel

func SendBlock() {
	c := make(chan int)
	defer close(c)
	for i := 0; i < 4; i++ {
		// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
		go func(i int) {
			c <- i
			fmt.Printf("i=%d成功插入chan\n", i)
		}(i)
	}
	time.Sleep(3 * time.Second)

	//没有任何打印,因为hchan.recvq没有协程可以唤醒

}

使用注意

  • 对一个关闭的channel发送值 panic
  • 对一个关闭的channel接收值,会一直读取成功,直到管道内数据为空
  • 对一个关闭的并且没有值的管道执行接收操作,会得到对应类型的空值
  • 关闭一个已关闭的通道会导致panic
  • 关闭一个chan,会向所有正在监听这个chan的协程都发送一个空元素(元素类型取决于你的chan类型)

死锁:

func f1(channel chan int) {
    time.Sleep(6 * time.Second)
	channel <- 20
	//close(channel)
}

func main() {
	channel := make(chan int)

    //例1 不会死锁,因为读写都只进行了一次之后就结算了
	go func() {
        time.Sleep(6 * time.Second)
		channel <- 20
	}()
	fmt.Println(<-channel)	// 主协程会阻塞等待管道进入数据
    
    //例2 不会死锁,因为读写都只进行了一次之后就结算了
	go f1(channel)
    fmt.Println(<-channel)

    //例3 不会死锁,因为读写都只进行了一次之后就结算了
	go func(channel chan int) {
		channel <- 20
	}(channel)
	fmt.Println(<-channel)
    
    //例4 会死锁,主协程会一直等待子进程写入,无法退出,此时需要在子协程加入close(channel),表明自己不会在对协程做操作了
    go func(channel chan int) {
		channel <- 20
        // close(channel) 加上则不会死锁
	}(channel)
    for element := range channel {
		fmt.Println(element)
	}
    
    
    //例子5,结果是等待3秒后,其中一个消费协程会被死锁,因为他一直在等待channel的数据进入
    channel := make(chan int, 3)

	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		//fmt.Println("子协程1")
		fmt.Println("子协程1抢到的" + strconv.Itoa(<-channel))
	}()
	go func() {
		defer wg.Done()
		//fmt.Println("子协程2")
		fmt.Println("子协程2抢到的" + strconv.Itoa(<-channel))

	}()
	go func() {
		defer wg.Done()
		channel <- 20
		for i := 0; i < 3; i++ {
			time.Sleep(time.Second)
		}
	}()

	//channel <- 21
	wg.Wait()
}
更多推荐

2023最新AI创作商用ChatGPT源码分享+支持AI绘画

一、SparkAI智能创作系统SparkAi创作系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧!SparkAi程序使用Ne

华为OD机考算法题:篮球比赛

目录题目部分解读与分析代码实现题目部分题目篮球比赛难度难题目说明篮球(5V5)比赛中,每个球员拥有一个战斗力,每个队伍的所有球员战斗力之和为该队伍的总体战斗力。现有10个球员准备分为两队进行训练赛,教练希望2个队伍的战斗力差值能够尽可能的小,以达到最佳训练效果。给出10个球员的战斗力,如果你是教练,你该如何分队,才能达

进程,线程切换

目录Linux线程切换:Linux进程切换:进程切换和线程切换的区别虚拟地址空间切换耗时的原因Linux线程切换:Linux线程切换的实现涉及到操作系统的调度和线程上下文的切换。线程上下文包括程序计数器(PC)和寄存器值,以及线程的堆栈和堆栈指针等。操作系统通过调度器决定哪个线程将获得CPU时间片来执行。当一个线程被操

蓝桥杯 题库 简单 每日十题 day2

01卡片题目描述本题为填空题,只需要算出结果后,在代码中使用输出语句将所填结果输出即可。小蓝有很多数字卡片,每张卡片上都是数字0到9。小蓝准备用这些卡片来拼一些数,他想从1开始拼出正整数,每拼一个,就保存起来,卡片就不能用来拼其它数了。小蓝想知道自己能从1拼到多少。例如,当小蓝有30张卡片,其中0到9各3张,则小蓝可以

Vue记录(上篇)

Helloworld案例<!DOCTYPEhtml><htmllang="zh-CN"><head><metacharset="UTF-8"><title>初识Vue</title><!--引入Vue--><scripttype="text/javascript"src="./vue.js"></script><scr

linux-checklist命令行

常用的linux命令行:首先打开终端,可用Ctrl+Alt+T快捷键打开.1.一些简单的命令下面是一些常用的简单命令:日期date//显示当前时间cal//显示日历(一般是一整个月)磁盘df//查看磁盘剩余空间free//显示空闲内存数量结束终端exit幕后控制台幕后控制台是和终端仿真器环境相同,不过外表不太美观,在不

图像处理之频域滤波DFT

摘要:傅里叶变换可以将任何满足相应数学条件的信号转换为不同系数的简单正弦和余弦函数的和。图像信号也是一种信号,只不过是二维离散信号,通过傅里叶变换对图像进行变换可以图像存空域转换为频域进行更多的处理。本文主要简要描述傅里叶变换以及其在图像处理中的简单应用,并进行一些简单的实验来描述其相关性质。关键字:傅里叶变换,二维傅

JavaScript学习笔记03

JavaScript笔记03流程控制if判断和Java中if语句的使用方法相同。例:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>Title</title><script>"usestrict";letscore=90;if(score==

11、Kubernetes核心技术 - Service

目录一、概述二、Endpoint三、Service资源清单四、Service类型4.1、ClusterIP4.2、NodePort4.3、LoadBalancer4.4、ExternalName五、Service使用5.1、ClusterIP5.1.1、定义Pod资源清单5.1.2、创建Pod5.1.3、定义Servi

【从零学习python 】74. UDP网络程序:端口问题与绑定信息详解

文章目录udp网络程序-端口问题UDP绑定信息总结进阶案例udp网络程序-端口问题在运行UDP网络程序时,会遇到端口号会变化的情况。每次重新运行网络程序后,可以观察到运行中的“网络调试助手”显示的数字是不同的。这是因为该数字标识了网络程序的唯一性,系统在重新运行时会随机分配端口号。需要注意的是,在网络程序运行过程中,该

如何从市场上几千只股票中快速选出满意的股票

个人账户实现股票量化程序化自动交易,券商有接口,门槛已降低_股票程序交易接口的博客-CSDN博客像上面的例子,如果按照市面上常见的可转债万3或万2不免5,人工操作+费率限制,这种情况就不要想,根本没机会,有了自动交易这把利器,再加python强大的支持库,能发挥的想像空间实在太大了,目前来看,机会是有,但后期用程序交易

热文推荐