Table of Contents
- Goroutine Hints
- Graceful shutdown goroutine examples
Goroutine Hints
-
Never start a goroutine without knowing how it will stop.
Every time you use the go keyword in your program to launch a goroutine, you must know how, and when, that goroutine will exit. If you don’t know the answer, that’s a potential memory leak.
Graceful shutdown goroutine examples
-
one sender one receiver
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
// Log is the log file shared by the whole example; init opens it before main runs.
var Log *os.File

// init opens info.log in the working directory for appending, creating it when
// it does not exist, and panics if the file cannot be opened.
func init() {
	const flags = os.O_APPEND | os.O_CREATE | os.O_RDWR

	f, err := os.OpenFile("info.log", flags, 0666)
	if err != nil {
		panic(err)
	}
	Log = f
}
// main runs the OneToOne example and closes the shared log file on the way out.
func main() {
	// Close returns an error; report it instead of silently dropping it.
	defer func() {
		if err := Log.Close(); err != nil {
			log.Printf("closing log file: %v", err)
		}
	}()
	OneToOne()
}
// Result records the outcome of one HTTP request made by the example.
type Result struct {
// URL is the address that was requested.
URL string
// StatusCode is the value returned by makeHTTPRequest: the response status
// code, or -1 when the request failed. It stays zero in results that only
// report a context or cancellation error.
StatusCode int
// Error is the request error, or the reason no response is reported
// (context error or cancellation); nil when the request succeeded.
Error error
}
// makeHTTPRequest sends a GET request for url bound to ctx and returns the
// response status code.
//
// It returns -1 and a non-nil error when the request cannot be built (invalid
// URL or nil ctx) or when the round trip fails, which includes ctx being
// cancelled or its deadline expiring.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	// Attach ctx while building the request: this avoids the extra request
	// copy made by NewRequest followed by WithContext, and a nil ctx is
	// reported as an error instead of a panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return -1, err
	}
	req.Header.Add("Content-Type", "text/html")

	// http.DefaultClient is the same zero-value client the previous code
	// allocated on every call; sharing it avoids that per-request allocation.
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return -1, err
	}
	// The body is not read here, but it must still be closed.
	defer res.Body.Close()

	return res.StatusCode, nil
}
// OneToOne demonstrates one sender and one receiver sharing an unbuffered data
// channel.
//
// The sender performs a single HTTP request bounded by an 800ms context, offers
// the outcome to the receiver and then closes the data channel to say "no more
// sends". The receiver waits for that single result for at most one second;
// when it gives up it closes the signal channel so the sender never blocks on
// a send that nobody will receive. Both goroutines are waited for before the
// collected result is printed.
func OneToOne() {
	const url = "https://www.loongzxl.com/"

	// data channel, closed by the sender once it has nothing more to send
	dataCh := make(chan Result)
	// signal channel, closed by the receiver when it stops listening
	doneCh := make(chan struct{})

	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()

	// logLine appends one timestamped line to the shared log file and falls
	// back to the standard logger if the file cannot be written.
	logLine := func(msg string) {
		if _, err := Log.WriteString(time.Now().String() + " | " + msg + "\n"); err != nil {
			log.Printf("writing log file: %v", err)
		}
	}

	// Wait for both goroutines so neither outlives this function.
	var wg sync.WaitGroup
	wg.Add(2)

	// sender
	go func(ctx context.Context, url string) {
		defer wg.Done()
		// Only the sender closes the data channel, and it does so on every path.
		defer close(dataCh)

		code, err := makeHTTPRequest(ctx, url)
		res := Result{URL: url, StatusCode: code, Error: err}
		note := "request is handled -> " + fmt.Sprintf("%+v", err)
		// Once the context has expired, report that instead of the request outcome.
		if ctxErr := ctx.Err(); ctxErr != nil {
			res = Result{URL: url, Error: ctxErr}
			note = ctxErr.Error()
		}

		select {
		case dataCh <- res:
			logLine(note)
		case <-doneCh:
			// The receiver has already returned; sending here would block forever.
			logLine("receive a stop signal")
		}
	}(ctx, url)

	// collecting results
	var result []Result

	// global time control: a one-shot timer, released when the function returns
	timeout := time.NewTimer(1000 * time.Millisecond)
	defer timeout.Stop()

	// receiver
	go func() {
		defer wg.Done()
		select {
		case val, ok := <-dataCh:
			if ok {
				result = append(result, val)
			}
		case <-timeout.C:
			logLine("global timeout")
			close(doneCh)
		}
	}()

	wg.Wait()
	for _, val := range result {
		log.Printf("url:%s,statusCode:%d,error:%v\n", val.URL, val.StatusCode, val.Error)
	}
}
-
one sender N receivers
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
// Log is the log file shared by the whole example; init opens it before main runs.
var Log *os.File

// init opens info.log in the working directory for appending, creating it when
// it does not exist, and panics if the file cannot be opened.
func init() {
	const flags = os.O_APPEND | os.O_CREATE | os.O_RDWR

	f, err := os.OpenFile("info.log", flags, 0666)
	if err != nil {
		panic(err)
	}
	Log = f
}
// main runs the OneToMany example and closes the shared log file on the way out.
func main() {
	// Close returns an error; report it instead of silently dropping it.
	defer func() {
		if err := Log.Close(); err != nil {
			log.Printf("closing log file: %v", err)
		}
	}()
	OneToMany()
}
// Result records the outcome of one HTTP request made by the example.
type Result struct {
// URL is the address that was requested.
URL string
// StatusCode is the value returned by makeHTTPRequest: the response status
// code, or -1 when the request failed. It stays zero in results that only
// report a context error.
StatusCode int
// Error is the request error, or the context error when the deadline
// expired; nil when the request succeeded.
Error error
}
// makeHTTPRequest sends a GET request for url bound to ctx and returns the
// response status code.
//
// It returns -1 and a non-nil error when the request cannot be built (invalid
// URL or nil ctx) or when the round trip fails, which includes ctx being
// cancelled or its deadline expiring.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	// Attach ctx while building the request: this avoids the extra request
	// copy made by NewRequest followed by WithContext, and a nil ctx is
	// reported as an error instead of a panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return -1, err
	}
	req.Header.Add("Content-Type", "text/html")

	// http.DefaultClient is the same zero-value client the previous code
	// allocated on every call; sharing it avoids that per-request allocation.
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return -1, err
	}
	// The body is not read here, but it must still be closed.
	defer res.Body.Close()

	return res.StatusCode, nil
}
// OneToMany demonstrates one sender feeding N receivers.
//
// A single producer goroutine pushes every URL onto the request channel and
// closes it to say "no more sends". Each receiver keeps pulling URLs until the
// channel is drained or the shared 800ms context expires, performs the request
// and forwards the outcome on the data channel. The data channel is closed once
// every receiver has returned, which ends the collection loop.
func OneToMany() {
	urls := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}
	// data channel
	dataCh := make(chan Result)
	// request channel
	reqCh := make(chan string)

	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()

	// One sender
	go func() {
		defer close(reqCh)
		for _, url := range urls {
			select {
			case reqCh <- url:
			case <-ctx.Done():
				// Receivers stop once the context expires; do not block on
				// a send that nobody will pick up.
				return
			}
		}
	}()

	var wg sync.WaitGroup
	// N receivers
	for i := 1; i <= len(urls); i++ {
		wg.Add(1)
		go func(ctx context.Context, id int) {
			defer wg.Done()
			for url := range reqCh {
				res, err := makeHTTPRequest(ctx, url)
				select {
				case <-ctx.Done():
					// The deadline has passed: report it and stop this receiver.
					dataCh <- Result{
						URL:   url,
						Error: ctx.Err(),
					}
					Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
					return
				case dataCh <- Result{
					URL:        url,
					StatusCode: res,
					Error:      err,
				}:
					msg := fmt.Sprintf("%+v", err)
					Log.WriteString(time.Now().String() + " | " + "request is handled by " + "receiver" + strconv.Itoa(id) + " -> " + msg + "\n")
					// Keep consuming: returning here would make the pipeline
					// depend on there being exactly one receiver per URL.
				}
			}
		}(ctx, i)
	}

	// Close the data channel only after every receiver has stopped sending.
	go func() {
		wg.Wait()
		close(dataCh)
	}()

	// collecting results
	var result []Result
	for val := range dataCh {
		result = append(result, val)
	}
	log.Printf("%+v", result)
}
-
N senders, one receiver
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
// Log is the log file shared by the whole example; init opens it before main runs.
var Log *os.File

// init opens info.log in the working directory for appending, creating it when
// it does not exist, and panics if the file cannot be opened.
func init() {
	const flags = os.O_APPEND | os.O_CREATE | os.O_RDWR

	f, err := os.OpenFile("info.log", flags, 0666)
	if err != nil {
		panic(err)
	}
	Log = f
}
// main runs the ManyToOne example and closes the shared log file on the way out.
func main() {
	// Close returns an error; report it instead of silently dropping it.
	defer func() {
		if err := Log.Close(); err != nil {
			log.Printf("closing log file: %v", err)
		}
	}()
	ManyToOne()
}
// Result records the outcome of one HTTP request made by the example.
type Result struct {
// URL is the address that was requested.
URL string
// StatusCode is the value returned by makeHTTPRequest: the response status
// code, or -1 when the request failed. It stays zero in results that only
// report a context error.
StatusCode int
// Error is the request error, or the context error when the deadline
// expired; nil when the request succeeded.
Error error
}
// makeHTTPRequest sends a GET request for url bound to ctx and returns the
// response status code.
//
// It returns -1 and a non-nil error when the request cannot be built (invalid
// URL or nil ctx) or when the round trip fails, which includes ctx being
// cancelled or its deadline expiring.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	// Attach ctx while building the request: this avoids the extra request
	// copy made by NewRequest followed by WithContext, and a nil ctx is
	// reported as an error instead of a panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return -1, err
	}
	req.Header.Add("Content-Type", "text/html")

	// http.DefaultClient is the same zero-value client the previous code
	// allocated on every call; sharing it avoids that per-request allocation.
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return -1, err
	}
	// The body is not read here, but it must still be closed.
	defer res.Body.Close()

	return res.StatusCode, nil
}
// ManyToOne demonstrates N senders and one receiver.
//
// One goroutine per URL performs the request under a shared 800ms context and
// sends its outcome on a common channel. A single collector goroutine gathers
// every outcome; the channel is closed after all senders have finished, which
// lets the collector terminate before the results are printed.
func ManyToOne() {
	targets := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}

	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()

	// Shared channel carrying every sender's outcome to the collector.
	results := make(chan Result)

	// Tracks the senders so the channel can be closed after the last one.
	var senders sync.WaitGroup
	senders.Add(len(targets))
	for i := range targets {
		go func(ctx context.Context, url string) {
			defer senders.Done()

			code, reqErr := makeHTTPRequest(ctx, url)
			outcome := Result{URL: url, StatusCode: code, Error: reqErr}

			select {
			case results <- outcome:
				// The collector accepted the request outcome.
				Log.WriteString(time.Now().String() + " | request is handled -> " + fmt.Sprintf("%+v", reqErr) + "\n")
			case <-ctx.Done():
				// The deadline expired: report the context error instead.
				expired := ctx.Err()
				results <- Result{URL: url, Error: expired}
				Log.WriteString(time.Now().String() + " | " + expired.Error() + "\n")
			}
		}(ctx, targets[i])
	}

	// Close the channel once every sender is done.
	go func() {
		senders.Wait()
		close(results)
	}()

	// One receiver: drain the channel until it is closed.
	var collected []Result
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for r := range results {
			collected = append(collected, r)
		}
	}()
	<-finished

	log.Printf("%+v", collected)
}
-
M senders N receivers
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
// Log is the log file shared by the whole example; init opens it before main runs.
var Log *os.File

// init opens info.log in the working directory for appending, creating it when
// it does not exist, and panics if the file cannot be opened.
func init() {
	const flags = os.O_APPEND | os.O_CREATE | os.O_RDWR

	f, err := os.OpenFile("info.log", flags, 0666)
	if err != nil {
		panic(err)
	}
	Log = f
}
// main runs the ManyToMany example and closes the shared log file on the way out.
func main() {
	// Close returns an error; report it instead of silently dropping it.
	defer func() {
		if err := Log.Close(); err != nil {
			log.Printf("closing log file: %v", err)
		}
	}()
	ManyToMany()
}
// Result records the outcome of one HTTP request made by the example.
type Result struct {
// URL is the address that was requested.
URL string
// StatusCode is the value returned by makeHTTPRequest: the response status
// code, or -1 when the request failed. It stays zero in results that only
// report a context error.
StatusCode int
// Error is the request error, or the context error when the deadline
// expired; nil when the request succeeded.
Error error
}
// makeHTTPRequest sends a GET request for url bound to ctx and returns the
// response status code.
//
// It returns -1 and a non-nil error when the request cannot be built (invalid
// URL or nil ctx) or when the round trip fails, which includes ctx being
// cancelled or its deadline expiring.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	// Attach ctx while building the request: this avoids the extra request
	// copy made by NewRequest followed by WithContext, and a nil ctx is
	// reported as an error instead of a panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return -1, err
	}
	req.Header.Add("Content-Type", "text/html")

	// http.DefaultClient is the same zero-value client the previous code
	// allocated on every call; sharing it avoids that per-request allocation.
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return -1, err
	}
	// The body is not read here, but it must still be closed.
	defer res.Body.Close()

	return res.StatusCode, nil
}
// ManyToMany demonstrates M senders and N receivers.
//
// One sender goroutine per URL performs the request under a shared 800ms
// context and sends its outcome on the data channel, which is closed once all
// senders have finished. Several receiver goroutines drain that channel
// concurrently; appends to the shared result slice are serialized with a mutex
// because more than one receiver can be appending at the same time.
func ManyToMany() {
	urls := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}
	// data channel
	dataCh := make(chan Result)
	// A WaitGroup for Producer
	var wgProducer sync.WaitGroup

	// M senders
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	for _, val := range urls {
		wgProducer.Add(1)
		go func(ctx context.Context, url string) {
			defer wgProducer.Done()
			res, err := makeHTTPRequest(ctx, url)
			select {
			case <-ctx.Done():
				// The deadline has passed: report the context error instead.
				dataCh <- Result{
					URL:   url,
					Error: ctx.Err(),
				}
				Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
				return
			case dataCh <- Result{
				URL:        url,
				StatusCode: res,
				Error:      err,
			}:
				msg := fmt.Sprintf("%+v", err)
				Log.WriteString(time.Now().String() + " | " + "request is handled" + " -> " + msg + "\n")
				return
			}
		}(ctx, val)
	}

	// Wait for all senders to finish, then tell receivers there is no more data.
	go func() {
		wgProducer.Wait()
		close(dataCh)
	}()

	// collecting results; mu guards result against concurrent appends
	var (
		mu     sync.Mutex
		result []Result
	)
	// A WaitGroup for Consumer
	var wgConsumer sync.WaitGroup

	// N receivers
	for i := 0; i < len(urls); i++ {
		wgConsumer.Add(1)
		go func(id int) {
			defer wgConsumer.Done()
			for val := range dataCh {
				mu.Lock()
				result = append(result, val)
				mu.Unlock()
				Log.WriteString(time.Now().String() + " | " + "get a response by " + "receiver" + strconv.Itoa(id) + "\n")
			}
		}(i)
	}
	wgConsumer.Wait()
	log.Printf("%+v", result)
}