Heim > Backend-Entwicklung > Golang > Eine kurze Analyse, wie Golang verzögerte Aufgaben implementiert

Eine kurze Analyse, wie Golang verzögerte Aufgaben implementiert

PHPz
Freigeben: 2023-03-22 18:08:09
nach vorne
1986 Leute haben es durchsucht

Wie implementiert man verzögerte Aufgaben in Golang? Der folgende Artikel wird Ihnen eine Reihe verzögerter Aufgabenlösungen basierend auf Golang vorstellen. Ich hoffe, er wird Ihnen hilfreich sein!

Eine kurze Analyse, wie Golang verzögerte Aufgaben implementiert

In tatsächlichen Geschäftsszenarien stoßen wir manchmal auf einige Verzögerungsanforderungen: Beispielsweise muss auf einer E-Commerce-Plattform ein Produkt, nachdem der Betreiber es im Verwaltungshintergrund hinzugefügt hat, nicht sofort an der Rezeption angezeigt werden , aber zu einem späteren Zeitpunkt angezeigt.

Natürlich haben wir viele Ideen, um mit diesem Problem umzugehen. Fügen Sie beispielsweise die freizugebenden Produktinformationen zur Datenbank hinzu und fragen Sie dann die Datentabelle über geplante Aufgaben ab, um die zum aktuellen Zeitpunkt freigegebenen Produkte abzufragen. Fügen Sie beispielsweise alle Produktinformationen zu Redis hinzu und führen Sie diese Funktion durch SortSet-Attribut. Die endgültige Wahl hängt von unserem Geschäftsszenario und unserer Betriebsumgebung ab.

Hier möchte ich Ihnen eine Reihe verzögerter Aufgabenlösungen basierend auf Golang vorstellen.

Sie können die flexible Anwendung von lGolang-Pipelines

    Golang-Timer-Anwendung
  • Golang-Slicing-Element in die Implementierung der Mind Map integrieren
  • Um allen einen allgemeinen Eindruck zu vermitteln, werde ich die Gliederung auflisten den Text unten.

Umsetzungsideen

Wir alle wissen, dass jede Art von Warteschlange eigentlich aus zwei Teilen besteht: Produzent und Verbraucher. Es ist nur so, dass verzögerte Aufgaben im Vergleich zu gewöhnlichen Warteschlangen über eine zusätzliche Verzögerungsfunktion verfügen.

1. Produzent

Eine kurze Analyse, wie Golang verzögerte Aufgaben implementiertWenn ein Benutzer aus Sicht des Produzenten eine Aufgabe vorantreibt, hat sie einen verzögerten Ausführungszeitwert. Damit diese Aufgabe zum geplanten Zeitpunkt ausgeführt werden kann, müssen wir diese Aufgabe für einen bestimmten Zeitraum im Speicher speichern, und die Zeit ist eindimensional und wächst. Welche Datenstruktur verwenden wir also zum Speichern?

(1) Wählen Sie eine: Karte. Da die Karte ungeordnet ist und nicht nach Ausführungszeit sortiert werden kann, können wir nicht garantieren, dass die herausgenommenen Aufgaben zum aktuellen Zeitpunkt ausgeführt werden müssen, sodass diese Option ausgeschlossen ist. (2) Wahl 2: Kanal. Tatsächlich kann der Kanal manchmal als Warteschlange betrachtet werden. Allerdings folgen seine Ausgabe und Eingabe strikt dem Prinzip „First in, first out“. Leider werden fortgeschrittene Aufgaben möglicherweise nicht zuerst ausgeführt, sodass der Kanal nicht geeignet ist. (3) Wahl drei:

Scheiben

. Das Schneiden scheint machbar, da die Slice-Elemente geordnet sind. Wenn wir also alle Slice-Elemente in der Reihenfolge der Ausführungszeit anordnen können, müssen wir jedes Mal nur das Head-Element des Slice (vielleicht das Tail-Element) lesen Aufgabe, die wir wollen.

2. Verbraucher

Aus Sicht des Verbrauchers besteht die größte Schwierigkeit darin, jede Aufgabe zu einem bestimmten Zeitpunkt zu erledigen. Wie können wir also dafür sorgen, dass jede Aufgabe eine gewisse Zeit wartet, bevor sie ausgeführt wird?

Ja, es ist ein Timer.

Zusammenfassend lässt sich sagen, dass die Kombination aus „

Schneiden + Timer“ den Zweck erfüllen sollte.

Schritt für Schritt

1. Datenfluss

(1) Der Benutzer ruft

InitDelayQueue() auf, um das verzögerte Aufgabenobjekt zu initialisieren.

(2) Öffnen Sie die Coroutine und hören Sie sich die Task-Operations-Pipeline (Hinzufügen/Löschen-Signal) und die Ausführungszeit-Pipeline (Timer.C-Signal) an.

(3) Der Benutzer sendet das Signal zum Hinzufügen/Löschen. (4) Die Coroutine in (2) erfasst das Signal in (3) und ändert die Aufgabenliste.

(5) Wenn der Zeitpunkt für die Aufgabenausführung erreicht ist (wenn Elemente aus der timer.C-Pipeline ausgegeben werden), führen Sie die Aufgabe aus.

2. Datenstruktur

(1) Verzögertes Aufgabenobjekt

// 延时任务对象
type DelayQueue struct {
   tasks                 []*task             // 存储任务列表的切片
   add                   chan *task          // 用户添加任务的管道信号
   remove                chan string         // 用户删除任务的管道信号
   waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表
}
Nach dem Login kopieren

Hier ist zu beachten, dass es ein

waitRemoveTaskMapping

-Feld gibt. Da sich die zu löschende Aufgabe möglicherweise noch in der Add-Pipeline befindet und nicht rechtzeitig im Aufgabenfeld aktualisiert wurde, ist es erforderlich, die ID der Aufgabe, die der Kunde löschen möchte, vorübergehend zu erfassen.

(2) Aufgabenobjekt

// 任务对象
type task struct {
   id       string    // 任务id
   execTime time.Time // 执行时间
   f        func()    // 执行函数
}
Nach dem Login kopieren

Eine kurze Analyse, wie Golang verzögerte Aufgaben implementiert3. Initialisierungsverzögerungsaufgabenobjekt

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   return q
}
Nach dem Login kopieren
In diesem Prozess müssen wir das Betriebssignal des Benutzers für die Aufgabe und das Ausführungszeitsignal der Aufgabe überwachen.
func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         // to do something...
      case id := <-q.remove:
         // 任务删除信号
         // to do something...
      }
   }
}
Nach dem Login kopieren

Verbessern Sie unsere Initialisierungsmethode:

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   // 开启协程,监听任务相关信号
   go q.start()
   return q
}
Nach dem Login kopieren

4. Der Produzent pusht Aufgaben

Wenn Produzenten die Aufgaben nur zur Add-Pipeline hinzufügen, generieren wir eine Aufgaben-ID und geben sie an den Benutzer zurück .

// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
   // 生成一个任务id,方便删除使用
   id := genTaskId()
   t := &task{
      id:       id,
      execTime: time.Now().Add(timeInterval),
      f:        f,
   }

   // 将任务推到add管道中
   q.add <- t
   return id
}
Nach dem Login kopieren

5、任务推送信号的处理

在这里,我们要将用户推送的任务放到延时任务的tasks字段中。由于,我们需要将任务按照执行时间顺序排序,所以,我们需要找到新增任务在切片中的插入位置。又因为,插入之前的任务列表已经是有序的,所以,我们可以采用二分法处理。

// 使用二分法判断新增任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
   if len(q.tasks) == 0 {
      return
   }

   length := rightIndex - leftIndex
   if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
      return leftIndex
   }

   if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
      // 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
      return rightIndex + 1
   }

   if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
      return leftIndex + 1
   }

   middleVal := q.tasks[leftIndex+length/2].execTime

   // 这里用二分法递归的方式,一直寻找正确的插入位置
   if t.execTime.Sub(middleVal) <= 0 {
      return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
   } else {
      return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
   }
}
Nach dem Login kopieren

找到正确的插入位置后,我们才能将任务准确插入:

// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
   // 寻找新增任务的插入位置
   insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
   // 找到了插入位置,更新任务列表
   q.tasks = append(q.tasks, &task{})
   copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
   q.tasks[insertIndex] = t
}
Nach dem Login kopieren

那么,在监听add管道的时候,我们直接调用上述addTask() 即可。

func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         // to do something...
      }
   }
}
Nach dem Login kopieren

6、生产者删除任务

// 用户删除任务
func (q *DelayQueue) Delete(id string) {
   q.remove <- id
}
Nach dem Login kopieren

7、任务删除信号的处理

在这里,我们可以遍历任务列表,根据删除任务的id找到其在切片中的对应index。

// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
   deleteIndex := -1
   for index, t := range q.tasks {
      if t.id == id {
         // 找到了在切片中需要删除的所以呢
         deleteIndex = index
         break
      }
   }

   if deleteIndex == -1 {
      // 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
      // 注意,这里暂时不考虑,任务id非法的特殊情况
      q.waitRemoveTaskMapping[id] = struct{}{}
      return
   }

   if len(q.tasks) == 1 {
      // 删除后,任务列表就没有任务了
      q.tasks = []*task{}
      return
   }

   if deleteIndex == len(q.tasks)-1 {
      // 如果删除的是,任务列表的最后一个元素,则执行下列代码
      q.tasks = q.tasks[:len(q.tasks)-1]
      return
   }

   // 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
   copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
   q.tasks = q.tasks[:len(q.tasks)-1]
   return
}
Nach dem Login kopieren

然后,我们可以完善start()方法了。

func (q *DelayQueue) start() {
   for {
      // to do something...

      select {
      case now := <-timer.C:
         // 任务执行时间信号
         // to do something...
      case t := <-q.add:
         // 任务推送信号
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         q.deleteTask(id)
      }
   }
}
Nach dem Login kopieren

8、任务执行信号的处理

start()执行的时候,分成两种情况:任务列表为空,只需要监听add管道即可;任务列表不为空的时候,需要监听所有管道。任务执行信号,主要是依靠timer来实现,属于第二种情况。

func (q *DelayQueue) start() {
   for {
      if len(q.tasks) == 0 {
           // 任务列表为空的时候,只需要监听add管道
           select {
           case t := <-q.add:
              //添加任务
              q.addTask(t)
           }
        
           continue
      }

      // 任务列表不为空的时候,需要监听所有管道

      // 任务的等待时间=任务的执行时间-当前的时间
      currentTask := q.tasks[0]
      timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))

      select {
      case now := <-timer.C:
         // 任务执行信号
         timer.Stop()
        if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
           // 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
           q.endTask()
           delete(q.waitRemoveTaskMapping, currentTask.id)
           continue
        }
        
        // 开启协程,异步执行任务
        go q.execTask(currentTask, now)
        // 任务结束,刷新任务列表
        q.endTask()
      case t := <-q.add:
         // 任务推送信号
         timer.Stop()
         q.addTask(t)
      case id := <-q.remove:
         // 任务删除信号
         timer.Stop()
         q.deleteTask(id)
      }
   }
}
Nach dem Login kopieren

执行任务:

// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
   if task.execTime.After(currentTime) {
      // 如果当前任务的执行时间落后于当前时间,则不执行
      return
   }

   // 执行任务
   task.f()
   return
}
Nach dem Login kopieren

结束任务,刷新任务列表:

// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
   if len(q.tasks) == 1 {
      q.tasks = []*task{}
      return
   }

   q.tasks = q.tasks[1:]
}
Nach dem Login kopieren

9、完整代码

delay_queue.go

package delay_queue

import (
   "go.mongodb.org/mongo-driver/bson/primitive"
   "time"
)

// 延时任务对象
type DelayQueue struct {
   tasks                 []*task             // 存储任务列表的切片
   add                   chan *task          // 用户添加任务的管道信号
   remove                chan string         // 用户删除任务的管道信号
   waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表
}

// 任务对象
type task struct {
   id       string    // 任务id
   execTime time.Time // 执行时间
   f        func()    // 执行函数
}

// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
   q := &DelayQueue{
      add:                   make(chan *task, 10000),
      remove:                make(chan string, 100),
      waitRemoveTaskMapping: make(map[string]struct{}),
   }

   // 开启协程,监听任务相关信号
   go q.start()
   return q
}

// 用户删除任务
func (q *DelayQueue) Delete(id string) {
   q.remove <- id
}

// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
   // 生成一个任务id,方便删除使用
   id := genTaskId()
   t := &task{
      id:       id,
      execTime: time.Now().Add(timeInterval),
      f:        f,
   }

   // 将任务推到add管道中
   q.add <- t
   return id
}

// 监听各种任务相关信号
func (q *DelayQueue) start() {
   for {
      if len(q.tasks) == 0 {
         // 任务列表为空的时候,只需要监听add管道
         select {
         case t := <-q.add:
            //添加任务
            q.addTask(t)
         }

         continue
      }

      // 任务列表不为空的时候,需要监听所有管道

      // 任务的等待时间=任务的执行时间-当前的时间
      currentTask := q.tasks[0]
      timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))

      select {
      case now := <-timer.C:
         timer.Stop()
         if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
            // 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
            q.endTask()
            delete(q.waitRemoveTaskMapping, currentTask.id)
            continue
         }

         // 开启协程,异步执行任务
         go q.execTask(currentTask, now)
         // 任务结束,刷新任务列表
         q.endTask()
      case t := <-q.add:
         // 添加任务
         timer.Stop()
         q.addTask(t)
      case id := <-q.remove:
         // 删除任务
         timer.Stop()
         q.deleteTask(id)
      }
   }
}

// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
   if task.execTime.After(currentTime) {
      // 如果当前任务的执行时间落后于当前时间,则不执行
      return
   }

   // 执行任务
   task.f()
   return
}

// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
   if len(q.tasks) == 1 {
      q.tasks = []*task{}
      return
   }

   q.tasks = q.tasks[1:]
}

// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
   // 寻找新增任务的插入位置
   insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
   // 找到了插入位置,更新任务列表
   q.tasks = append(q.tasks, &task{})
   copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
   q.tasks[insertIndex] = t
}

// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
   deleteIndex := -1
   for index, t := range q.tasks {
      if t.id == id {
         // 找到了在切片中需要删除的所以呢
         deleteIndex = index
         break
      }
   }

   if deleteIndex == -1 {
      // 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
      // 注意,这里暂时不考虑,任务id非法的特殊情况
      q.waitRemoveTaskMapping[id] = struct{}{}
      return
   }

   if len(q.tasks) == 1 {
      // 删除后,任务列表就没有任务了
      q.tasks = []*task{}
      return
   }

   if deleteIndex == len(q.tasks)-1 {
      // 如果删除的是,任务列表的最后一个元素,则执行下列代码
      q.tasks = q.tasks[:len(q.tasks)-1]
      return
   }

   // 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
   copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
   q.tasks = q.tasks[:len(q.tasks)-1]
   return
}

// 寻找任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
   // 使用二分法判断新增任务的插入位置
   if len(q.tasks) == 0 {
      return
   }

   length := rightIndex - leftIndex
   if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
      return leftIndex
   }

   if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
      // 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
      return rightIndex + 1
   }

   if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
      // 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
      return leftIndex + 1
   }

   middleVal := q.tasks[leftIndex+length/2].execTime

   // 这里用二分法递归的方式,一直寻找正确的插入位置
   if t.execTime.Sub(middleVal) <= 0 {
      return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
   } else {
      return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
   }
}

func genTaskId() string {
   return primitive.NewObjectID().Hex()
}
Nach dem Login kopieren

测试代码:delay_queue_test.go

package delay_queue

import (
   "fmt"
   "testing"
   "time"
)

func TestDelayQueue(t *testing.T) {
   q := InitDelayQueue()
   for i := 0; i < 100; i++ {
      go func(i int) {
         id := q.Push(time.Duration(i)*time.Second, func() {
            fmt.Printf("%d秒后执行...\n", i)
            return
         })

         if i%7 == 0 {
            q.Delete(id)
         }
      }(i)
   }

   time.Sleep(time.Hour)
}
Nach dem Login kopieren

头脑风暴

上面的方案,的确实现了延时任务的效果,但是其中仍然有一些问题,仍然值得我们思考和优化。

1、按照上面的方案,如果大量延时任务的执行时间,集中在同一个时间点,会造成短时间内timer频繁地创建和销毁。

2、上述方案相比于time.AfterFunc()方法,我们需要在哪些场景下作出取舍。

3、如果服务崩溃或重启,如何去持久化队列中的任务。

小结

本文和大家讨论了延时任务在golang中的一种实现方案,在这个过程中,一次性定时器timer、切片、管道等golang特色,以及二分插入等常见算法都体现得淋漓尽致。

【相关推荐:Go视频教程编程教学

Das obige ist der detaillierte Inhalt vonEine kurze Analyse, wie Golang verzögerte Aufgaben implementiert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:juejin.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage