Golang并发的循环

2023-09-15 09:29:40

本节中,我们会探索一些用来在并行时循环迭代的常见并发模型。我们会探究从全尺寸图片生成一些缩略图的问题。gopl.io/ch8/thumbnail包提供了ImageFile函数来帮我们拉伸图片。我们不会说明这个函数的实现,只需要从gopl.io下载它。

gopl.io/ch8/thumbnail

package thumbnail // ImageFile reads an image from infile and writes // a thumbnail-size version of it in the same directory. // It returns the generated file name, e.g., "foo.thumb.jpg". func ImageFile(infile string) (string, error)

下面的程序会循环迭代一些图片文件名,并为每一张图片生成一个缩略图:

gopl.io/ch8/thumbnail

// makeThumbnails makes thumbnails of the specified files. func makeThumbnails(filenames []string) { for _, f := range filenames { if _, err := thumbnail.ImageFile(f); err != nil { log.Println(err) } } }

显然我们处理文件的顺序无关紧要,因为每一个图片的拉伸操作和其它图片的处理操作都是彼此独立的。像这种子问题都是完全彼此独立的问题被叫做易并行问题(译注:embarrassingly parallel,直译的话更像是尴尬并行)。易并行问题是最容易被实现成并行的一类问题(废话),并且最能够享受到并发带来的好处,能够随着并行的规模线性地扩展。

下面让我们并行地执行这些操作,从而将文件IO的延迟隐藏掉,并用上多核cpu的计算能力来拉伸图像。我们的第一个并发程序只是使用了一个go关键字。这里我们先忽略掉错误,之后再进行处理。

// NOTE: incorrect! func makeThumbnails2(filenames []string) { for _, f := range filenames { go thumbnail.ImageFile(f) // NOTE: ignoring errors } }

这个版本运行的实在有点太快,实际上,由于它比最早的版本使用的时间要短得多,即使当文件名的slice中只包含有一个元素。这就有点奇怪了,如果程序没有并发执行的话,那为什么一个并发的版本还是要快呢?答案其实是makeThumbnails在它还没有完成工作之前就已经返回了。它启动了所有的goroutine,每一个文件名对应一个,但没有等待它们一直到执行完毕。

没有什么直接的办法能够等待goroutine完成,但是我们可以改变goroutine里的代码让其能够将完成情况报告给外部的goroutine知晓,使用的方式是向一个共享的channel中发送事件。因为我们已经确切地知道有len(filenames)个内部goroutine,所以外部的goroutine只需要在返回之前对这些事件计数。

// makeThumbnails3 makes thumbnails of the specified files in parallel. func makeThumbnails3(filenames []string) { ch := make(chan struct{}) for _, f := range filenames { go func(f string) { thumbnail.ImageFile(f) // NOTE: ignoring errors ch <- struct{}{} }(f) } // Wait for goroutines to complete. for range filenames { <-ch } }

注意我们将f的值作为一个显式的变量传给了函数,而不是在循环的闭包中声明:

for _, f := range filenames { go func() { thumbnail.ImageFile(f) // NOTE: incorrect! // ... }() }

回忆一下之前在5.6.1节中,匿名函数中的循环变量快照问题。上面这个单独的变量f是被所有的匿名函数值所共享,且会被连续的循环迭代所更新的。当新的goroutine开始执行字面函数时,for循环可能已经更新了f并且开始了另一轮的迭代或者(更有可能的)已经结束了整个循环,所以当这些goroutine开始读取f的值时,它们所看到的值已经是slice的最后一个元素了。显式地添加这个参数,我们能够确保使用的f是当go语句执行时的“当前”那个f。

如果我们想要从每一个worker goroutine往主goroutine中返回值时该怎么办呢?当我们调用thumbnail.ImageFile创建文件失败的时候,它会返回一个错误。下一个版本的makeThumbnails会返回其在做拉伸操作时接收到的第一个错误:

// makeThumbnails4 makes thumbnails for the specified files in parallel. // It returns an error if any step failed. func makeThumbnails4(filenames []string) error { errors := make(chan error) for _, f := range filenames { go func(f string) { _, err := thumbnail.ImageFile(f) errors <- err }(f) } for range filenames { if err := <-errors; err != nil { return err // NOTE: incorrect: goroutine leak! } } return nil }

这个程序有一个微妙的bug。当它遇到第一个非nil的error时会直接将error返回到调用方,使得没有一个goroutine去排空errors channel。这样剩下的worker goroutine在向这个channel中发送值时,都会永远地阻塞下去,并且永远都不会退出。这种情况叫做goroutine泄露(§8.4.4),可能会导致整个程序卡住或者跑出out of memory的错误。

最简单的解决办法就是用一个具有合适大小的buffered channel,这样这些worker goroutine向channel中发送错误时就不会被阻塞。(一个可选的解决办法是创建一个另外的goroutine,当main goroutine返回第一个错误的同时去排空channel。)

下一个版本的makeThumbnails使用了一个buffered channel来返回生成的图片文件的名字,附带生成时的错误。

// makeThumbnails5 makes thumbnails for the specified files in parallel. // It returns the generated file names in an arbitrary order, // or an error if any step failed. func makeThumbnails5(filenames []string) (thumbfiles []string, err error) { type item struct { thumbfile string err error } ch := make(chan item, len(filenames)) for _, f := range filenames { go func(f string) { var it item it.thumbfile, it.err = thumbnail.ImageFile(f) ch <- it }(f) } for range filenames { it := <-ch if it.err != nil { return nil, it.err } thumbfiles = append(thumbfiles, it.thumbfile) } return thumbfiles, nil }

我们最后一个版本的makeThumbnails返回了新文件们的大小总计数(bytes)。和前面的版本都不一样的一点是我们在这个版本里没有把文件名放在slice里,而是通过一个string的channel传过来,所以我们无法对循环的次数进行预测。

为了知道最后一个goroutine什么时候结束(最后一个结束并不一定是最后一个开始),我们需要一个递增的计数器,在每一个goroutine启动时加一,在goroutine退出时减一。这需要一种特殊的计数器,这个计数器需要在多个goroutine操作时做到安全并且提供在其减为零之前一直等待的一种方法。这种计数类型被称为sync.WaitGroup,下面的代码就用到了这种方法:

// makeThumbnails6 makes thumbnails for each file received from the channel. // It returns the number of bytes occupied by the files it creates. func makeThumbnails6(filenames <-chan string) int64 { sizes := make(chan int64) var wg sync.WaitGroup // number of working goroutines for f := range filenames { wg.Add(1) // worker go func(f string) { defer wg.Done() thumb, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) return } info, _ := os.Stat(thumb) // OK to ignore error sizes <- info.Size() }(f) } // closer go func() { wg.Wait() close(sizes) }() var total int64 for size := range sizes { total += size } return total }

注意Add和Done方法的不对称。Add是为计数器加一,必须在worker goroutine开始之前调用,而不是在goroutine中;否则的话我们没办法确定Add是在"closer" goroutine调用Wait之前被调用。并且Add还有一个参数,但Done却没有任何参数;其实它和Add(-1)是等价的。我们使用defer来确保计数器即使是在出错的情况下依然能够正确地被减掉。上面的程序代码结构是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。

sizes channel携带了每一个文件的大小到main goroutine,在main goroutine中使用了range loop来计算总和。观察一下我们是怎样创建一个closer goroutine,并让其在所有worker goroutine们结束之后再关闭sizes channel的。两步操作:wait和close,必须是基于sizes的循环的并发。考虑一下另一种方案:如果等待操作被放在了main goroutine中,在循环之前,这样的话就永远都不会结束了,如果在循环之后,那么又变成了不可达的部分,因为没有任何东西去关闭这个channel,这个循环就永远都不会终止。

图8.5 表明了makethumbnails6函数中事件的序列。纵列表示goroutine。窄线段代表sleep,粗线段代表活动。斜线箭头代表用来同步两个goroutine的事件。时间向下流动。注意main goroutine是如何大部分的时间被唤醒执行其range循环,等待worker发送值或者closer来关闭channel的。

更多推荐

【云原生】kubernetes中pod(进阶)

目录一、资源限制业务cpu内存1.1CPU资源单位1.2内存资源单位示例1示例2:二、健康检查:又称为探针(Probe)2.1探针的三种规则2.2Probe支持三种检查方法2.3示例示例1:exec方式示例3:tcpSocket方式示例4:就绪检测示例5:就绪检测2示例:启动、退出动作扩展pod的状态Container

Element树形控件使用过程中遇到的问题及解决方法

1.需求1点击编辑按钮,出现修改组织弹窗,且将点击时的组织名称返现在输入框中。思路是点击编辑按钮,取到节点点击时返回的data信息中的label进行赋值即可。<el-treestyle="margin-top:20px":data="organizationTreeData"node-key="id"default-e

mysql死锁排查及解决

MySQL死锁是在多个并发事务同时请求相同资源时发生的一种情况,其中每个事务都在等待对方释放资源,从而导致数据库无法继续执行。死锁的排查和解决通常需要以下步骤:1.检测死锁:MySQL通常会在错误日志中记录死锁信息。可以通过以下方式检测死锁:SHOWENGINEINNODBSTATUS;查找"InnoDB"部分,寻找"

Win10编译chrome

一、系统准备windows10以上版本硬盘空余空间100G以上,磁盘格式为NTFS内存8G以上,推荐32G需要科学上网卸载杀毒软件(注意重启系统)二、安装VisualStudio2022VisualStudio2022(>=17.0.0)编译chromium时需要VisualStudio的支持。在windows操作系统

Android Media3 ExoPlayer 开启缓存功能

ExoPlayer开启播放缓存功能,在下次加载已经播放过的网络资源的时候,可以直接从本地缓存加载,实现为用户节省流量和提升加载效率的作用。方法一:采用ExoPlayer缓存策略第1步:实现Exoplayer参考Exoplayer官网Releasenotes:对应关系:2.19.0(2023-07-05)--Androi

黑马JVM总结(十一)

(1)垃圾回收概述前面我们学了堆,里面有一个垃圾回收的机制(2)判断垃圾_引用计数指只要有一个对象被其他变量所引用,我们就让这个对象的计数加1,有个一变量不在引用,让它的计数减一,当这个对象的计数变为0的时候,说明没有变量引用它了,那么他就可以作为一个垃圾进行一个回收,但是引用计数存在一个弊端:存在循环引用问题:a对象

Pytorch实现MNIST字符识别

1.下载mnist.pkl.gz网址:http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz数据集文件夹路径是data2/mnist/mnist.pkl.gz2.读取数据frompathlibimportPathimportmatplotlib.py

设计模式:状态模式

目录组件代码示例源码中使用优缺点总结状态模式(StatePattern)是一种行为型设计模式,用于解决对象在不同状态下的行为变化问题。状态模式允许对象在内部状态发生改变时改变其行为,使得对象的行为可以根据状态的改变而灵活变化。在状态模式中,对象的行为会根据其内部状态的改变而变化,但对外部来说,对象的接口保持一致。状态模

设计模式之十:状态模式

状态模式通过改变对象内部的状态来帮助对象控制自己的行为。这是一张状态图,其中每个圆圈都是一个状态。最简单,第一反应的实现就是使用一个变量来控制状态值,并在方法内书写条件代码来处理不同情况。packageheadfirst.designpatterns.state.gumball;publicclassGumballMa

9、Spring之推断构造方法源码解析

推断构造方法流程图:Spring推断构造方法底层执行流程|ProcessOn免费在线作图,在线流程图,在线思维导图AutowiredAnnotationBeanPostProcessor中推断构造方法不同情况思维脑图:Spring中的一个bean,需要实例化得到一个对象,而实例化就需要用到构造方法。一般情况下,一个类只

【shell学习】企业运维工作中常用的shell脚本

本站以分享各种运维经验和运维所需要的技能为主《python零基础入门》:python零基础入门学习《python运维脚本》:python运维脚本实践《shell》:shell学习《terraform》持续更新中:terraform_Aws学习零基础入门到最佳实战《k8》暂未更新《docker学习》暂未更新《ceph学习

热文推荐