Maison > développement back-end > Golang > le corps du texte

Créer un sous-pub à l'aide de la routine go

王林
Libérer: 2024-02-06 09:35:07
avant
712 Les gens l'ont consulté

使用 go 例程创建 pub sub

Contenu de la question

J'essaie de créer une goroutine pour terminer la tâche

J'ai donc écrit ce code. Les tâches comme a, b, c sans dépendances sont faciles à mettre en œuvre et fonctionnent bien. Je viens de rencontrer quelques problèmes lors de la mise en œuvre des tâches dépendantes d et e, chaque tâche a des dépendances de 2 tâches.

Il ne reste qu'un seul point de connexion, qui crée un canal pour chaque tâche, puis délivre le message, qui sera lu par la tâche dépendante afin de réduire le nombre de dépendances une fois la tâche dépendante terminée. Voir les commentaires checkpoint 1 dans le code.

Quelqu'un peut-il m'aider à résoudre ce problème ? Je ne sais pas comment implémenter goroutine dans ce cas.

Code :

package main

import (
    "fmt"
    "sync"
)

type task struct {
    isdone           bool
    dependencies     []*task
    subscribers      []*task
    donechan         chan bool
    numdependencies  int
    taskname         string
    informsubchannel chan bool //
}

func (t *task) executetask() {
    fmt.printf("task %s is getting executed...\n", t.taskname)
    // <-time.after(5 * time.second)
    fmt.printf("task %s is done!! <-------\n", t.taskname)
}

func (t *task) updatedependency() {
    var updateddependencies []*task
    for _, t := range t.dependencies {
        if !t.isdone {
            updateddependencies = append(updateddependencies, t)
        }
    }
    t.numdependencies = len(updateddependencies)
    fmt.printf("updating dependency for task: %s to %d\n", t.taskname, t.numdependencies)
    t.dependencies = updateddependencies
}

// if we are having dependencies for a task subscribe to those dependent task.
// when the dependent task is done inform it and reduce the no of dependencies.
// a --> d (d depends on a), a has finished its task so inform it subscribed task which is d here and reduce d dependencies.
func (t *task) informsubscriber() {
    if len(t.subscribers) > 0 {
        for _, sub := range t.subscribers {
            fmt.printf("task %s has informed subscriber %s\n", t.taskname, sub.taskname)
            sub.updatedependency()
        }
    }
}

// task is subscribed to dependent task. d has been subscribed to a, d will watch over the activity of a
func (t *task) setsubscriber(sub *task) {
    fmt.printf("set subscriber %s to task %s\n", sub.taskname, t.taskname)
    t.subscribers = append(t.subscribers, sub)
}

// go routine - background task execution
// mark it as completed
func (t *task) markcompleted() {
    for {
        select {
        case <-t.donechan:
            {
                t.isdone = true
                t.executetask()
                // inform all the subscribers that the task is completed and adjust their dependencies
                t.informsubscriber()
                close(t.donechan)
                return
            }
        default:
        }
    }
}

func (t *task) setdependency(tasks []*task) {
    t.dependencies = tasks
    t.numdependencies = len(t.dependencies)
}

// this will be use if dependent task are already done. will be used in checkpoint 1.
func (t *task) trackdependency() {
    t.numdependencies -= 1
    fmt.printf("no of dependencies for task %s is: %d\n", t.taskname, t.numdependencies)
    if t.numdependencies == 0 { // execute task
        t.donechan <- true
    }
}

func (t *task) start() {
    fmt.printf("running task %s\n", t.taskname)
    t.updatedependency()
    go t.markcompleted()

    if t.numdependencies > 0 {

        // for every dependent task
        for _, dep := range t.dependencies {
            // create subscribers
            dep.setsubscriber(t)
            // what if all dependencies are already executed. subscriber won't help as they won't be marked completed as already done.
            // say a and c are already done then d won't be able to complete itself since it's still waiting for them
            // if dependencies are already finished mark it as completed too

            // code: handle the dependent case here(unable to implement)
            // background function for tracking dependency
            // checkpoint 1: read dependent task channel value & reduce dependencies if done
            go t.trackdependency()
        }
        fmt.printf("task %s has %d dependencies and waiting for them to get finished\n", t.taskname, t.numdependencies)
    } else {
        // if no dependencies. mark it as finished
        t.donechan <- true
    }

}

func createtask(taskname string) *task {
    return &task{
        isdone:          false,
        taskname:        taskname,
        dependencies:    nil,
        subscribers:     nil,
        numdependencies: 0,
        donechan:        make(chan bool),
    }
}

func main() {

    taska := createtask("a")
    taskb := createtask("b")
    taskc := createtask("c")
    taskd := createtask("d")
    taske := createtask("e")

    taskd.setdependency([]*task{taska, taskb})
    taske.setdependency([]*task{taskc, taskd})

    alltasks := []*task{taska, taskb, taskc, taskd, taske}
    var wg sync.waitgroup
    for _, t := range alltasks {
        wg.add(1)
        go func(t *task) {
            defer wg.done()
            t.start()
        }(t)

    }
    wg.wait()

}
Copier après la connexion

Exemple de sortie :

(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------
Copier après la connexion

En raison de l'implémentation manquante ci-dessus, actuellement 5 courses de données trouvées . 发现 5 个数据竞争


正确答案


我认为您可以使用较小的任务结构和 waitgroup

Bonne réponse

Je pense que vous pouvez réaliser le scénario ci-dessus en utilisant une structure de tâches plus petite et l'aide de waitgroup pour la synchronisation.

Voici un exemple de moi qui rassemble quelques notes pour expliquer.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// tasks holds an id ( for ease of debugging )
// a buffered channel that is only used for signaling when the task is executed
// and finally a list of dependency tasks
type task struct {
    id           string
    done         chan struct{}
    dependencies []*task
}

// run is where all the logic happens
//
// we create a waitgroup that will be the size of the dependencies for the current task
// and we will wait until all tasks have signaled that they have executed.
//
// when all the dependencies have signaled through their channel that they are done
// then the current task is free to execute and then signal any potential waiting task.
func (t *task) run(done func()) {
    wg := sync.waitgroup{}
    wg.add(len(t.dependencies))

    for _, task := range t.dependencies {
        go func(dep *task) {
            fmt.printf("%s is waiting for task %s to finish\n", t.id, dep.id)
            <-dep.done
            wg.done()
        }(task)
    }

    wg.wait()

    // emulate work
    time.sleep(time.duration(rand.intn(5-1)+1) * time.second)

    fmt.printf("job %s ran\n", t.id)
    t.done <- struct{}{}
    done()
}

func newtask(id string) *task {
    return &task{
        id: id,
        // we need buffered size here, else the task will be blocked until someone will read the channel on `run`
        done: make(chan struct{}, 1),
    }
}

func (t *task) setdeps(deps ...*task) {
    t.dependencies = append(t.dependencies, deps...)
}

// executetasks simply runs all the tasks concurrently and waits until every tasks is completed
func executetasks(tasks ...*task) {
    fmt.println("starting execution")

    wg := sync.waitgroup{}
    wg.add(len(tasks))

    for _, task := range tasks {
        go task.run(wg.done)
    }

    wg.wait()

    fmt.println("end of execution")
}

func main() {
    // initialise the tasks
    a := newtask("a")
    b := newtask("b")
    c := newtask("c")
    d := newtask("d")
    e := newtask("e")
    // and set dependencies
    // a.setdeps(d)
    d.setdeps(a, b)
    e.setdeps(d, c)

    // then we "try" to execute all the tasks.
    executetasks(a, b, c, d, e)
}
Copier après la connexion

Bien sûr, ce n'est pas une solution parfaite, je constate qu'il existe déjà de nombreuses situations qui ne sont pas traitées
  • Par exemplea => dd => a
  • Les dépendances circulaires finiront par conduire à une impasse

Ou si plusieurs tâches dépendent d'une autre tâche, la raison est que vous ne pouvez lire qu'une seule fois la même valeur sur un canal. hacky

🎜Pour résoudre le premier problème, vous devrez peut-être créer le graphe de dépendances et vérifier s'il est cyclique. Pour le deuxième, la 🎜 voie pourrait être 🎜
go func(dep *Task) {
        fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
        <-dep.done
        // put the value back if anyone else is also dependent
        dep.done <- struct{}{}
        wg.Done()
}(task)
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:stackoverflow.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal