同步操作将从 infraboard/go-course 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
对于Go语言而言, 一提到并发,可能想到它内建的并发支持, 这也是Go语言最吸引人的地方。
Go采用的并发编程思想是CSP(Communicating Sequential Process,通讯顺序进程), CSP有着精确的数学模型, 其思想的核心是同步通讯, 易于理解。CSP是Go语言推荐的并发模型,
但是除了CSP,并发模型其实还有很多, 但在讨论并发模型之前首先要明确一组概念:并发与并行
并发不是并行
并发程序含有多个逻辑上的独立执行块,它们可以独立的并行执行,也可以串行执行。并行程序解决问题的速度往往比串行程序快的多,因为其可以同时执行整个任务的多个部分。并行程序可能有多个独立执行体,也可能仅有一个。
并行一般是简单的大量重复,例如GPU中对图像处理都会有大量的并行运算。几种并行架构:
我们还可以从另一种角度来看待并发和并行之间的差异。并发是问题域中的概念,程序需要被设计成能够处理多个同时(或者几乎同时)发生的事情,而并行则是方法域中的概念,通过将问题中的多个部分并行执行来加速解决问题。
总结下并发与并行:
在早期,CPU都是以单核的形式顺序执行机器指令, Go语言的祖先C语言正是这种顺序编程语言的代表,顺序编程语言中的顺序是指:所有的指令都是以串行的方式执行,在相同的时刻有且仅有一个CPU在顺序执行程序的指令
随着处理器技术的发展,单核时代以提升处理器频率来提高运行效率的方式遇到了瓶颈,单核CPU的发展的停滞,多核CPU的发展迎来春天, 这个时候我们的代码就可以真正的被并行处理
单核到多核的区别下图:
但是哪怕是单核CPU,我们也可以跑多个程序, 这是为什么喃?
进程和线程是操作系统级别的概念, 不要和硬件层(比如CPU的线程混淆了), 单个逻辑核心为啥能同时执行多个程序, 有2个部分原因:
到次可以看出,我们的硬件(CPU)是串行进行处理的, 但是进程在OS层面是并发的, 所以多进程是在操作系统层面的并发模型, 我们如果想要让自己开发的程序也支持并发,可以利用操作系统进程并发的特性, 比如常见的Master/Worker模型:
进程是操作系统资源分配的最小单元。因为所有的进程都是有操作系统的内核管理的。所以每个进程之间是独立的,每一个进程都会有自己单独的内存空间以及上下文信息,一个进程挂了不会影响其他进程的运行。这个也是多进程最大的优点,但是它的缺点也很明显, 因为独立,所以进程的结构都需要复制一遍(Fork), 开销很大, 因此我们看到使用多进程来实现并发的案例也不多
那有没有开销更小的方案喃?
有, 多线程是目前最流行的并发场景的解决方案,由于线程更加轻量级,创建和销毁的成本都很低。并且线程之间通信以及共享内存非常方便,和多进程相比开销要小得多
但是多线程也有缺点,一个缺点也是开销。虽然线程的开销要比进程小得多,但是如果创建和销毁频繁的话仍然是不小的负担。
针对这个问题诞生了线程池这种设计。
创建一大批线程放入线程池当中,需要用的时候拿出来使用,用完了再放回
所以在很长一段时间里, 基于线程池来实现并发是最常用的一种手段, 下面是一段基于线程池的python代码:
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time
def spider(page):
time.sleep(page)
print(f"crawl task{page} finished")
return page
with ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池
task1 = t.submit(spider, 1)
task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中
task3 = t.submit(spider, 3)
print(f"task1: {task1.done()}") # 通过done来判断线程是否完成
print(f"task2: {task2.done()}")
print(f"task3: {task3.done()}")
time.sleep(2.5)
print(f"task1: {task1.done()}")
print(f"task2: {task2.done()}")
print(f"task3: {task3.done()}")
print(task1.result()) # 通过result来获取返回值
每个系统级线程都会有一个固定大小的栈(一般默认可能是2MB),这个栈主要用来保存函数递归调用时参数和局部变量。固定了栈的大小导致了两个问题:
一个Goroutine会以一个很小的栈启动(可能是2KB或4KB),当遇到深度递归导致当前栈空间不足时,Goroutine会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到1GB)。 因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine
这也就是我们为什么把协程叫做轻量级线程的原因: 基本上比线程需要的启动内存少1个数量级
协程的调度不是基于操作系统的而是基于用户空间的程序的, 一般由库或者程序的运行时提供调度, 会根据具体函数只保存必要的寄存器,切换的代价要比系统线程低得多。
也就是说协程更像是程序里的函数,但是在执行的过程当中可以随时挂起、随时继续:
协程1和协程2的关系是完全对等的,协程1执行过程中可以中断挂起执行另外一个协程2,反之也是可以的,直到最终两个协程都执行完以后再返回回到主程序中,即协程1和协程2相互协作完成了整个任务
func A() {
fmt.Print("1")
fmt.Print("2")
fmt.Print("3")
}
func B() {
fmt.Print("A")
fmt.Print("B")
fmt.Print("C")
}
如果我们在一个线程内执行A和B这两个函数,要么先执行A再执行B要么先执行B再执行A。输出的结果是确定的,但如果我们用写成来执行A和B,有可能A函数执行了一半刚输出了一条语句的时候就转而去执行B,B输出了一条又再回到A继续执行。不管执行的过程当中发生了几次中断和继续,在操作系统当中执行的线程都没有发生变化。也就是说这是程序级的调度
那么和多线程相比,我们创建、销毁线程的开销就完全没有了,整个过程变得非常灵活。但是缺点是由于是程序级别的调度,所以需要编程语言自身的支持,如果语言本身不支持,就很难使用了。目前原生就支持协程的语言并不多,显然golang就是其中一个
那Go是如何实现协程的喃? 协程又是如何交给线程去运行的喃?
Go的Runtime实现了一个调度器, 我们叫他协程调度器, 由他将协程调度给操作系统的线程运行,比如:
图中为啥是 M 个协程绑定 N 个线程, 为啥不是N:1 或者 1:1喃:
在Go语言里面,协程叫做Goroutine, 就是Go coroutine的一个合写, 后面我们叫Go里的协程叫: Goroutine
我们来看看这个协程调度器调度器是如何设计的
我们现在使用的调度器是go1.1 (released 2013/05/13),中实现的, 其实在Go刚诞生时 还有1个简单版本的调度器
这个版本的调度器因为性能存在问题, 仅存在很短一短时间
调度过程:
M 想要执行、放回 G 都必须访问全局 G 队列,并且 M 有多个,即多线程访问同一资源需要加锁进行保证互斥 / 同步,所以全局 G 队列是有互斥锁进行保护的
这样调度存在2个缺陷:
那新版本是如何改进这2个问题的喃?
面对之前调度器的问题,Go 设计了新的调度器。
在新调度器中,除了 M (thread) 和 G (goroutine),又引进了 P (Processor)
1个P绑定一个1个线程, 这样线程M就不需要频繁的切换了, 只需要消费P中的协程即可, 由于P包含一个本地队列,这样也避免了直接使用全局队列带来的加锁问题
Processor,它包含了运行 goroutine 的资源,如果线程想运行 goroutine,必须先获取 P,P 中还包含了可运行的 G 队列
struct P
{
Lock;
G *gfree; // freelist, moved from sched
G *ghead; // runnable, moved from sched
G *gtail;
MCache *mcache; // moved from M
FixAlloc *stackalloc; // moved from M
uint64 ncgocall;
GCStats gcstats;
// etc
...
};
P *allp; // [GOMAXPROCS]
There is also a lock-free list of idle P’s:
P *idlep; // lock-free list
加入了P和P的本地队列后,新的调度流程如下:
详细细节可以看: Scalable Go Scheduler Design Doc
讲了那么多,我该如何创建一个G喃?, 很简单使用go关键字, 比如
// 这样我们就把doSomething变成了一个G协程,交由调度器去运行
go doSomething()
当我们执行 go func() 时, Go按照上面的GPM模型, 流程应该是什么样的:
下面通过一个具体的案例来跟踪整个过程
比如我们要执行一个任务, 该任务执行一次好似2s, 如下:
func runTask(id int) {
time.Sleep(2 * time.Second)
fmt.Printf("task %d complete\n", id)
}
如果我们要顺序执行10次,那么跑一轮下来就得耗时 20秒:
func syncRun() {
for i := 0; i < 10; i++ {
runTask(i + 1)
}
}
func main() {
syncRun()
}
task 1 complete
task 2 complete
task 3 complete
task 4 complete
task 5 complete
task 6 complete
task 7 complete
task 8 complete
task 9 complete
task 10 complete
我如果想让他们并发运行怎么办? 这个时候就可以使用Goroutine了
func asyncRun() {
for i := 0; i < 10; i++ {
go runTask(i + 1)
}
}
func main() {
asyncRun()
}
这样你会发现, 并没有按照我们预期的进行, 为什么? 再看下,下面这张图:
我们启动了10个协程, 协程启动过后,还没来得急执行,我们的主程序就退出了,所以没有打印任何结果
那我们如何知道协程已经退出? 有过其他语言编程经验的同学应该可以想到,基于共享内存:
因为协程是基于线程运行的,线程又是共享内存的,因此我们可以定义一个遍历, 没启动一个我们就加1, 退出1个我们就减1:
package main
import (
"fmt"
"time"
)
var (
// 状态计数器
goroutineCount = 0
)
func Add() {
goroutineCount++
}
func Exit() {
goroutineCount--
}
func runTask(id int) {
// 推出一个减去1
defer Exit()
fmt.Printf("task %d start..\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("task %d complete\n", id)
}
func asyncRun() {
for i := 0; i < 10; i++ {
go runTask(i + 1)
// 没启动一个go routine 就+1
Add()
}
}
func main() {
asyncRun()
for goroutineCount > 0 {
time.Sleep(100 * time.Millisecond)
}
}
这样我们的程序运行Task就时并行的了
task 6 start..
task 2 start..
task 1 start..
task 4 start..
task 8 start..
task 9 start..
task 10 start..
task 3 start..
task 7 start..
task 5 start..
task 5 complete
task 8 complete
task 7 complete
task 3 complete
task 1 complete
task 9 complete
task 6 complete
task 10 complete
task 2 complete
task 4 complete
上面还有个问题,就是由于多个goroutine 同时访问共享变量时, 可能导致变量的不准确修改, 最好还是加锁
var (
// 状态计数器
goroutineCount = 0
mu sync.Mutex
)
// 并不是Gorouine进行访问的不需要加锁
func Add() {
goroutineCount++
}
// Goroutine 并发访问的变量,需要加锁
func Exit() {
mu.Lock()
defer mu.Unlock()
goroutineCount--
}
对于这种要等待N个线程完成后再进行下一步的同步操作有一个简单的做法,就是使用sync.WaitGroup来等待一组事件, 其实现逻辑也好我们的方式差不多, 但是由于是使用的原子锁,比我们使用的互斥锁效率高很多
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func runTask(id int) {
// 推出一个减去1
defer wg.Done()
fmt.Printf("task %d start..\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("task %d complete\n", id)
}
func asyncRun() {
for i := 0; i < 10; i++ {
go runTask(i + 1)
// 没启动一个go routine 就+1
wg.Add(1)
}
}
func main() {
asyncRun()
wg.Wait()
}
如何跟踪程序的运行过程?
go提供了一个不错的工具: go tool trace, 用于分析程序的运行过程, 但是在使用这个工具之前,我们需要采集运行过程的数据
改造下我们的main, 加入采集trace过程的库
func main() {
//创建trace文件
f, err := os.Create("trace.out")
if err != nil {
panic(err)
}
defer f.Close()
//启动trace goroutine
err = trace.Start(f)
if err != nil {
panic(err)
}
defer trace.Stop()
asyncRun()
wg.Wait()
}
执行完成后,会生成一个trace.out的文件我们基于这个文件就可以使用 to的trace工具了进行分析了:
go tool trace trace.out
2021/08/05 20:43:45 Parsing trace...
2021/08/05 20:43:45 Splitting trace...
2021/08/05 20:43:45 Opening browser. Trace viewer is listening on http://127.0.0.1:53603
我们可以通过浏览器打开 http://127.0.0.1:53603 网址,点击 view trace 能够看见可视化的调度流程
可以看到G7 ~ G16 就是我们刚才运行Task的10个goroutine
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。