我正在尝试创建 goroutine 来完成任务
所以我写了这段代码。 像 a、b、c 这样没有依赖关系的任务很容易实现并且运行良好。 只是在实现依赖任务 d 和 e 时遇到一些问题,每个任务都有 2 个任务的依赖关系。
现在只剩下最后一个环节没有打通:为每个任务创建一个通道并通过它传递消息,依赖方任务读取该消息后,在被依赖的任务完成时减少自己的依赖计数。请参阅代码中的 checkpoint 1 注释。
有人可以帮我解决这个问题吗?我只是停留在如何在这种情况下实现 goroutine 的部分。
代码:
package main
import (
"fmt"
"sync"
)
// task is one node of the dependency graph together with the bookkeeping
// used to decide when it may run.
type task struct {
	// taskname identifies the task in log output.
	taskname string

	// isdone is set by markcompleted once the completion signal has arrived.
	isdone bool

	// dependencies holds the tasks this task waits for; updatedependency
	// prunes the entries whose isdone flag is set.
	dependencies []*task

	// numdependencies tracks how many dependencies are still outstanding.
	numdependencies int

	// subscribers are the tasks registered through setsubscriber; they are
	// notified by informsubscriber.
	subscribers []*task

	// donechan carries the signal that makes markcompleted run the task.
	donechan chan bool

	// informsubchannel is declared but not used by any code shown here.
	informsubchannel chan bool
}
// executetask performs the task's work. The work is currently simulated by
// two log lines; the commented-out receive shows where an artificial delay
// can be inserted.
func (t *task) executetask() {
	fmt.Printf("task %s is getting executed...\n", t.taskname)
	// <-time.After(5 * time.Second)
	fmt.Printf("task %s is done!! <-------\n", t.taskname)
}
// updatedependency drops every dependency whose isdone flag is set, then
// stores the remaining list and its length on the task.
//
// NOTE(review): isdone is read here without synchronization while
// markcompleted, which runs in its own goroutine, writes it — confirm
// whether a lock is needed.
func (t *task) updatedependency() {
	var pending []*task
	// The loop variable is deliberately not named t so that it cannot be
	// confused with the receiver.
	for _, dep := range t.dependencies {
		if !dep.isdone {
			pending = append(pending, dep)
		}
	}
	t.numdependencies = len(pending)
	fmt.Printf("updating dependency for task: %s to %d\n", t.taskname, t.numdependencies)
	t.dependencies = pending
}
// informsubscriber tells every subscriber to recompute its dependency list.
//
// A task that has dependencies subscribes to each of them. Example: with
// a --> d (d depends on a), once a has finished it informs its subscriber d,
// and d prunes a from its outstanding dependencies via updatedependency.
func (t *task) informsubscriber() {
	// Ranging over an empty or nil slice is a no-op, so no length check is
	// required.
	for _, sub := range t.subscribers {
		fmt.Printf("task %s has informed subscriber %s\n", t.taskname, sub.taskname)
		sub.updatedependency()
	}
}
// setsubscriber registers sub as a subscriber of t. Example: when d depends
// on a, d is added to a's subscribers so that a can inform d later.
func (t *task) setsubscriber(sub *task) {
	fmt.Printf("set subscriber %s to task %s\n", sub.taskname, t.taskname)
	t.subscribers = append(t.subscribers, sub)
}
// markcompleted is the background half of a task and is started with `go`.
// It waits for one value on donechan, marks the task as done, executes it,
// informs the subscribers, and finally closes donechan.
func (t *task) markcompleted() {
	// A blocking receive parks the goroutine until the signal arrives, so no
	// CPU time is spent polling the channel.
	<-t.donechan

	t.isdone = true
	t.executetask()
	// Inform all subscribers that the task is completed so they can adjust
	// their dependencies.
	t.informsubscriber()
	// NOTE: after this close, any further send on donechan panics.
	close(t.donechan)
}
// setdependency replaces the task's dependency list with tasks and sets the
// outstanding-dependency counter to the length of that list.
func (t *task) setdependency(tasks []*task) {
	t.dependencies, t.numdependencies = tasks, len(tasks)
}
// trackdependency decrements the outstanding-dependency counter by one and,
// when the counter reaches zero, sends the signal on donechan that lets
// markcompleted execute the task. It is used at checkpoint 1 in start.
//
// NOTE(review): the decrement happens immediately; nothing here waits for a
// dependency to finish, and start launches one goroutine per dependency
// that all touch numdependencies without synchronization — confirm the
// intended design.
func (t *task) trackdependency() {
	t.numdependencies--
	fmt.Printf("no of dependencies for task %s is: %d\n", t.taskname, t.numdependencies)
	if t.numdependencies == 0 { // execute task
		t.donechan <- true
	}
}
// start kicks off a task: it prunes dependencies that are already done,
// launches markcompleted in the background, and then either signals
// completion right away (no outstanding dependencies) or subscribes to every
// remaining dependency and starts one trackdependency goroutine per
// dependency.
func (t *task) start() {
	fmt.Printf("running task %s\n", t.taskname)
	t.updatedependency()
	go t.markcompleted()

	if t.numdependencies <= 0 {
		// No outstanding dependencies: let markcompleted run the task now.
		t.donechan <- true
		return
	}

	for _, dep := range t.dependencies {
		// Register t so that dep informs it when dep completes.
		dep.setsubscriber(t)
		// Open problem: if a dependency has already finished, it will never
		// inform this subscriber, so t could wait forever (e.g. a and c are
		// done before d subscribes).
		// TODO: handle the already-finished case here (not implemented).
		//
		// checkpoint 1: background tracking of the dependency; the intent is
		// to read the dependency's channel and reduce the counter when it is
		// done.
		go t.trackdependency()
	}
	fmt.Printf("task %s has %d dependencies and waiting for them to get finished\n", t.taskname, t.numdependencies)
}
// createtask returns a fresh task named taskname with an unbuffered donechan.
// All other fields keep their zero values: not done, no dependencies, no
// subscribers, and a dependency count of zero.
func createtask(taskname string) *task {
	created := new(task)
	created.taskname = taskname
	created.donechan = make(chan bool)
	return created
}
// main builds five tasks (d depends on a and b; e depends on c and d),
// starts each of them in its own goroutine, and waits for every start call
// to return.
//
// NOTE(review): the WaitGroup only covers the start calls. The goroutines
// that start launches (markcompleted, trackdependency) are not waited for,
// so the program may exit before dependent tasks have run — confirm.
func main() {
	taska := createtask("a")
	taskb := createtask("b")
	taskc := createtask("c")
	taskd := createtask("d")
	taske := createtask("e")

	taskd.setdependency([]*task{taska, taskb})
	taske.setdependency([]*task{taskc, taskd})

	alltasks := []*task{taska, taskb, taskc, taskd, taske}

	var wg sync.WaitGroup
	for _, t := range alltasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			t.start()
		}(t)
	}
	wg.Wait()
}
示例输出:
(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------
由于上述实现缺失,目前检测到 5 个数据竞争。
正确答案
我认为您可以使用更精简的任务结构体,并借助 sync.WaitGroup 进行同步来实现上述场景。
下面是一个示例,我在其中加了一些注释加以说明。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// task describes one unit of work in the dependency graph.
type task struct {
	// id names the task; it exists for ease of debugging.
	id string

	// dependencies lists the tasks that must signal completion before this
	// task is allowed to run.
	dependencies []*task

	// done is used purely for signaling that the task has been executed.
	done chan struct{}
}
// run is where all the logic happens.
//
// It creates a WaitGroup sized to the number of dependencies of the current
// task and starts one goroutine per dependency that waits for that
// dependency's done signal. Once every dependency has signaled, the task
// performs its (emulated) work, signals on its own done channel, and calls
// the done callback.
//
// NOTE: the completion signal is a single value sent on t.done, so it is
// consumed by the first task that receives it; a second task depending on
// the same dependency would keep waiting.
func (t *task) run(done func()) {
	var wg sync.WaitGroup
	wg.Add(len(t.dependencies))
	// The loop variable must not be named task: that would shadow the type
	// inside the loop body and break the closure's parameter declaration.
	for _, dep := range t.dependencies {
		go func(dep *task) {
			defer wg.Done()
			fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
			<-dep.done
		}(dep)
	}
	wg.Wait()

	// Emulate work: sleep for a random duration of 1 to 4 seconds.
	time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
	fmt.Printf("job %s ran\n", t.id)

	t.done <- struct{}{}
	done()
}
// newtask builds a task with the given id and no dependencies.
func newtask(id string) *task {
	// A capacity of one is required: with an unbuffered channel the send in
	// run would block until some other task reads from it.
	signal := make(chan struct{}, 1)
	return &task{id: id, done: signal}
}
// setdeps appends deps to the tasks that t has to wait for. Dependencies
// registered by earlier calls are kept.
func (t *task) setdeps(deps ...*task) {
	extended := append(t.dependencies, deps...)
	t.dependencies = extended
}
// executetasks runs all given tasks concurrently, one goroutine per task,
// and returns once every task has called its done callback.
func executetasks(tasks ...*task) {
	fmt.Println("starting execution")

	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, tk := range tasks {
		// Each run call releases one WaitGroup slot through wg.Done.
		go tk.run(wg.Done)
	}
	wg.Wait()

	fmt.Println("end of execution")
}
// main wires up five tasks and executes them: d waits for a and b, and e
// waits for d and c.
func main() {
	// Initialise the tasks.
	var (
		a = newtask("a")
		b = newtask("b")
		c = newtask("c")
		d = newtask("d")
		e = newtask("e")
	)

	// Set the dependencies. Enabling the commented-out line would make a
	// depend on d while d depends on a.
	// a.setdeps(d)
	d.setdeps(a, b)
	e.setdeps(d, c)

	// Then we "try" to execute all the tasks.
	executetasks(a, b, c, d, e)
}
当然,这并不是完美的解决方案,我能想到还有不少情况没有得到处理,
例如:
- 循环依赖最终会陷入死锁,比如
a => d
和 d => a
- 或者多个任务依赖于同一个任务的情况,原因是通道中的同一个值只能被读取一次。
为了解决第一个问题,您可能需要构建依赖图并检查其中是否存在环。对于第二个问题,一种取巧(hacky)的
做法可能是:
// Waits for one dependency and then re-sends the signal, so that another
// task waiting on the same dependency can receive it as well.
go func(dep *task) {
	fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
	<-dep.done
	// put the value back if anyone else is also dependent
	dep.done <- struct{}{}
	wg.Done()
}(task)