go同步协程的必备工具WaitGroup怎么使用

工具使用   发布日期:2023年09月23日   浏览次数:625

本篇内容主要讲解“go同步协程的必备工具WaitGroup怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“go同步协程的必备工具WaitGroup怎么使用”吧!

1. 简介

本文将介绍 Go 语言中的 WaitGroup 并发原语,包括 WaitGroup 的基本使用方法、实现原理、使用注意事项以及常见的使用方式。能够更好地理解和应用 WaitGroup 来协调多个 Goroutine 的执行,提高 Go 并发编程的效率和稳定性。

2. 基本使用

2.1 定义

  1. WaitGroup
是Go语言标准库中的一个结构体,它提供了一种简单的机制,用于同步多个协程的执行。适用于需要并发执行多个任务并等待它们全部完成后才能继续执行后续操作的场景。

2.2 使用方式

首先主协程创建WaitGroup实例,然后在每个协程的开始处,调用

  1. Add(1)
方法,表示需要等待一个任务执行完成,然后协程在任务执行完成之后,调用
  1. Done
方法,表示任务已经执行完成了。

主协程中,需要调用

  1. Wait()
方法,等待所有协程完成任务,示例如下:
  1. func main(){
  2. //首先主协程创建WaitGroup实例
  3. var wg sync.WaitGroup
  4. // 开始时调用Add方法表示有个任务开始执行
  5. wg.Add(1)
  6. go func() {
  7. // 开始执行...
  8. //完成之后,调用Done方法
  9. wg.Done()
  10. }()
  11. // 调用Wait()方法,等待所有协程完成任务
  12. wg.Wait()
  13. // 执行后续逻辑
  14. }

2.3 使用例子

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. var wg sync.WaitGroup
  8. for i := 0; i < 5; i++ {
  9. wg.Add(1)
  10. go func(i int) {
  11. defer wg.Done()
  12. fmt.Printf("任务%d开始执行
  13. ", i)
  14. // 模拟协程任务执行一段时间
  15. time.Sleep(time.Duration(rand.Int() % 100))
  16. // 线程任务执行完成
  17. fmt.Printf("任务%d执行完毕
  18. ", i)
  19. }(i)
  20. }
  21. fmt.Println("主协程开始等待所有任务执行完成...")
  22. wg.Wait()
  23. fmt.Println("所有协程已经执行完毕...")
  24. }

在这个例子中,我们使用了

  1. sync.WaitGroup
来等待5个协程执行完毕。在循环中,每创建一个任务,我们调用一次
  1. wg.Add(1)
方法,然后启动一个协程去执行任务,当协程完成任务后,调用
  1. wg.Done
方法,告知主协程任务已经执行完毕。然后主协程会在5个协程任务全部执行完毕之后,才会继续向下执行。

3.实现原理

3.1 设计初衷

  1. WaitGroup
的设计初衷就是为了等待一组操作完成后再执行下一步操作,通常会在一组协程中使用。

3.2 基本原理

  1. sync.WaitGroup
结构体中的
  1. state1
  1. state2
字段是用于实现
  1. WaitGroup
功能的重要变量。
  1. type WaitGroup struct {
  2. noCopy noCopy
  3. state1 uint64
  4. state2 uint32
  5. }

由于

  1. WaitGroup
需要等待一组操作完成之后再执行,因此需要等待所有操作完成之后才能继续执行。为了实现这个功能,WaitGroup 使用了一个计数器
  1. counter
来记录还有多少个操作没有完成,如果
  1. counter
的值为 0,则表示所有操作已经完成。

同时,

  1. WaitGroup
在所有任务都完成之后,需要唤醒所有处于等待的协程,此时需要知道有多少个协程处于等待状态。为了实现这个功能,WaitGroup 使用了一个等待计数器
  1. waiter
来记录当前有多少个协程正在等待操作完成。

这里

  1. WaitGroup
对于计数器和等待计数器的实现,是通过一个64位无符号整数来实现的,也就是
  1. WaitGroup
结构体中的state1,其中高32位保存了任务计数器
  1. counter
的值,低32位保存了等待计数器
  1. waiter
的值。当我们创建一个
  1. WaitGroup
实例时,该实例的任务计数器等待计数器都被初始化为 0。

而且,等待协程需要等待所有任务完成之后才能继续执行,所以等待协程在任务未完成时会被阻塞,当任务全部完成后,自动被唤醒。

  1. WaitGroup
使用
  1. state2
用于实现信号量机制。通过调用
  1. runtime_Semacquire()
  1. runtime_Semrelease()
函数,可以在不阻塞线程的情况下进行等待和通知操作。

3.3 代码实现

3.3.1 Add方法

调用

  1. Add()
方法增加/减小
  1. counter
的值,delta的值可以是正数,也可以是负数,下面是
  1. Add
方法的源码实现:
  1. func (wg *WaitGroup) Add(delta int) {
  2. // delta 的值可以为负数,Done方法便是通过Add(-1)来实现的
  3. // statep: 为state1的地址 semap: 为state2的地址
  4. statep, semap := wg.state()
  5. // 高32位的值 加上 delta,增加任务计数器的值
  6. state := atomic.AddUint64(statep, uint64(delta)<<32)
  7. // v: 取高32位数据,获取到待完成任务数
  8. v := int32(state >> 32)
  9. // 取低32位数据,获取到等待线程的值
  10. w := uint32(state)
  11. // v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
  12. // w = 0: 说明没有协程在等待,此时可以直接退出
  13. if v > 0 || w == 0 {
  14. return
  15. }
  16. // 此时v = 0,所有任务都完成了,唤醒等待协程
  17. *statep = 0
  18. for ; w != 0; w-- {
  19. runtime_Semrelease(semap, false, 0)
  20. }
  21. }

3.3.2 Done方法实现

调用

  1. Done()
方法表示完成了一个任务,通过调用
  1. Add
方法,
  1. delta
值为-1,减少任务计数器
  1. counter
的值,当其归为0时,便自动唤醒所有处于等待的协程。
  1. // Done decrements the WaitGroup counter by one.
  2. func (wg *WaitGroup) Done() {
  3. wg.Add(-1)
  4. }

3.3.3 Wait方法实现

调用

  1. Wait
方法,等待任务执行完成,增加等待计数器
  1. Waiter
的值:
  1. func (wg *WaitGroup) Wait() {
  2. // statep: 为state1的地址 semap: 为state2的地址
  3. statep, semap := wg.state()
  4. for {
  5. // 加载state1的值
  6. state := atomic.LoadUint64(statep)
  7. // v: 取高32位数据,获取到待完成任务数
  8. v := int32(state >> 32)
  9. // 没有任务待执行,全部都完成了
  10. if v == 0 {
  11. return
  12. }
  13. // 增加waiter计数器的值
  14. if atomic.CompareAndSwapUint64(statep, state, state+1) {
  15. // 等待被唤醒
  16. runtime_Semacquire(semap)
  17. return
  18. }
  19. }
  20. }

3.4 实现补充

  1. Add
方法,
  1. Done
方法以及
  1. Wait
方法实现中,有一些异常场景的验证逻辑被我删除掉了。当出现异常场景时,说明用户使用方式和
  1. WaitGroup
的设计初衷相违背了,此时
  1. WaitGroup
就会直接panic。

下面通过说明使用的注意事项,来间接介绍

  1. WaitGroup
的异常验证逻辑。

4.使用注意事项

4.1 Add方法和Done方法需要成对出现

下面是一个Add方法和Done方法没有成对出现的例子,此时Add方法调多了,此时计数器永远大于0,Wait 方法会一直阻塞等待。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. var wg sync.WaitGroup
  8. wg.Add(2)
  9. go func() {
  10. defer wg.Done()
  11. fmt.Println("Goroutine 1")
  12. }()
  13. go func() {
  14. fmt.Println("Goroutine 2")
  15. }()
  16. wg.Wait()
  17. fmt.Println("All goroutines finished")
  18. }

在上述代码中,我们调用了

  1. wg.Add(2)
,但只调用了一次
  1. wg.Done()
。这会导致
  1. counter
的值大于0,因此调用
  1. wg.Wait()
会被永久阻塞,不会继续向下继续执行。

还有另外一种情况时Done方法调用多了,此时任务计数器

  1. counter
的值为负数,从
  1. WaitGroup
设计的语意来看,就是需要等待完成的任务数为负数,这个不符合预期,此时将会直接
  1. panic
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. var wg sync.WaitGroup
  8. wg.Add(1)
  9. go func() {
  10. fmt.Println("Goroutine 1 started")
  11. wg.Done() // 第一次调用Done方法
  12. wg.Done() // 第二次调用Done方法
  13. fmt.Println("Goroutine 1 completed")
  14. }()
  15. wg.Wait()
  16. fmt.Println("All goroutines completed")
  17. }

在上面的例子中,我们启动了一个goroutine,第一次调用

  1. Add
方法,counter的值变为1,在第14行调用
  1. Done
,此时计数器的值变为0,此时等待中的goroutine将会被唤醒。在第15行又调用了一次
  1. Done
方法,当counter减小为0时,再次调用
  1. Done
方法会导致panic,因为此时
  1. waitGroup
的计数器已经为0,再次减少将导致负数计数,这是不被允许的。

所以在调用Done方法时,需要保证每次调用都与Add方法的调用一一对应,否则会导致程序出现错误。

4.2 在所有任务都已经添加之后,才调用Wait方法进行等待

  1. WaitGroup
的设计初衷就是为了等待一组操作完成后再执行下一步操作。所以,如果在所有任务添加之前,便调用
  1. Wait
方法进行等待,此时有可能会导致等待协程提前被唤醒,执行下一步操作,而尚未添加的任务则不会被等待,这违反了WaitGroup的设计初衷,也不符合预期。下面是一个简单的例子:
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func main() {
  8. var wg sync.WaitGroup
  9. for i := 1; i <= 3; i++ {
  10. go func(id int) {
  11. wg.Add(1)
  12. defer wg.Done()
  13. fmt.Printf("Goroutine %d started
  14. ", id)
  15. time.Sleep(time.Duration(id) * time.Second)
  16. fmt.Printf("Goroutine %d finished
  17. ", id)
  18. }(i)
  19. }
  20. // 不等待所有任务添加,就开始等待
  21. wg.Wait()
  22. fmt.Println("All goroutines finished")
  23. time.Sleep(10 * time.Second)
  24. }

代码执行结果如下,等待协程被提前唤醒,执行之后的操作,而子任务在等待协程唤醒后才开始执行:

All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished

在这个例子中,我们创建了三个协程并打印出它们开始和结束的消息。但是,我们没有在任务开始前调用

  1. Add
方法添加任务,而是在任务开始之后再调用
  1. Add
方法添加任务。

这可能会导致某些任务未被加入到

  1. WaitGroup
中,等待协程就调用了
  1. wg.Wait
方法,这样就会导致一些任务未被加入
  1. WaitGrou
,从而导致等待协程不会等待这些任务执行完成。如果这种情况发生了,我们会看到"All goroutines finished"被输出,但实际上有一些协程还没有完成。

因此,我们应该在所有任务添加完毕之后再调用

  1. Wait
方法,以保证等待的正确性。

5. WaitGroup常见使用场景

在函数或方法中使用,如果一个大任务可以拆分为多个独立的子任务,此时会将其进行拆分,并使用多个协程来并发执行这些任务,提高执行效率,同时使用

  1. WaitGroup
等待所有子任务执行完成,完成协程间的同步。

下面来看go-redis

  1. ClusterClient
结构体中
  1. ForEachMaster
方法中对于
  1. WaitGroup
的使用。
  1. ForEachMaster
方法通常用于在 Redis 集群中执行针对所有主节点的某种操作,例如在集群中添加或删除键,或者执行一些全局的诊断操作,具体执行的操作由传入参数
  1. fn
指定。

这里

  1. ForEachMaster
方法会对所有主节点执行某种操作,这里的实现是对所有主节点执行某种操作这个大任务,拆分为多个独立的子任务,每个子任务完成对一个Master节点执行指定操作,然后每个子任务启动一个协程去执行,主协程使用
  1. WaitGroup
等待所有协程完成指定子任务,
  1. ForEachMaster
也就完成了对所有主节点执行某种操作的任务。具体实现如下:
  1. func (c *ClusterClient) ForEachMaster(
  2. ctx context.Context,
  3. fn func(ctx context.Context, client *Client) error,
  4. ) error {
  5. // 重新加载集群状态,以确保状态信息是最新的
  6. state, err := c.state.ReloadOrGet(ctx)
  7. if err != nil {
  8. return err
  9. }
  10. var wg sync.WaitGroup
  11. // 用于协程间通信
  12. errCh := make(chan error, 1)
  13. // 获取到redis集群中所有的master节点
  14. for _, master := range state.Masters {
  15. // 启动一个协程来执行该任务
  16. wg.Add(1)
  17. go func(node *clusterNode) {
  18. // 任务完成时,调用Done告知WaitGroup任务已完成
  19. defer wg.Done()
  20. err := fn(ctx, node.Client)
  21. if err != nil {
  22. select {
  23. case errCh &lt;- err:
  24. default:
  25. }
  26. }
  27. }(master)
  28. }
  29. // 主协程等待所有任务完成
  30. wg.Wait()
  31. return nil
  32. }

以上就是go同步协程的必备工具WaitGroup怎么使用的详细内容,更多关于go同步协程的必备工具WaitGroup怎么使用的资料请关注九品源码其它相关文章!