卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章34429本站已运行393

使用 errgroup 实现 Go 工作池,goroutines 卡住

使用 errgroup 实现 go 工作池,goroutines 卡住

php小编苹果今天为大家介绍一个使用 errgroup 实现 Go 工作池的方法,解决了 goroutines 卡住的问题。在并发编程中,使用 goroutines 可以实现高效的并发处理,但当遇到某个 goroutine 发生错误或卡住时,会影响整个程序的执行。通过使用 errgroup 包,我们可以优雅地管理 goroutines 的执行,并在其中发生错误时进行处理,保证程序的稳定性和可靠性。让我们来看看具体的实现方式。

问题内容


我已经使用 errgroup 实现了工作池模式,以便可以捕获任何 goroutine 中的错误。这是我的详细信息:

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }

然后worker函数的实现是这样的:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}

calluserapi 返回 403 forbidden 错误,该错误应调用 g.wait() 并应在出现非 nil 错误时立即停止所有 goroutine。但这里的情况并非如此,g.wait() 永远不会被调用。


解决方法


有几个问题:

  • 循环

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }

    等待工作人员为每个用户发送结果。当 calluserapi 返回错误时,不会发生这种情况。

  • worker不处理jobs关闭的情况。

下面的代码可以解决这两个问题:

声明一个类型,指定要处理的用户以及将结果放在何处:

type job struct {
    user usersinfo
    result *usersinfo
}

修改工作线程以使用这种新类型。另外,修改worker,使其在jobs关闭时退出。

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}

在主 goroutine 中将它们粘合在一起:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.
卓越飞翔博客
上一篇: 如何使用 Golang 获取容器日志? (错误)
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏