loong博客

Golang Concurrency - Goroutine

Table of Contents

  • Goroutine Hints
  • Graceful shutdown goroutine examples

Goroutine Hints

  • Never start a goroutine without knowing how it will stop.

Every time you use the go keyword in your program to launch a goroutine, you must know how, and when, that goroutine will exit. If you don’t know the answer, that’s a potential memory leak.

Graceful shutdown goroutine examples

  • one sender one receiver


package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

// Log is the append-only log file shared by every goroutine in this example.
// It is opened once in init and closed by main.
var Log *os.File

// init opens (creating it if necessary) info.log in the working directory and
// stores the handle in Log. The program panics when the file cannot be opened.
func init() {
	const openFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND

	var err error
	if Log, err = os.OpenFile("info.log", openFlags, 0666); err != nil {
		panic(err)
	}
}

// main runs the one-sender/one-receiver example and closes the shared log
// file once the example has returned.
func main() {
	defer func() {
		// Nothing useful can be done with a close error at program exit.
		_ = Log.Close()
	}()

	OneToOne()
}

// Result is the outcome of a single HTTP request, as passed from a sender
// goroutine to a receiver over the data channel.
type Result struct {
	// URL is the address that was requested.
	URL        string
	// StatusCode is the value returned by makeHTTPRequest; it stays zero when
	// only a context error is reported.
	StatusCode int
	// Error is the error reported for the request, or nil.
	Error      error
}

// makeHTTPRequest issues a GET request for url, bound to ctx, and reports the
// status code of the response. It returns -1 together with the error when the
// request cannot be built or sent.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	request, buildErr := http.NewRequest(http.MethodGet, url, nil)
	if buildErr != nil {
		return -1, buildErr
	}
	request.Header.Add("Content-Type", "text/html")

	// A zero-value Client is ready to use. ctx travels with the request so the
	// call is abandoned once ctx is cancelled or its deadline passes.
	var httpClient http.Client
	response, sendErr := httpClient.Do(request.WithContext(ctx))
	if sendErr != nil {
		return -1, sendErr
	}
	// Only the status code is needed, but the body must still be closed.
	defer response.Body.Close()

	return response.StatusCode, nil
}

// One sender , One receiver, the sender says "no more sends" by closing the data channel.
func OneToOne() {
	var url string = "https://www.loongzxl.com/"
	// data channel
	dataCh := make(chan Result)
	// signal channel
	doneCh := make(chan bool)
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	// sender
	go func(ctx context.Context, url string) {
		res, err := makeHTTPRequest(ctx, url)
		select {
		case <-ctx.Done():
            // If http request does not set a timeout field,this case will be selected when the timeout expires.
			dataCh <- Result{
				URL:   url,
				Error: ctx.Err(),
			}
			Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
			return
		case <-doneCh:
			dataCh <- Result{
				URL:   url,
				Error: fmt.Errorf("%s", "request is cancelled"),
			}
			Log.WriteString(time.Now().String() + " | " + "receive a stop signal" + "\n")
			return
		case dataCh <- Result{
			URL:        url,
			StatusCode: res,
			Error:      err,
		}:
			// If http request set a timeout field,this case will be selected after the request completes.
			msg := fmt.Sprintf("%+v", err)
			Log.WriteString(time.Now().String() + " | " + "request is handled" + " -> " + msg + "\n")
			close(dataCh)
			return
		}
	}(ctx, url)
	// collecting results
	var result []Result
	// global time control
	delay := time.NewTicker(1000 * time.Millisecond)
	wg := sync.WaitGroup{}
	wg.Add(1)
	// receiver
	go func() {
		defer wg.Done()
		select {
		case val := <-dataCh:
			result = append(result, val)
			return
		case <-delay.C:
			Log.WriteString(time.Now().String() + " | " + "global timeout" + "\n")
			close(doneCh)
			return
		}
	}()
	wg.Wait()
	for _, val := range result {
		log.Printf("url:%s,statusCode:%d,error:%v\n", val.URL, val.StatusCode, val.Error)
	}
}

  • one sender N receivers


package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

// Log is the append-only log file shared by every goroutine in this example.
// It is opened once in init and closed by main.
var Log *os.File

// init opens (creating it if necessary) info.log in the working directory and
// stores the handle in Log. The program panics when the file cannot be opened.
func init() {
	const openFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND

	var err error
	if Log, err = os.OpenFile("info.log", openFlags, 0666); err != nil {
		panic(err)
	}
}

// main runs the one-sender/N-receivers example and closes the shared log file
// once the example has returned.
func main() {
	defer func() {
		// Nothing useful can be done with a close error at program exit.
		_ = Log.Close()
	}()

	OneToMany()
}

// Result is the outcome of a single HTTP request, as passed from a worker
// goroutine to the collector over the data channel.
type Result struct {
	// URL is the address that was requested.
	URL        string
	// StatusCode is the value returned by makeHTTPRequest; it stays zero when
	// only a context error is reported.
	StatusCode int
	// Error is the error reported for the request, or nil.
	Error      error
}

// makeHTTPRequest performs a GET on url under ctx and returns the response's
// status code. Any failure to build or send the request yields -1 and the
// underlying error.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	request, buildErr := http.NewRequest(http.MethodGet, url, nil)
	if buildErr != nil {
		return -1, buildErr
	}
	request.Header.Add("Content-Type", "text/html")

	// A zero-value Client is ready to use. ctx travels with the request so the
	// call is abandoned once ctx is cancelled or its deadline passes.
	var httpClient http.Client
	response, sendErr := httpClient.Do(request.WithContext(ctx))
	if sendErr != nil {
		return -1, sendErr
	}
	// Only the status code is needed, but the body must still be closed.
	defer response.Body.Close()

	return response.StatusCode, nil
}

// One sender, N receivers, the sender says "no more sends" by closing the data channel.
func OneToMany() {
	urls := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}
	// data channel
	dataCh := make(chan Result)
	// request channel
	reqCh := make(chan string)
	// One sender
	go func() {
		for _, url := range urls {
			reqCh <- url
		}
		close(reqCh)
	}()
	wg := sync.WaitGroup{}
	// N receivers
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	for i := 1; i <= len(urls); i++ {
		wg.Add(1)
		go func(ctx context.Context, id int) {
			defer wg.Done()
			for url := range reqCh {
				res, err := makeHTTPRequest(ctx, url)
				select {
				case <-ctx.Done():
					// If http request does not set a timeout field,this case will be selected when the timeout expires.
					dataCh <- Result{
						URL:   url,
						Error: ctx.Err(),
					}
					Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
					return
				case dataCh <- Result{
					URL:        url,
					StatusCode: res,
					Error:      err,
				}:
					// If http request set a timeout field,this case will be selected after the request completes.
					msg := fmt.Sprintf("%+v", err)
					Log.WriteString(time.Now().String() + " | " + "request is handled by " + "receiver" + strconv.Itoa(id) + " -> " + msg + "\n")
					return
				}
			}
		}(ctx, i)
	}

	go func() {
		wg.Wait()
		close(dataCh)
	}()

	// collecting results
	var result []Result
	for val := range dataCh {
		result = append(result, val)
	}
	log.Printf("%+v", result)
}

  • N senders one receiver


package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

// Log is the append-only log file shared by every goroutine in this example.
// It is opened once in init and closed by main.
var Log *os.File

// init opens (creating it if necessary) info.log in the working directory and
// stores the handle in Log. The program panics when the file cannot be opened.
func init() {
	const openFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND

	var err error
	if Log, err = os.OpenFile("info.log", openFlags, 0666); err != nil {
		panic(err)
	}
}

// main runs the N-senders/one-receiver example and closes the shared log file
// once the example has returned.
func main() {
	defer func() {
		// Nothing useful can be done with a close error at program exit.
		_ = Log.Close()
	}()

	ManyToOne()
}

// Result is the outcome of a single HTTP request, as passed from a sender
// goroutine to the receiver over the data channel.
type Result struct {
	// URL is the address that was requested.
	URL        string
	// StatusCode is the value returned by makeHTTPRequest; it stays zero when
	// only a context error is reported.
	StatusCode int
	// Error is the error reported for the request, or nil.
	Error      error
}

// makeHTTPRequest sends a GET request to url, tied to ctx, and hands back the
// response status code. When the request cannot be created or sent it returns
// -1 and the error.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	request, buildErr := http.NewRequest(http.MethodGet, url, nil)
	if buildErr != nil {
		return -1, buildErr
	}
	request.Header.Add("Content-Type", "text/html")

	// A zero-value Client is ready to use. ctx travels with the request so the
	// call is abandoned once ctx is cancelled or its deadline passes.
	var httpClient http.Client
	response, sendErr := httpClient.Do(request.WithContext(ctx))
	if sendErr != nil {
		return -1, sendErr
	}
	// Only the status code is needed, but the body must still be closed.
	defer response.Body.Close()

	return response.StatusCode, nil
}

// N senders, One receiver
func ManyToOne() {
	urls := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}
	// data channel
	dataCh := make(chan Result)
	// A WaitGroup for Producer
	wgProducer := sync.WaitGroup{}
	// N senders
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	for _, val := range urls {
		wgProducer.Add(1)
		address := val
		go func(ctx context.Context, url string) {
			defer wgProducer.Done()
			res, err := makeHTTPRequest(ctx, url)
			select {
			case <-ctx.Done():
				// If http request does not set a timeout field,this case will be selected when the timeout expires.
				dataCh <- Result{
					URL:   url,
					Error: ctx.Err(),
				}
				Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
				return
			case dataCh <- Result{
				URL:        url,
				StatusCode: res,
				Error:      err,
			}:
				// If http request set a timeout field,this case will be selected after the request completes.
				msg := fmt.Sprintf("%+v", err)
				Log.WriteString(time.Now().String() + " | " + "request is handled" + " -> " + msg + "\n")
				return
			}
		}(ctx, address)
	}
	// collecting results
	var result []Result
	// A WaitGroup for Consumer
	wgConsumer := sync.WaitGroup{}
	wgConsumer.Add(1)
	// One receiver
	go func() {
		defer wgConsumer.Done()
		for val := range dataCh {
			result = append(result, val)
		}
	}()

	// Wait for all senders to finish
	go func() {
		wgProducer.Wait()
		close(dataCh)
	}()

	wgConsumer.Wait()
	log.Printf("%+v", result)
}

  • M senders N receivers


package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

// Log is the append-only log file shared by every goroutine in this example.
// It is opened once in init and closed by main.
var Log *os.File

// init opens (creating it if necessary) info.log in the working directory and
// stores the handle in Log. The program panics when the file cannot be opened.
func init() {
	const openFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND

	var err error
	if Log, err = os.OpenFile("info.log", openFlags, 0666); err != nil {
		panic(err)
	}
}

// main runs the M-senders/N-receivers example and closes the shared log file
// once the example has returned.
func main() {
	defer func() {
		// Nothing useful can be done with a close error at program exit.
		_ = Log.Close()
	}()

	ManyToMany()
}

// Result is the outcome of a single HTTP request, as passed from a sender
// goroutine to a receiver over the data channel.
type Result struct {
	// URL is the address that was requested.
	URL        string
	// StatusCode is the value returned by makeHTTPRequest; it stays zero when
	// only a context error is reported.
	StatusCode int
	// Error is the error reported for the request, or nil.
	Error      error
}

// makeHTTPRequest fetches url with a GET request governed by ctx and returns
// the status code of the response. On any build or transport failure it
// returns -1 along with the error.
func makeHTTPRequest(ctx context.Context, url string) (int, error) {
	request, buildErr := http.NewRequest(http.MethodGet, url, nil)
	if buildErr != nil {
		return -1, buildErr
	}
	request.Header.Add("Content-Type", "text/html")

	// A zero-value Client is ready to use. ctx travels with the request so the
	// call is abandoned once ctx is cancelled or its deadline passes.
	var httpClient http.Client
	response, sendErr := httpClient.Do(request.WithContext(ctx))
	if sendErr != nil {
		return -1, sendErr
	}
	// Only the status code is needed, but the body must still be closed.
	defer response.Body.Close()

	return response.StatusCode, nil
}

// M senders , N receiver
func ManyToMany() {
	urls := []string{
		"https://www.loongzxl.com/",
		"https://www.baidu.com/",
	}
	// data channel
	dataCh := make(chan Result)
	// A WaitGroup for Producer
	wgProducer := sync.WaitGroup{}
	// M senders
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	for _, val := range urls {
		wgProducer.Add(1)
		address := val
		go func(ctx context.Context, url string) {
			defer wgProducer.Done()
			res, err := makeHTTPRequest(ctx, url)
			select {
			case <-ctx.Done():
				// If http request does not set a timeout field,this case will be selected when the timeout expires.
				dataCh <- Result{
					URL:   url,
					Error: ctx.Err(),
				}
				Log.WriteString(time.Now().String() + " | " + ctx.Err().Error() + "\n")
				return
			case dataCh <- Result{
				URL:        url,
				StatusCode: res,
				Error:      err,
			}:
				// If http request set a timeout field,this case will be selected after the request completes.
				msg := fmt.Sprintf("%+v", err)
				Log.WriteString(time.Now().String() + " | " + "request is handled" + " -> " + msg + "\n")
				return
			}
		}(ctx, address)
	}

	// Wait for all senders to finish
	go func() {
		wgProducer.Wait()
		close(dataCh)
	}()

	// collecting results
	var result []Result
	// A WaitGroup for Consumer
	wgConsumer := sync.WaitGroup{}
	// N receivers
	for i := 0; i < len(urls); i++ {
		wgConsumer.Add(1)
		go func(id int) {
			defer wgConsumer.Done()
			for val := range dataCh {
				result = append(result, val)
				Log.WriteString(time.Now().String() + " | " + "get a response by " + "receiver" + strconv.Itoa(id) + "\n")
			}
		}(i)
	}
	wgConsumer.Wait()
	log.Printf("%+v", result)
}

Reference