卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章33949本站已运行390

如何解决此问题:恐慌:同步:负数 WaitGroup 计数器

如何解决此问题:恐慌:同步:负数 waitgroup 计数器

问题内容

一次又一次运行后,我有时会遇到这个问题。我知道这与计数器相关。当调用sync.waitgroup的done()方法的次数多于调用add()方法的次数时,它将抛出此错误。

如何解决这个问题?

我的代码创建了大小为 4 的批次,并对每个批次进行一些处理,但我在解决此恐慌时遇到了问题。

package main

import (
    "fmt"
    "sync"
)

func main() {
    // create input channel
    input := make(chan int)

    // create wait group
    var wg sync.waitgroup

    // start batcher goroutine
    wg.add(1)
    go batcher(input, &wg)

    // send input values to the batcher
    for i := 1; i <= 10; i++ {
        input <- i
    }

    // close input channel
    close(input)

    // wait for batcher goroutine to finish
    wg.wait()
}

func batcher(input chan int, wg *sync.waitgroup) {
    // create batch channel with buffer of size 4
    batch := make(chan int, 4)

    // create channel to synchronize worker goroutines
    done := make(chan bool)

    // create wait group for worker goroutines
    var workerwg sync.waitgroup

    // start worker goroutines
    for i := 0; i < 4; i++ {
        workerwg.add(1)
        go worker(batch, &workerwg, done)
    }

    // read input values and send to batch
    for value := range input {
        batch <- value
        if len(batch) == 4 {
            // wait for worker goroutines to finish processing batch
            workerwg.wait()

            // send batch to worker goroutines
            for i := 0; i < 4; i++ {
                workerwg.add(1)
                go sendbatch(batch, &workerwg, done)
            }
        }
    }

    // wait for worker goroutines to finish processing remaining batch
    workerwg.wait()

    // close done channel to notify that all batches have been processed
    close(done)

    wg.done()
}

func sendbatch(batch chan int, workerwg *sync.waitgroup, done chan bool) {
    // process batch
    for value := range batch {
        fmt.println("processing value:", value)
    }

    // notify worker goroutines that batch has been processed
    workerwg.done()

    select {
    case done <- true:
    default:
        // done channel has been closed
    }
}

func worker(batch chan int, workerwg *sync.waitgroup, done chan bool) {
    // process batches received from batch channel
    for batch := range batch {
        // process batch
        fmt.println("processing batch:", batch)
        workerwg.done()
    }

    // notify batcher goroutine that worker goroutine has finished
    select {
    case done <- true:
    default:
        // done channel has been closed
    }
}

编写批处理程序的基本代码:

package main

import (
    "fmt"
    "sync"
)

func main() {
    input := make(chan int)
    output := make(chan []int)

    var wg sync.waitgroup
    wg.add(2)

    // start the batcher goroutine
    go func() {
        batch := []int{}
        for value := range input {
            batch = append(batch, value)
            if len(batch) == 4 {
                output <- batch
                batch = []int{}
            }
        }
        if len(batch) > 0 {
            output <- batch
        }
        close(output)
        wg.done()
    }()

    // start the worker goroutine
    go func() {
        for batch := range output {
            sum := 0
            for _, value := range batch {
                sum += value
            }
            fmt.printf("sum of batch %v: %dn", batch, sum)
        }
        wg.done()
    }()

    // send input values to the batcher
    for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
        input <- v
    }
    close(input)

    // wait for both goroutines to finish
    wg.wait()
}
Sum of batch [1 2 3 4]: 10
Sum of batch [5 6 7 8]: 26
Sum of batch [9 10]: 19

早期的设计有点复杂,我会尝试扩展这个基本设计。


正确答案


根据这段代码:

for i := 0; i < 4; i++ {
    workerwg.add(1)
    go worker(batch, &workerwg, done)
}

我认为 workerwg.done() 应该移到循环之外:

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
+   defer workerWg.Done()
    // process batches received from batch channel
    for batch := range batch {
        // process batch
        fmt.Println("Processing batch:", batch)
-       workerWg.Done()
    }

    // notify batcher goroutine that worker goroutine has finished
    select {
    case done <- true:
    default:
        // done channel has been closed
    }
  }

但是batch在demo中并没有关闭。所以事实上,goroutine 将永远运行,直到程序结束。

不知道是否还有其他问题。设计太复杂了。复杂的代码难以理解并且容易出错。考虑重新设计它。

卓越飞翔博客
上一篇: Go中如何正确处理带有转义的字符串?
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏