卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章34191本站已运行391

使用 go 例程创建 pub sub

使用 go 例程创建 pub sub

问题内容

我正在尝试创建 goroutine 来完成任务

所以我写了这段代码。 像 a、b、c 这样没有依赖关系的任务很容易实现并且运行良好。 只是在实现依赖任务 d 和 e 时遇到一些问题,每个任务都有 2 个任务的依赖关系。

只剩下一个连接点,它为每个任务创建一个通道,然后传递消息,该消息将由依赖任务读取,以减少依赖任务完成后的依赖数量。请参阅代码中的 checkpoint 1 注释。

有人可以帮我解决这个问题吗?我只是停留在如何在这种情况下实现 goroutine 的部分。

代码:

package main

import (
    "fmt"
    "sync"
)

type task struct {
    isdone           bool
    dependencies     []*task
    subscribers      []*task
    donechan         chan bool
    numdependencies  int
    taskname         string
    informsubchannel chan bool //
}

func (t *task) executetask() {
    fmt.printf("task %s is getting executed...n", t.taskname)
    // <-time.after(5 * time.second)
    fmt.printf("task %s is done!! <-------n", t.taskname)
}

func (t *task) updatedependency() {
    var updateddependencies []*task
    for _, t := range t.dependencies {
        if !t.isdone {
            updateddependencies = append(updateddependencies, t)
        }
    }
    t.numdependencies = len(updateddependencies)
    fmt.printf("updating dependency for task: %s to %dn", t.taskname, t.numdependencies)
    t.dependencies = updateddependencies
}

// if we are having dependencies for a task subscribe to those dependent task.
// when the dependent task is done inform it and reduce the no of dependencies.
// a --> d (d depends on a), a has finished its task so inform it subscribed task which is d here and reduce d dependencies.
func (t *task) informsubscriber() {
    if len(t.subscribers) > 0 {
        for _, sub := range t.subscribers {
            fmt.printf("task %s has informed subscriber %sn", t.taskname, sub.taskname)
            sub.updatedependency()
        }
    }
}

// task is subscribed to dependent task. d has been subscribed to a, d will watch over the activity of a
func (t *task) setsubscriber(sub *task) {
    fmt.printf("set subscriber %s to task %sn", sub.taskname, t.taskname)
    t.subscribers = append(t.subscribers, sub)
}

// go routine - background task execution
// mark it as completed
func (t *task) markcompleted() {
    for {
        select {
        case <-t.donechan:
            {
                t.isdone = true
                t.executetask()
                // inform all the subscribers that the task is completed and adjust their dependencies
                t.informsubscriber()
                close(t.donechan)
                return
            }
        default:
        }
    }
}

func (t *task) setdependency(tasks []*task) {
    t.dependencies = tasks
    t.numdependencies = len(t.dependencies)
}

// this will be use if dependent task are already done. will be used in checkpoint 1.
func (t *task) trackdependency() {
    t.numdependencies -= 1
    fmt.printf("no of dependencies for task %s is: %dn", t.taskname, t.numdependencies)
    if t.numdependencies == 0 { // execute task
        t.donechan <- true
    }
}

func (t *task) start() {
    fmt.printf("running task %sn", t.taskname)
    t.updatedependency()
    go t.markcompleted()

    if t.numdependencies > 0 {

        // for every dependent task
        for _, dep := range t.dependencies {
            // create subscribers
            dep.setsubscriber(t)
            // what if all dependencies are already executed. subscriber won't help as they won't be marked completed as already done.
            // say a and c are already done then d won't be able to complete itself since it's still waiting for them
            // if dependencies are already finished mark it as completed too

            // code: handle the dependent case here(unable to implement)
            // background function for tracking dependency
            // checkpoint 1: read dependent task channel value & reduce dependencies if done
            go t.trackdependency()
        }
        fmt.printf("task %s has %d dependencies and waiting for them to get finishedn", t.taskname, t.numdependencies)
    } else {
        // if no dependencies. mark it as finished
        t.donechan <- true
    }

}

func createtask(taskname string) *task {
    return &task{
        isdone:          false,
        taskname:        taskname,
        dependencies:    nil,
        subscribers:     nil,
        numdependencies: 0,
        donechan:        make(chan bool),
    }
}

func main() {

    taska := createtask("a")
    taskb := createtask("b")
    taskc := createtask("c")
    taskd := createtask("d")
    taske := createtask("e")

    taskd.setdependency([]*task{taska, taskb})
    taske.setdependency([]*task{taskc, taskd})

    alltasks := []*task{taska, taskb, taskc, taskd, taske}
    var wg sync.waitgroup
    for _, t := range alltasks {
        wg.add(1)
        go func(t *task) {
            defer wg.done()
            t.start()
        }(t)

    }
    wg.wait()

}

示例输出:

(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------

由于上述缺失实现,目前 发现 5 个数据竞争


正确答案


我认为您可以使用较小的任务结构和 waitgroup 的一些帮助来实现上述场景进行同步。

这是我将一些注释放在一起进行解释的示例。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// tasks holds an id ( for ease of debugging )
// a buffered channel that is only used for signaling when the task is executed
// and finally a list of dependency tasks
type task struct {
    id           string
    done         chan struct{}
    dependencies []*task
}

// run is where all the logic happens
//
// we create a waitgroup that will be the size of the dependencies for the current task
// and we will wait until all tasks have signaled that they have executed.
//
// when all the dependencies have signaled through their channel that they are done
// then the current task is free to execute and then signal any potential waiting task.
func (t *task) run(done func()) {
    wg := sync.waitgroup{}
    wg.add(len(t.dependencies))

    for _, task := range t.dependencies {
        go func(dep *task) {
            fmt.printf("%s is waiting for task %s to finishn", t.id, dep.id)
            <-dep.done
            wg.done()
        }(task)
    }

    wg.wait()

    // emulate work
    time.sleep(time.duration(rand.intn(5-1)+1) * time.second)

    fmt.printf("job %s rann", t.id)
    t.done <- struct{}{}
    done()
}

func newtask(id string) *task {
    return &task{
        id: id,
        // we need buffered size here, else the task will be blocked until someone will read the channel on `run`
        done: make(chan struct{}, 1),
    }
}

func (t *task) setdeps(deps ...*task) {
    t.dependencies = append(t.dependencies, deps...)
}

// executetasks simply runs all the tasks concurrently and waits until every tasks is completed
func executetasks(tasks ...*task) {
    fmt.println("starting execution")

    wg := sync.waitgroup{}
    wg.add(len(tasks))

    for _, task := range tasks {
        go task.run(wg.done)
    }

    wg.wait()

    fmt.println("end of execution")
}

func main() {
    // initialise the tasks
    a := newtask("a")
    b := newtask("b")
    c := newtask("c")
    d := newtask("d")
    e := newtask("e")
    // and set dependencies
    // a.setdeps(d)
    d.setdeps(a, b)
    e.setdeps(d, c)

    // then we "try" to execute all the tasks.
    executetasks(a, b, c, d, e)
}

当然这不是完美的解决方案,我可以认为已经有很多情况没有得到处理

例如

  • 循环依赖最终会陷入死锁 a => dd => a
  • 或者如果多个任务依赖于另一个任务,原因是您只能从一个通道读取相同的值一次。

为了解决第一个问题,您可能需要构建依赖图并检查它是否是循环的。对于第二个, hacky 方式可能是

go func(dep *Task) {
        fmt.Printf("%s is waiting for task %s to finishn", t.id, dep.id)
        <-dep.done
        // put the value back if anyone else is also dependent
        dep.done <- struct{}{}
        wg.Done()
}(task)
卓越飞翔博客
上一篇: 如何在 go 中为 lambda 中间件创建泛型类型
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏