调试 Go 协程泄漏
在开始调试 Goroutines 泄漏之前,让我先简单介绍一些基础知识,这将使您对问题有更广泛的认识。
并发编程。
- 并发编程处理程序的并发执行,其中多个顺序执行流同时运行,从而导致计算执行速度更快。
- 它有助于更好地利用处理器的多核功能以获得更快的结果,并发/并行程序是必要的。
Goroutines
Goroutines 是由 Go 运行时管理的轻量级线程。

简单编程并发地将数字加起来。
package main
import "fmt"
// function to add an array of numbers.
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// writes the sum to the go routines.
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c1 := make(chan int)
c2 := make(chan int)
// spin up a goroutine.
go sum(s[:len(s)/2], c1)
// spin up a goroutine.
go sum(s[len(s)/2:], c2)
x, y := <-c1, <-c2 // receive from c1 aND C2
fmt.Println(x, y, x+y)
}
并发编程不再是可选的,它是开发在多核处理器上运行的现代软件的必要条件。


就像任何协调的努力都必须朝着共同的目标前进一样,需要同步和通信。
在上面的程序中,每个 go routine 计算完总和后,都需要与主 goroutine 协调以将结果返回以计算最终值。
Go 的同步方法。


Go 鼓励使用通道在 goroutines 之间传递对数据的引用,而不是显式使用锁来协调对共享数据的访问。这种方法确保在任何给定时间只有一个 goroutine 可以访问数据。
现在,让我们开始在 Go 中进行并发编程。
如果您已经看到这里,那么您就会理解编写并发程序不再是一种选择,而 Go 使这变得很容易。此外,您还了解了 Go 通道并将其用于 Goroutines 之间的同步。现在让我们转向同步 Goroutines 的更难的部分。
同步可能出错!
听起来很可怕!!!!但是可能出了什么问题?!
好吧,go routines 之间的协调有很多方法可能会出错。
这可能导致某些 goroutines 永远等待!

每次使用 go 关键字时,都要注意 Go routine 将如何退出。
在没有接收方的情况下写入通道。
这是一个关于如何在没有接收方的情况下写入通道会导致 go routine 被永远阻塞的简单示例。
package main
import (
"fmt"
"log"
"net/http"
"strconv"
)
// function to add an array of numbers.
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// writes the sum to the go routines.
c <- sum // send sum to c
}
// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
s := []int{7, 2, 8, -9, 4, 0}
c1 := make(chan int)
c2 := make(chan int)
// spin up a goroutine.
go sum(s[:len(s)/2], c1)
// spin up a goroutine.
go sum(s[len(s)/2:], c2)
// not reading from c2.
// go routine writing to c2 will be blocked.
x := <-c1
// write the response.
fmt.Fprintf(w, strconv.Itoa(x))
}
func main() {
http.HandleFunc("/sum", sumConcurrent) // set router
err := http.ListenAndServe(":8001", nil) // set listen port
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
注意:在实际情况中,这不是您编写程序的方式。这是一个关于如何引入泄漏的简单说明,我们将进一步使用此代码来识别泄漏并调试应用程序。
从没有写入方的通道中接收。
示例 1:在 for-select
上阻塞。
for {
select {
case <-c:
// process here
}
}
示例 2:在通道上循环。
go func() {
for range ch { }
}()
最佳实践
使用超时通道
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
select {
case <- ch:
// a read from ch has occurred
case <- timeout:
// the read from ch has timed out
}
OR
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
使用 context 包。
Golang context 包 可用于优雅地结束 go routines,甚至用于超时。
泄漏检测。
在 Web 服务器中检测泄漏的公式是添加检测端点并在负载测试期间使用它们。
// get the count of number of go routines in the system.
func countGoRoutines() int {
return runtime.NumGoroutine()
}
func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
// Get the count of number of go routines running.
count := countGoRoutines()
w.Write([]byte(strconv.Itoa(count)))
}
func main()
http.HandleFunc("/_count", getGoroutinesCountHandler)
}
使用检测端点,该端点在负载测试之前和之后响应系统中存活的 goroutines 数量。
以下是您的负载测试程序的流程
Step 1: Call the instrumentation endpoint and get the count of number of goroutines alive in your webserver.
Step 2: Perform load test.Lets the load be concurrent.
for i := 0; i < 100 ; i++ {
go callEndpointUnderInvestigation()
}
Step 3: Call the instrumentation endpoint and get the count of number of goroutines alive in your webserver.
如果负载测试后系统中存活的 goroutines 数量出现异常增加,则证明存在泄漏。
这是一个带有泄漏端点的 Web 服务器的小示例。通过简单的测试,我们可以确定服务器中是否存在泄漏。
package main
import (
"fmt"
"log"
"net/http"
"runtime"
"strconv"
)
// get the count of number of go routines in the system.
func countGoRoutines() int {
return runtime.NumGoroutine()
}
func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
// Get the count of number of go routines running.
count := countGoRoutines()
w.Write([]byte(strconv.Itoa(count)))
}
// function to add an array of numbers.
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// writes the sum to the go routines.
c <- sum // send sum to c
}
// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
s := []int{7, 2, 8, -9, 4, 0}
c1 := make(chan int)
c2 := make(chan int)
// spin up a goroutine.
go sum(s[:len(s)/2], c1)
// spin up a goroutine.
go sum(s[len(s)/2:], c2)
// not reading from c2.
// go routine writing to c2 will be blocked.
// Since we are not reading from c2,
// the goroutine attempting to write to c2
// will be blocked forever resulting in leak.
x := <-c1
// write the response.
fmt.Fprintf(w, strconv.Itoa(x))
}
func main() {
// get the sum of numbers.
http.HandleFunc("/sum", sumConcurrent)
// get the count of number of go routines in the system.
http.HandleFunc("/_count", getGoroutinesCountHandler)
err := http.ListenAndServe(":8001", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
package main
import (
"io/ioutil"
"log"
"net/http"
"strconv"
"sync"
)
const (
leakyServer = "http://localhost:8001"
)
// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
body, err := getReq("/_count")
if err != nil {
return -1, err
}
count, err := strconv.Atoi(string(body))
if err != nil {
return -1, err
}
return count, nil
}
// Send get request and return the repsonse body.
func getReq(endPoint string) ([]byte, error) {
response, err := http.Get(leakyServer + endPoint)
if err != nil {
return []byte{}, err
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return []byte{}, err
}
return body, nil
}
func main() {
// get the number of go routines in the leaky server.
count, err := getRoutineCount()
if err != nil {
log.Fatal(err)
}
log.Printf("\n %d Go routines before the load test in the system.", count)
var wg sync.WaitGroup
// send 50 concurrent request to the leaky endpoint.
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err = getReq("/sum")
if err != nil {
log.Fatal(err)
}
}()
}
wg.Wait()
// get the cout of number of goroutines in the system after the load test.
count, err = getRoutineCount()
if err != nil {
log.Fatal(err)
}
log.Printf("\n %d Go routines after the load test in the system.", count)
}
// First run the leaky server
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
3 Go routines before the load test in the system.
54 Go routines after the load test in the system.
您可以清楚地看到,对泄漏端点发出 50 个并发请求会导致系统中增加 50 个 go routines。
让我们再次运行负载测试。
$ go run load.go
53 Go routines before the load test in the system.
104 Go routines after the load test in the system.
很明显,每次运行负载测试时,服务器中的 go routines 数量都在增加,并且没有下降。这是一个明显的泄漏证据。
识别泄漏的来源。
使用堆栈跟踪检测。
一旦确定 Web 服务器中存在泄漏,您现在需要识别泄漏的来源。
添加一个端点,该端点将返回 Web 服务器的堆栈跟踪,可以帮助您识别泄漏的来源。
import (
"runtime/debug"
"runtime/pprof"
)
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
stack := debug.Stack()
w.Write(stack)
pprof.Lookup("goroutine").WriteTo(w, 2)
}
func main() {
http.HandleFunc("/_stack", getStackTraceHandler)
}
在识别泄漏的存在后,使用端点在负载前后获取堆栈跟踪以识别泄漏的来源。
将堆栈跟踪检测添加到泄漏服务器并再次执行负载测试。代码如下
package main
import (
"fmt"
"log"
"net/http"
"runtime"
"runtime/debug"
"runtime/pprof"
"strconv"
)
// get the count of number of go routines in the system.
func countGoRoutines() int {
return runtime.NumGoroutine()
}
// respond with number of go routines in the system.
func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
// Get the count of number of go routines running.
count := countGoRoutines()
w.Write([]byte(strconv.Itoa(count)))
}
// respond with the stack trace of the system.
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
stack := debug.Stack()
w.Write(stack)
pprof.Lookup("goroutine").WriteTo(w, 2)
}
// function to add an array of numbers.
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// writes the sum to the go routines.
c <- sum // send sum to c
}
// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
s := []int{7, 2, 8, -9, 4, 0}
c1 := make(chan int)
c2 := make(chan int)
// spin up a goroutine.
go sum(s[:len(s)/2], c1)
// spin up a goroutine.
go sum(s[len(s)/2:], c2)
// not reading from c2.
// go routine writing to c2 will be blocked.
x := <-c1
// write the response.
fmt.Fprintf(w, strconv.Itoa(x))
}
func main() {
// get the sum of numbers.
http.HandleFunc("/sum", sumConcurrent)
// get the count of number of go routines in the system.
http.HandleFunc("/_count", getGoroutinesCountHandler)
// respond with the stack trace of the system.
http.HandleFunc("/_stack", getStackTraceHandler)
err := http.ListenAndServe(":8001", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
package main
import (
"io/ioutil"
"log"
"net/http"
"strconv"
"sync"
)
const (
leakyServer = "http://localhost:8001"
)
// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
body, err := getReq("/_count")
if err != nil {
return -1, err
}
count, err := strconv.Atoi(string(body))
if err != nil {
return -1, err
}
return count, nil
}
// Send get request and return the repsonse body.
func getReq(endPoint string) ([]byte, error) {
response, err := http.Get(leakyServer + endPoint)
if err != nil {
return []byte{}, err
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return []byte{}, err
}
return body, nil
}
// obtain stack trace of the server.
func getStackTrace() (string, error) {
body, err := getReq("/_stack")
if err != nil {
return "", err
}
return string(body), nil
}
func main() {
// get the number of go routines in the leaky server.
count, err := getRoutineCount()
if err != nil {
log.Fatal(err)
}
log.Printf("\n %d Go routines before the load test in the system.", count)
var wg sync.WaitGroup
// send 50 concurrent request to the leaky endpoint.
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err = getReq("/sum")
if err != nil {
log.Fatal(err)
}
}()
}
wg.Wait()
// get the cout of number of goroutines in the system after the load test.
count, err = getRoutineCount()
if err != nil {
log.Fatal(err)
}
log.Printf("\n %d Go routines after the load test in the system.", count)
// obtain the stack trace of the system.
trace, err := getStackTrace()
if err != nil {
log.Fatal(err)
}
log.Printf("\nStack trace after the load test : \n %s",trace)
}
// First run the leaky server
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
3 Go routines before the load test in the system.
54 Go routines after the load test in the system.
goroutine 149 [chan send]:
main.sum(0xc420122e58, 0x3, 0x3, 0xc420112240)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
goroutine 243 [chan send]:
main.sum(0xc42021a0d8, 0x3, 0x3, 0xc4202760c0)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
goroutine 259 [chan send]:
main.sum(0xc4202700d8, 0x3, 0x3, 0xc42029c0c0)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
goroutine 135 [chan send]:
main.sum(0xc420226348, 0x3, 0x3, 0xc4202363c0)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
goroutine 166 [chan send]:
main.sum(0xc4202482b8, 0x3, 0x3, 0xc42006b8c0)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
goroutine 199 [chan send]:
main.sum(0xc420260378, 0x3, 0x3, 0xc420256480)
/home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
/home/karthic/gophercon/count-instrument.go:51 +0x12b
........
堆栈跟踪清楚地指向泄漏的震中。
使用分析。
由于泄漏的 goroutines 通常是被阻塞以尝试读取或写入通道,或者甚至可能处于睡眠状态,因此分析将帮助您识别泄漏的来源。
这是我在 2016 年 Gophercon 上关于基准测试和分析的演讲。

重要的是,在正在调查的端点处于负载状态时进行检测测试和分析。
避免泄漏,尽早捕获
单元和功能测试中的检测可以帮助尽早识别泄漏。
计算测试前后 goroutines 的数量。
func TestMyFunc() {
// get count of go routines.
perform the test.
// get the count diff.
// alert if there's an unexpected rise.
}
测试中的堆栈差异。
堆栈差异是一个简单的程序,它对测试前后堆栈跟踪进行差异比较,并在系统中存在任何不需要的 goroutines 时发出警报。将其集成到您的单元和功能测试中,可以帮助在开发过程中识别泄漏。
import (
github.com/fortytw2/leaktest
)
func TestMyFunc(t *testing.T) {
defer leaktest.Check(t)()
go func() {
for {
time.Sleep(time.Second)
}
}()
}
设计安全
具有作为单独容器/进程运行的端服务/端点的微服务架构可以防止整个系统受到其中一个端点/服务中的泄漏或资源中断的影响。如果编排由 Kubernetes、Mesosphere 和 Docker Swarm 等工具管理,那就太好了。

想象一下获取整个系统的堆栈跟踪并尝试识别在数百个服务中哪个服务导致了泄漏!!!这确实很可怕!!!!
Goroutine 泄漏就像缓慢的杀手。它们会在一段时间内缓慢积累,浪费您的计算资源,而您甚至不会注意到。了解这一点非常重要,务必注意泄漏并尽早调试它们!