Golang: A Deep Dive into the GMP Scheduling Model


My understanding of GMP used to be fairly shallow, and I had never analyzed the scheduling model in real depth, so while I have some time I'm writing these notes.

Introduction

Goroutines are one of Go's signature features. Each one occupies very little memory, so they let an application support far more concurrency. Below we dissect how they work.

Concepts

G(goroutine)

A G is a goroutine; every go statement creates one. It stores the goroutine's execution stack (its runtime stack information), its status, its task function, and so on. From a G's perspective there is only P: P is the CPU on which the G runs.
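
For instance, this minimal program starts a handful of goroutines with the go keyword (the count of 10 is arbitrary):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	// Each `go` statement below creates a new G, which the runtime
	// places on a run queue until an M (attached to a P) executes it.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println("goroutine", id)
		}(i)
	}
	wg.Wait()
}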

M(machine)

A worker thread, called a machine in Go. The M is the entity that actually executes work in the scheduling system. It first tries to take a runnable G directly from the local queue of its associated P; if the local queue is empty, it grabs a batch of Gs from the scheduler's global queue, or steals half of the runnable Gs from another M/P pair. The M runs the G; when the G completes, the M takes the next G from its P, and so on.

P(processor)

The processor. It holds the resources required to run goroutines and mediates the relationship between M and G: a thread must first acquire a P before it can run a goroutine, and each P carries a queue of runnable Gs.
The number of Ps is decided at program startup and defaults to the number of CPU cores; it can be changed via runtime.GOMAXPROCS(). An M must own a P before it can execute a G's code. A P holds a queue of Gs and dispatches them to its M for execution.
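
As a quick illustration (the value 4 below is arbitrary), the P count can be queried and set like this:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) queries the current value without changing it;
	// it defaults to runtime.NumCPU().
	fmt.Println("default Ps:", runtime.GOMAXPROCS(0))

	// Setting it returns the previous value; here we cap parallelism at 4 Ps.
	prev := runtime.GOMAXPROCS(4)
	fmt.Println("previous:", prev, "now:", runtime.GOMAXPROCS(0))
}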

Source Code

All of the source below is taken from release-branch.go1.19.

G(goroutine)

runtime/runtime2.go

type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // this g's stack; offset known to runtime/cgo
	stackguard0 uintptr // used to detect whether this g is being preempted; offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic    *_panic // innermost panic - offset known to liblink
	_defer    *_defer // innermost defer
	m         *m      // current m, i.e. the thread this g occupies; offset known to arm liblink
	sched     gobuf   // scheduling data: saves this g's context across goroutine switches
	syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp  uintptr // expected sp at top of stack, to check in traceback
	// param is a generic pointer parameter field used to pass
	// values in particular contexts where other storage for the
	// parameter would be difficult to find. It is currently used
	// in three ways:
	// 1. When a channel operation wakes up a blocked goroutine, it sets param to
	//    point to the sudog of the completed blocking operation.
	// 2. By gcAssistAlloc1 to signal back to its caller that the goroutine completed
	//    the GC cycle. It is unsafe to do so in any other way, because the goroutine's
	//    stack may have moved in the meantime.
	// 3. By debugCallWrap to pass parameters to a new goroutine because allocating a
	//    closure in the runtime is forbidden.
	// In short: another goroutine can set param while this g sleeps, and this g
	// reads it back on wakeup.
	param        unsafe.Pointer
	atomicstatus atomic.Uint32 // the G's status
	stackLock    uint32        // sigprof/scang lock; TODO: fold in to atomicstatus
	goid         uint64        // goroutine id
	schedlink    guintptr      // link pointer in g lists
	waitsince    int64         // approx time when the g become blocked
	waitreason   waitReason    // reason for blocking; if status==Gwaiting

	preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
	preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
	preemptShrink bool // shrink stack at synchronous safe point

	// asyncSafePoint is set if g is stopped at an asynchronous
	// safe point. This means there are frames on the stack
	// without precise pointer information.
	asyncSafePoint bool

	paniconfault bool // panic (instead of crash) on unexpected fault address
	gcscandone   bool // g has scanned stack; protected by _Gscan bit in status
	throwsplit   bool // must not split stack
	// activeStackChans indicates that there are unlocked channels
	// pointing into this goroutine's stack. If true, stack
	// copying needs to acquire channel locks to protect these
	// areas of the stack.
	activeStackChans bool
	// parkingOnChan indicates that the goroutine is about to
	// park on a chansend or chanrecv. Used to signal an unsafe point
	// for stack shrinking.
	parkingOnChan atomic.Bool

	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	tracking       bool     // whether we're tracking this G for sched latency statistics
	trackingSeq    uint8    // used to decide whether to track this G
	trackingStamp  int64    // timestamp of when the G last started being tracked
	runnableTime   int64    // the amount of time spent runnable, cleared when running, only used when tracking
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        muintptr // if set, this G is locked to run only on this m
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr         // pc of the go statement that created this goroutine
	ancestors      *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
	startpc        uintptr         // pc of the goroutine function itself
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep
	selectDone     atomic.Uint32  // are we participating in a select and did someone win the race?

	// goroutineProfiled indicates the status of this goroutine's stack for the
	// current in-progress goroutine profile
	goroutineProfiled goroutineProfileStateHolder

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}

The gobuf struct saves a g's context; it stores the pointers (stack pointer, program counter, and so on) needed to resume execution of the g.

type gobuf struct {
	// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
	//
	// ctxt is unusual with respect to GC: it may be a
	// heap-allocated funcval, so GC needs to track it, but it
	// needs to be set and cleared from assembly, where it's
	// difficult to have write barriers. However, ctxt is really a
	// saved, live register, and we only ever exchange it between
	// the real register and the gobuf. Hence, we treat it as a
	// root during stack scanning, which means assembly that saves
	// and restores it doesn't need write barriers. It's still
	// typed as a pointer so that any other writes from Go get
	// write barriers.
	sp   uintptr
	pc   uintptr
	g    guintptr // pointer back to the g this context belongs to
	ctxt unsafe.Pointer
	ret  uintptr
	lr   uintptr
	bp   uintptr // for framepointer-enabled architectures
}

G status

An enumeration of the ten goroutine states:

const (
	// G status
	//
	// Beyond indicating the general state of a G, the G status
	// acts like a lock on the goroutine's stack (and hence its
	// ability to execute user code).
	//
	// If you add to this list, add to the list
	// of "okay during garbage collection" status
	// in mgcmark.go too.
	//
	// TODO(austin): The _Gscan bit could be much lighter-weight.
	// For example, we could choose not to run _Gscanrunnable
	// goroutines found in the run queue, rather than CAS-looping
	// until they become _Grunnable. And transitions like
	// _Gscanwaiting -> _Gscanrunnable are actually okay because
	// they don't affect stack ownership.

	// _Gidle means this goroutine was just allocated and has not
	// yet been initialized.
	_Gidle = iota // 0

	// _Grunnable means this goroutine is on a run queue. It is
	// not currently executing user code. The stack is not owned.
	_Grunnable // 1

	// _Grunning means this goroutine may execute user code. The
	// stack is owned by this goroutine. It is not on a run queue.
	// It is assigned an M and a P (g.m and g.m.p are valid).
	_Grunning // 2

	// _Gsyscall means this goroutine is executing a system call.
	// It is not executing user code. The stack is owned by this
	// goroutine. It is not on a run queue. It is assigned an M.
	_Gsyscall // 3

	// _Gwaiting means this goroutine is blocked in the runtime.
	// It is not executing user code. It is not on a run queue,
	// but should be recorded somewhere (e.g., a channel wait
	// queue) so it can be ready()d when necessary. The stack is
	// not owned *except* that a channel operation may read or
	// write parts of the stack under the appropriate channel
	// lock. Otherwise, it is not safe to access the stack after a
	// goroutine enters _Gwaiting (e.g., it may get moved).
	_Gwaiting // 4

	// _Gmoribund_unused is currently unused, but hardcoded in gdb
	// scripts.
	_Gmoribund_unused // 5

	// _Gdead means this goroutine is currently unused. It may be
	// just exited, on a free list, or just being initialized. It
	// is not executing user code. It may or may not have a stack
	// allocated. The G and its stack (if any) are owned by the M
	// that is exiting the G or that obtained the G from the free
	// list.
	_Gdead // 6

	// _Genqueue_unused is currently unused.
	_Genqueue_unused // 7

	// _Gcopystack means this goroutine's stack is being moved. It
	// is not executing user code and is not on a run queue. The
	// stack is owned by the goroutine that put it in _Gcopystack.
	_Gcopystack // 8

	// _Gpreempted means this goroutine stopped itself for a
	// suspendG preemption. It is like _Gwaiting, but nothing is
	// yet responsible for ready()ing it. Some suspendG must CAS
	// the status to _Gwaiting to take responsibility for
	// ready()ing this G.
	_Gpreempted // 9

	// _Gscan combined with one of the above states other than
	// _Grunning indicates that GC is scanning the stack. The
	// goroutine is not executing user code and the stack is owned
	// by the goroutine that set the _Gscan bit.
	//
	// _Gscanrunning is different: it is used to briefly block
	// state transitions while GC signals the G to scan its own
	// stack. This is otherwise like _Grunning.
	//
	// atomicstatus&~Gscan gives the state the goroutine will
	// return to when the scan completes.
	_Gscan          = 0x1000
	_Gscanrunnable  = _Gscan + _Grunnable  // 0x1001
	_Gscanrunning   = _Gscan + _Grunning   // 0x1002
	_Gscansyscall   = _Gscan + _Gsyscall   // 0x1003
	_Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004
	_Gscanpreempted = _Gscan + _Gpreempted // 0x1009
)

M(machine)

type m struct {
	g0      *g     // goroutine with scheduling stack (g0)
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink
	_       uint32 // align next field to 8 bytes

	// Fields not known to debuggers.
	procid        uint64            // for debuggers, but offset not hard-coded
	gsignal       *g                // signal-handling g
	goSigStack    gsignalStack      // Go-allocated signal handling stack
	sigmask       sigset            // storage for saved signal mask
	tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
	mstartfn      func()
	curg          *g       // goroutine currently running on this m
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr // the p to attach next
	oldp          puintptr // the p that was attached before executing a syscall
	id            int64
	mallocing     int32
	throwing      throwType
	preemptoff    string // if != "", keep curg running on this m
	locks         int32
	dying         int32
	profilehz     int32
	spinning      bool // m is out of work and is actively looking for work
	blocked       bool // m is blocked on a note
	newSigstack   bool // minit on C thread called sigaltstack
	printlock     int8
	incgo         bool          // m is executing a cgo call
	isextra       bool          // m is an extra m
	freeWait      atomic.Uint32 // Whether it is safe to free g0 and delete m (one of freeMRef, freeMStack, freeMWait)
	fastrand      uint64
	needextram    bool
	traceback     uint8
	ncgocall      uint64        // number of cgo calls in total
	ncgo          int32         // number of cgo calls currently in progress
	cgoCallersUse atomic.Uint32 // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers   // cgo traceback if crashing in cgo call
	park          note
	alllink       *m // on allm
	schedlink     muintptr
	lockedg       guintptr
	createstack   [32]uintptr // stack that created this thread.
	lockedExt     uint32      // tracking for external LockOSThread
	lockedInt     uint32      // tracking for internal lockOSThread
	nextwaitm     muintptr    // next m waiting for lock
	waitunlockf   func(*g, unsafe.Pointer) bool
	waitlock      unsafe.Pointer
	waittraceev   byte
	waittraceskip int
	startingtrace bool
	syscalltick   uint32
	freelink      *m // on sched.freem

	// these are here because they are too large to be on the stack
	// of low-level NOSPLIT functions.
	libcall   libcall
	libcallpc uintptr // for cpu profiler
	libcallsp uintptr
	libcallg  guintptr
	syscall   libcall // stores syscall parameters on windows

	vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
	vdsoPC uintptr // PC for traceback while in VDSO call

	// preemptGen counts the number of completed preemption
	// signals. This is used to detect when a preemption is
	// requested, but fails.
	preemptGen atomic.Uint32

	// Whether this is a pending preemption signal on this M.
	signalPending atomic.Uint32

	dlogPerM

	mOS

	// Up to 10 locks held by this m, maintained by the lock ranking code.
	locksHeldLen int
	locksHeld    [10]heldLockInfo
}

Two Gs in struct m deserve attention. One is curg, the G currently bound to this M. The other is g0, a special goroutine that carries the scheduling stack. An ordinary goroutine's stack is a growable stack allocated on the heap, whereas g0's stack is the stack of the OS thread backing the M. All scheduling-related code first switches onto g0's stack before executing. In other words, even the thread's stack is represented through a g, rather than being used directly as an OS abstraction.

P(processor)

type p struct {
	id          int32
	status      uint32 // this P's status: one of pidle/prunning/...
	link        puintptr
	schedtick   uint32     // incremented on every scheduler call
	syscalltick uint32     // incremented on every system call
	sysmontick  sysmontick // last tick observed by sysmon
	m           muintptr   // back-link to associated m (nil if idle)
	mcache      *mcache
	pcache      pageCache // page cache
	raceprocctx uintptr

	deferpool    []*_defer // pool of available defer structs (see panic.go)
	deferpoolbuf [32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32        // head of the local run queue
	runqtail uint32        // tail of the local run queue
	runq     [256]guintptr // the local run queue itself
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	//
	// Note that while other P's may atomically CAS this to zero,
	// only the owner P can CAS it to a valid G.
	runnext guintptr // pointer to the next G to run

	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	// Cache of mspan objects from the heap.
	mspancache struct {
		// We need an explicit length here because this field is used
		// in allocation codepaths where write barriers are not allowed,
		// and eliminating the write barrier/keeping it eliminated from
		// slice updates is tricky, moreso than just managing the length
		// ourselves.
		len int
		buf [128]*mspan
	}

	tracebuf traceBufPtr

	// traceSweep indicates the sweep events should be traced.
	// This is used to defer the sweep start event until a span
	// has actually been swept.
	traceSweep bool
	// traceSwept and traceReclaimed track the number of bytes
	// swept and reclaimed by sweeping in the current sweep loop.
	traceSwept, traceReclaimed uintptr

	palloc persistentAlloc // per-P to avoid mutex

	// The when field of the first entry on the timer heap.
	// This is 0 if the timer heap is empty.
	timer0When atomic.Int64

	// The earliest known nextwhen field of a timer with
	// timerModifiedEarlier status. Because the timer may have been
	// modified again, there need not be any timer with this value.
	// This is 0 if there are no timerModifiedEarlier timers.
	timerModifiedEarliest atomic.Int64

	// Per-P GC state
	gcAssistTime         int64 // Nanoseconds in assistAlloc
	gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)

	// limiterEvent tracks events for the GC CPU limiter.
	limiterEvent limiterEvent

	// gcMarkWorkerMode is the mode for the next mark worker to run in.
	// That is, this is used to communicate with the worker goroutine
	// selected for immediate execution by
	// gcController.findRunnableGCWorker. When scheduling other goroutines,
	// this field must be set to gcMarkWorkerNotWorker.
	gcMarkWorkerMode gcMarkWorkerMode
	// gcMarkWorkerStartTime is the nanotime() at which the most recent
	// mark worker started.
	gcMarkWorkerStartTime int64

	// gcw is this P's GC work buffer cache. The work buffer is
	// filled by write barriers, drained by mutator assists, and
	// disposed on certain GC state transitions.
	gcw gcWork

	// wbBuf is this P's GC write barrier buffer.
	//
	// TODO: Consider caching this in the running G.
	wbBuf wbBuf

	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

	// statsSeq is a counter indicating whether this P is currently
	// writing any stats. Its value is even when not, odd when it is.
	statsSeq atomic.Uint32

	// Lock for timers. We normally access the timers while running
	// on this P, but the scheduler can also do it from a different P.
	timersLock mutex

	// Actions to take at some time. This is used to implement the
	// standard library's time package.
	// Must hold timersLock to access.
	timers []*timer

	// Number of timers in P's heap.
	numTimers atomic.Uint32

	// Number of timerDeleted timers in P's heap.
	deletedTimers atomic.Uint32

	// Race context used while executing timer functions.
	timerRaceCtx uintptr

	// maxStackScanDelta accumulates the amount of stack space held by
	// live goroutines (i.e. those eligible for stack scanning).
	// Flushed to gcController.maxStackScan once maxStackScanSlack
	// or -maxStackScanSlack is reached.
	maxStackScanDelta int64

	// gc-time statistics about current goroutines
	// Note that this differs from maxStackScan in that this
	// accumulates the actual stack observed to be used at GC time (hi - sp),
	// not an instantaneous measure of the total stack size that might need
	// to be scanned (hi - lo).
	scannedStackSize uint64 // stack size of goroutines scanned by this P
	scannedStacks    uint64 // number of goroutines scanned by this P

	// preempt is set to indicate that this P should be enter the
	// scheduler ASAP (regardless of what G is running on it).
	preempt bool

	// pageTraceBuf is a buffer for writing out page allocation/free/scavenge traces.
	//
	// Used only if GOEXPERIMENT=pagetrace.
	pageTraceBuf pageTraceBuf

	// Padding is no longer needed. False sharing is now not a worry because p is large enough
	// that its size class is an integer multiple of the cache line size (for any of our architectures).
}

Schedt (scheduler)

type schedt struct {
	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
	goidgen   uint64
	lastpoll  uint64 // time of last network poll, 0 if currently polling
	pollUntil uint64 // time to which current poll is sleeping

	lock mutex

	// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
	// sure to call checkdead().

	midle        muintptr // linked list of idle m's waiting for work
	nmidle       int32    // number of idle m's waiting for work
	nmidlelocked int32    // number of locked m's waiting for work
	mnext        int64    // number of m's that have been created and next M ID
	maxmcount    int32    // maximum number of m's allowed (or die)
	nmsys        int32    // number of system m's not counted for deadlock
	nmfreed      int64    // cumulative number of freed m's

	ngsys uint32 // number of system goroutines; updated atomically

	pidle      puintptr // linked list of idle p's
	npidle     uint32   // number of idle p's
	nmspinning uint32   // See "Worker thread parking/unparking" comment in proc.go.

	// Global runnable queue.
	runq     gQueue // the global queue of runnable G's
	runqsize int32  // number of G's in the global runnable queue

	// disable controls selective disabling of the scheduler.
	//
	// Use schedEnableUser to control this.
	//
	// disable is protected by sched.lock.
	disable struct {
		// user disables scheduling of user goroutines.
		user     bool
		runnable gQueue // pending runnable Gs
		n        int32  // length of runnable
	}

	// Global cache of dead G's.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}

	// Central cache of sudog structs.
	sudoglock  mutex
	sudogcache *sudog

	// Central pool of available defer structs.
	deferlock mutex
	deferpool *_defer

	// freem is the list of m's waiting to be freed when their
	// m.exited is set. Linked through m.freelink.
	freem *m

	gcwaiting  uint32 // gc is waiting to run
	stopwait   int32
	stopnote   note
	sysmonwait uint32
	sysmonnote note

	// safepointFn should be called on each P at the next GC
	// safepoint if p.runSafePointFn is set.
	safePointFn   func(*p)
	safePointWait int32
	safePointNote note

	profilehz int32 // cpu profiling rate

	procresizetime int64 // nanotime() of last change to gomaxprocs
	totaltime      int64 // ∫gomaxprocs dt up to procresizetime

	// sysmonlock protects sysmon's actions on the runtime.
	//
	// Acquire and hold this mutex to block sysmon from interacting
	// with the rest of the runtime.
	sysmonlock mutex

	// timeToRun is a distribution of scheduling latencies, defined
	// as the sum of time a G spends in the _Grunnable state before
	// it transitions to _Grunning.
	//
	// timeToRun is protected by sched.lock.
	timeToRun timeHistogram
}

The Model

G, M, and P stand for:

  • G - Goroutine, the smallest unit that participates in scheduling and execution
  • M - Machine, an OS-level thread
  • P - Processor, a logical processor; each P owns a local queue of runnable Gs (the LRQ), which holds at most 256 Gs.

The GMP scheduling flow is roughly as follows (a runnable toy sketch follows the list):

  • An M that wants to run work must first acquire a P, i.e. associate itself with one.
  • It then takes a G from the P's local run queue (LRQ); the P acts much like a supplier of Gs.
  • If the LRQ holds no runnable G, the M tries to move a batch of Gs from the global run queue (GRQ) into the P's local queue.
  • If the global queue is empty too, the M picks another P at random and steals half of its local queue into its own P's local queue.
  • Once it has a runnable G, the M runs it; when the G finishes, the M fetches the next G from the P, over and over.
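
Here is a toy, runnable sketch of that lookup order. All names are illustrative, not runtime APIs; the real logic lives in runtime.findRunnable in proc.go, which additionally polls the network and checks the global queue periodically for fairness:

package main

import "fmt"

// A toy model of where the scheduler looks for the next G,
// using plain slices in place of the runtime's lock-free queues.
type toyP struct {
	runnext string   // the "run this next" slot
	local   []string // local run queue (LRQ)
}

func findRunnable(p *toyP, global *[]string, others []*toyP) (string, bool) {
	// 1. The runnext slot of our own P.
	if g := p.runnext; g != "" {
		p.runnext = ""
		return g, true
	}
	// 2. Our P's local run queue.
	if len(p.local) > 0 {
		g := p.local[0]
		p.local = p.local[1:]
		return g, true
	}
	// 3. The global run queue (the real runtime grabs a whole batch).
	if len(*global) > 0 {
		g := (*global)[0]
		*global = (*global)[1:]
		return g, true
	}
	// 4. Steal half of another P's local queue.
	for _, victim := range others {
		if n := len(victim.local); n > 0 {
			keep := n - (n+1)/2
			stolen := victim.local[keep:] // take half (rounded up)
			victim.local = victim.local[:keep]
			p.local = append(p.local, stolen...)
			g := p.local[0]
			p.local = p.local[1:]
			return g, true
		}
	}
	return "", false // nothing to do; the M would park
}

func main() {
	p := &toyP{}
	global := []string{}
	victim := &toyP{local: []string{"g1", "g2", "g3", "g4"}}
	for {
		g, ok := findRunnable(p, &global, []*toyP{victim})
		if !ok {
			break
		}
		fmt.Println("run", g)
	}
}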

How Many of Each?

Number of Ps:

  • The maximum number of simultaneously running Ps is controlled by GOMAXPROCS and is usually set to the number of CPU cores. If, say, GOMAXPROCS = cores/2, at most half the cores run in parallel. Because the next goroutine only runs after the current one yields the CPU, at any instant at most GOMAXPROCS goroutines execute simultaneously. In Go a goroutine may hold a CPU for at most about 10ms, which keeps other goroutines from starving.

Number of Ms:

  • When a Go program starts, it sets a maximum M count, 10000 by default. The kernel can hardly support that many threads, so in practice this limit can be ignored.
  • The SetMaxThreads function in runtime/debug changes the maximum number of Ms (see the sketch after this list).
  • When an M blocks, a new M may be created.
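
A minimal sketch of raising that limit (the value 20000 here is arbitrary):

package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// SetMaxThreads returns the previous limit; the program crashes
	// if it ever tries to use more OS threads than this.
	prev := debug.SetMaxThreads(20000)
	fmt.Println("previous max threads:", prev) // 10000 by default
}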

The relationship between M and P counts:

  • There is no fixed relationship between the number of Ms and Ps. When an M blocks, its P will create or switch to another M, so even if the number of Ps is 1, many Ms may still be created.

When Ps and Ms are created:

  • Ps: once the maximum count n is determined, the runtime creates all n Ps at startup.
  • Ms: when there aren't enough Ms to associate with Ps and run their runnable Gs. For example, if all Ms are busy while a P still has plenty of ready work, the scheduler looks for an idle M, and if none exists, creates a new one.

Work Stealing

  • An M looks for a G in its local queue first, then in the global queue; if neither yields a G, it steals from another P. This keeps threads busy and improves utilization.

Creating a Goroutine

  • Take the P attached to the current M (as newproc below shows)
  • Put the new goroutine into the P's runnext slot, letting it jump the queue
  • If the P's local queue is full, push to the global queue

go/src/runtime/proc.go

// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		// Create the new G.
		newg := newproc1(fn, gp, pc)
		_p_ := getg().m.p.ptr()
		// Put the G on a run queue.
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}

// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	_g_ := getg()

	if fn == nil {
		fatal("go of nil func value")
	}
	acquirem() // disable preemption because it can be holding p in a local var

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	} else {
		// Only user goroutines inherit pprof labels.
		if _g_.m.curg != nil {
			newg.labels = _g_.m.curg.labels
		}
		if goroutineProfile.active {
			// A concurrent goroutine profile is running. It should include
			// exactly the set of goroutines that were alive when the goroutine
			// profiler first stopped the world. That does not include newg, so
			// mark it as not needing a profile before transitioning it from
			// _Gdead.
			newg.goroutineProfiled.Store(goroutineProfileSatisfied)
		}
	}
	// Track initial transition?
	newg.trackingSeq = uint8(fastrand())
	if newg.trackingSeq%gTrackingPeriod == 0 {
		newg.tracking = true
	}
	casgstatus(newg, _Gdead, _Grunnable)
	gcController.addScannableStack(_p_, int64(newg.stack.hi-newg.stack.lo))

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
		if newg.labels != nil {
			// See note in proflabel.go on labelSync's role in synchronizing
			// with the reads in the signal handler.
			racereleasemergeg(newg, unsafe.Pointer(&labelSync))
		}
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(_g_.m)

	return newg
}

As the code shows, the G is preferentially placed on the local run queue; only if the local queue is full is it pushed to the global queue.

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrandn(2) == 0 {
		next = false
	}

	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) { // move g (and half the local queue) to the global queue
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}
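
Two details here are worth calling out. The runnext slot lets a G readied by the current G inherit the remainder of the current time slice, which spares communicate-and-wait pairs (e.g. two goroutines ping-ponging on a channel) the full queue latency. And on overflow, runqputslow moves the new G together with half of the local queue to the global queue, so a hot P sheds work that idle Ms can pick up.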

References

https://github.com/golang/go

