Maison > développement back-end > Golang > le corps du texte

Une brève analyse de la façon dont Golang implémente les tâches retardées

PHPz
Libérer: 2023-03-22 18:08:09
avant
1937 Les gens l'ont consulté

Comment implémenter des tâches différées dans Golang ? L'article suivant partagera avec vous un ensemble de solutions de tâches retardées basées sur Golang. J'espère qu'il vous sera utile !

Une brève analyse de la façon dont Golang implémente les tâches retardées

Dans les scénarios commerciaux réels, nous rencontrons parfois des exigences de retard : par exemple, sur une plateforme de commerce électronique, après que l'opérateur ajoute un produit dans l'arrière-plan de gestion, il n'est pas nécessaire de l'afficher immédiatement à la réception. , mais à une date ultérieure. Affiché à un moment donné.

Bien sûr, nous avons de nombreuses idées pour résoudre ce problème. Par exemple, ajoutez des informations sur le produit à publier dans la base de données, puis interrogez la table de données via des tâches planifiées pour interroger les produits publiés à l'heure actuelle. Pour un autre exemple, ajoutez toutes les informations sur le produit à Redis et complétez cette fonction via le fichier ; Attribut SortSet. Le choix final dépend de notre scénario commercial et de notre environnement opérationnel.

Ici, j'aimerais partager avec vous un ensemble de solutions de tâches retardées basées sur le golang.

Vous pouvez récolter l'application flexible des pipelines lGolang

    Application de Golang Timer
  • Élément de découpage Golang dans la mise en œuvre de la carte mentale
  • Afin de donner à chacun une impression générale, je vais énumérer les grandes lignes de. le texte ci-dessous.

Idées de mise en œuvre

Nous savons tous que tout type de file d'attente comporte en réalité deux parties : le producteur et le consommateur. C'est juste que par rapport aux files d'attente ordinaires, les tâches retardées ont une fonctionnalité de retard supplémentaire.

1. Producteur

Une brève analyse de la façon dont Golang implémente les tâches retardéesDu point de vue du producteur, lorsque l'utilisateur pousse une tâche, celle-ci portera la valeur du temps d'exécution retardé. Afin de permettre à cette tâche d'être exécutée à l'heure prévue, nous devons stocker cette tâche en mémoire pendant un certain temps, et le temps est unidimensionnel et croissant. Alors, quelle structure de données utilisons-nous pour le stocker ?

(1) Choisissez-en un : la carte. Étant donné que la carte est désordonnée et ne peut pas être triée en fonction du temps d'exécution, nous ne pouvons pas garantir si les tâches supprimées doivent être exécutées au moment actuel, cette option est donc exclue. (2) Choix 2 : canal. En effet, le canal peut parfois être considéré comme une file d'attente. Cependant, sa sortie et son entrée suivent strictement le principe du « premier entré, premier sorti ». Malheureusement, les tâches avancées peuvent ne pas être exécutées en premier, le canal n'est donc pas adapté. (3) Troisième choix :

tranche

. Le découpage semble réalisable, car les éléments de tranche sont ordonnés, donc si nous pouvons organiser tous les éléments de tranche dans l'ordre du temps d'exécution, alors nous n'avons besoin que de lire l'élément de tête de la tranche (peut-être l'élément de queue) à chaque fois, nous pouvons obtenir le tâche que nous voulons.

2. Consommateur

Du point de vue du consommateur, la plus grande difficulté est de savoir comment faire en sorte que chaque tâche soit consommée à un moment précis. Alors, pour chaque tâche, comment la faire attendre un certain temps avant de l’exécuter ?

Oui, c'est une minuterie.

Pour résumer, la combinaison «

slice + timer » devrait pouvoir atteindre l'objectif.

Pas à pas

1. Flux de données

(1) L'utilisateur appelle

InitDelayQueue() pour initialiser l'objet tâche retardée.

(2) Démarrez la coroutine et écoutez le pipeline d'opération de tâche (signal d'ajout/suppression) et le pipeline de temps d'exécution (signal timer.C).

(3) L'utilisateur envoie le signal d'ajout/suppression. (4) La coroutine en (2) capture le signal en (3) et modifie la liste des tâches.

(5) Lorsque le moment venu pour l'exécution de la tâche arrive (lorsqu'il y a des éléments en sortie du pipeline timer.C), exécutez la tâche.

2. Structure des données

(1) Objet de tâche retardée

// 延时任务对象
type DelayQueue struct {
   tasks                 []*task             // 存储任务列表的切片
   add                   chan *task          // 用户添加任务的管道信号
   remove                chan string         // 用户删除任务的管道信号
   waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表
}
Copier après la connexion

Il convient de noter ici qu'il existe un champ

waitRemoveTaskMapping

. Étant donné que la tâche à supprimer peut encore être dans le pipeline d'ajout et n'a pas été mise à jour à temps dans le champ des tâches, il est nécessaire d'enregistrer temporairement l'ID de la tâche que le client souhaite supprimer.

(2) Objet de tâche

// 任务对象
type task struct {
   id       string    // 任务id
   execTime time.Time // 执行时间
   f        func()    // 执行函数
}
Copier après la connexion

Une brève analyse de la façon dont Golang implémente les tâches retardées3. Objet de tâche de retard d'initialisation

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   return q
}
Copier après la connexion
Dans ce processus, nous devons surveiller le signal d'opération de l'utilisateur pour la tâche et le signal de temps d'exécution de la tâche.
func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         // to do something...
      case id := <-q.remove:
         // 任务删除信号
         // to do something...
      }
   }
}
Copier après la connexion

Améliorez notre méthode d'initialisation :

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   // 开启协程,监听任务相关信号
   go q.start()
   return q
}
Copier après la connexion

4. Le producteur pousse les tâches

Lorsque les producteurs poussent des tâches, il leur suffit d'ajouter les tâches au pipeline d'ajout. Ici, nous générons un identifiant de tâche et le renvoyons à l'utilisateur. .

// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
   // 生成一个任务id,方便删除使用
   id := genTaskId()
   t := &task{
      id:       id,
      execTime: time.Now().Add(timeInterval),
      f:        f,
   }

   // 将任务推到add管道中
   q.add <- t
   return id
}
Copier après la connexion

5、任务推送信号的处理

在这里,我们要将用户推送的任务放到延时任务的tasks字段中。由于,我们需要将任务按照执行时间顺序排序,所以,我们需要找到新增任务在切片中的插入位置。又因为,插入之前的任务列表已经是有序的,所以,我们可以采用二分法处理。

// 使用二分法判断新增任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
   if len(q.tasks) == 0 {
      return
   }

   length := rightIndex - leftIndex
   if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
      return leftIndex
   }

   if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
      // 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
      return rightIndex + 1
   }

   if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
      return leftIndex + 1
   }

   middleVal := q.tasks[leftIndex+length/2].execTime

   // 这里用二分法递归的方式,一直寻找正确的插入位置
   if t.execTime.Sub(middleVal) <= 0 {
      return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
   } else {
      return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
   }
}
Copier après la connexion

找到正确的插入位置后,我们才能将任务准确插入:

// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
   // 寻找新增任务的插入位置
   insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
   // 找到了插入位置,更新任务列表
   q.tasks = append(q.tasks, &task{})
   copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
   q.tasks[insertIndex] = t
}
Copier après la connexion

那么,在监听add管道的时候,我们直接调用上述addTask() 即可。

func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         // to do something...
      }
   }
}
Copier après la connexion

6、生产者删除任务

// 用户删除任务
func (q *DelayQueue) Delete(id string) {
   q.remove <- id
}
Copier après la connexion

7、任务删除信号的处理

在这里,我们可以遍历任务列表,根据删除任务的id找到其在切片中的对应index。

// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
   deleteIndex := -1
   for index, t := range q.tasks {
      if t.id == id {
         // 找到了在切片中需要删除的所以呢
         deleteIndex = index
         break
      }
   }

   if deleteIndex == -1 {
      // 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
      // 注意,这里暂时不考虑,任务id非法的特殊情况
      q.waitRemoveTaskMapping[id] = struct{}{}
      return
   }

   if len(q.tasks) == 1 {
      // 删除后,任务列表就没有任务了
      q.tasks = []*task{}
      return
   }

   if deleteIndex == len(q.tasks)-1 {
      // 如果删除的是,任务列表的最后一个元素,则执行下列代码
      q.tasks = q.tasks[:len(q.tasks)-1]
      return
   }

   // 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
   copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
   q.tasks = q.tasks[:len(q.tasks)-1]
   return
}
Copier après la connexion

然后,我们可以完善start()方法了。

func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         q.deleteTask(id)
      }
   }
}
Copier après la connexion

8、任务执行信号的处理

start()执行的时候,分成两种情况:任务列表为空,只需要监听add管道即可;任务列表不为空的时候,需要监听所有管道。任务执行信号,主要是依靠timer来实现,属于第二种情况。

func (q *DelayQueue) start() {
   for {
      if len(q.tasks) == 0 {
           // 任务列表为空的时候,只需要监听add管道
           select {
           case t := <-q.add:
              //添加任务
              q.addTask(t)
           }
        
           continue
      }

      // 任务列表不为空的时候,需要监听所有管道

      // 任务的等待时间=任务的执行时间-当前的时间
      currentTask := q.tasks[0]
      timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))

      select {
      case now := <-timer.C:
         // 任务执行信号
         timer.Stop()
        if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
           // 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
           q.endTask()
           delete(q.waitRemoveTaskMapping, currentTask.id)
           continue
        }
        
        // 开启协程,异步执行任务
        go q.execTask(currentTask, now)
        // 任务结束,刷新任务列表
        q.endTask()
      case t := <-q.add:
         // 任务推送信号
         timer.Stop()
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         timer.Stop()
         q.deleteTask(id)
      }
   }
}
Copier après la connexion

执行任务:

// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
   if task.execTime.After(currentTime) {
      // 如果当前任务的执行时间落后于当前时间,则不执行
      return
   }

   // 执行任务
   task.f()
   return
}
Copier après la connexion

结束任务,刷新任务列表:

// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
   if len(q.tasks) == 1 {
      q.tasks = []*task{}
      return
   }

   q.tasks = q.tasks[1:]
}
Copier après la connexion

9、完整代码

delay_queue.go

package delay_queue

import (
   "go.mongodb.org/mongo-driver/bson/primitive"
   "time"
)

// 延时任务对象
type DelayQueue struct {
   tasks                 []*task             // 存储任务列表的切片
   add                   chan *task          // 用户添加任务的管道信号
   remove                chan string         // 用户删除任务的管道信号
   waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表
}

// 任务对象
type task struct {
   id       string    // 任务id
   execTime time.Time // 执行时间
   f        func()    // 执行函数
}

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   // 开启协程,监听任务相关信号
   go q.start()
   return q
}

// 用户删除任务
func (q *DelayQueue) Delete(id string) {
   q.remove <- id
}

// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
   // 生成一个任务id,方便删除使用
   id := genTaskId()
   t := &task{
      id:       id,
      execTime: time.Now().Add(timeInterval),
      f:        f,
   }

   // 将任务推到add管道中
   q.add <- t
   return id
}

// 监听各种任务相关信号
func (q *DelayQueue) start() {
   for {
      if len(q.tasks) == 0 {
         // 任务列表为空的时候,只需要监听add管道
         select {
         case t := <-q.add:
            //添加任务
            q.addTask(t)
         }

         continue
      }

      // 任务列表不为空的时候,需要监听所有管道

      // 任务的等待时间=任务的执行时间-当前的时间
      currentTask := q.tasks[0]
      timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))

      select {
      case now := <-timer.C:
         timer.Stop()
         if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
            // 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
            q.endTask()
            delete(q.waitRemoveTaskMapping, currentTask.id)
            continue
         }

         // 开启协程,异步执行任务
         go q.execTask(currentTask, now)
         // 任务结束,刷新任务列表
         q.endTask()
      case t := <-q.add:
         // 添加任务
         timer.Stop()
         q.addTask(t)
      case id := <-q.remove:
         // 删除任务
         timer.Stop()
         q.deleteTask(id)
      }
   }
}

// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
   if task.execTime.After(currentTime) {
      // 如果当前任务的执行时间落后于当前时间,则不执行
      return
   }

   // 执行任务
   task.f()
   return
}

// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
   if len(q.tasks) == 1 {
      q.tasks = []*task{}
      return
   }

   q.tasks = q.tasks[1:]
}

// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
   // 寻找新增任务的插入位置
   insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
   // 找到了插入位置,更新任务列表
   q.tasks = append(q.tasks, &task{})
   copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
   q.tasks[insertIndex] = t
}

// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
   deleteIndex := -1
   for index, t := range q.tasks {
      if t.id == id {
         // 找到了在切片中需要删除的所以呢
         deleteIndex = index
         break
      }
   }

   if deleteIndex == -1 {
      // 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
      // 注意,这里暂时不考虑,任务id非法的特殊情况
      q.waitRemoveTaskMapping[id] = struct{}{}
      return
   }

   if len(q.tasks) == 1 {
      // 删除后,任务列表就没有任务了
      q.tasks = []*task{}
      return
   }

   if deleteIndex == len(q.tasks)-1 {
      // 如果删除的是,任务列表的最后一个元素,则执行下列代码
      q.tasks = q.tasks[:len(q.tasks)-1]
      return
   }

   // 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
   copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
   q.tasks = q.tasks[:len(q.tasks)-1]
   return
}

// 寻找任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
   // 使用二分法判断新增任务的插入位置
   if len(q.tasks) == 0 {
      return
   }

   length := rightIndex - leftIndex
   if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
      return leftIndex
   }

   if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
      // 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
      return rightIndex + 1
   }

   if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
      return leftIndex + 1
   }

   middleVal := q.tasks[leftIndex+length/2].execTime

   // 这里用二分法递归的方式,一直寻找正确的插入位置
   if t.execTime.Sub(middleVal) <= 0 {
      return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
   } else {
      return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
   }
}

func genTaskId() string {
   return primitive.NewObjectID().Hex()
}
Copier après la connexion

测试代码:delay_queue_test.go

package delay_queue

import (
   "fmt"
   "testing"
   "time"
)

func TestDelayQueue(t *testing.T) {
   q := InitDelayQueue()
   for i := 0; i < 100; i++ {
      go func(i int) {
         id := q.Push(time.Duration(i)*time.Second, func() {
            fmt.Printf("%d秒后执行...\n", i)
            return
         })

         if i%7 == 0 {
            q.Delete(id)
         }
      }(i)
   }

   time.Sleep(time.Hour)
}
Copier après la connexion

头脑风暴

上面的方案,的确实现了延时任务的效果,但是其中仍然有一些问题,仍然值得我们思考和优化。

1、按照上面的方案,如果大量延时任务的执行时间,集中在同一个时间点,会造成短时间内timer频繁地创建和销毁。

2、上述方案相比于time.AfterFunc()方法,我们需要在哪些场景下作出取舍。

3、如果服务崩溃或重启,如何去持久化队列中的任务。

小结

本文和大家讨论了延时任务在golang中的一种实现方案,在这个过程中,一次性定时器timer、切片、管道等golang特色,以及二分插入等常见算法都体现得淋漓尽致。

【相关推荐:Go视频教程编程教学

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal