Rumah > pembangunan bahagian belakang > Golang > Redis Queue dan Cron in Go

Redis Queue dan Cron in Go

DDD
Lepaskan: 2024-12-31 04:40:23
asal
858 orang telah melayarinya

Redis Queue and Cron in Go

Siaran asal ada di sini

Dalam tutorial ini, kami akan berinteraksi dengan baris gilir dan meletakkannya pada pelayan Redis
menggunakan pakej github.com/hibiken/asynq dan buat penjadual untuk
tugas berjadual menggunakan pakej github.com/robfig/cron. Langkah demi langkah ini
panduan menerangkan cara menyediakan baris gilir, menjadualkan tugas dan mengendalikan
yang anggun penutupan.

Mulakan Modul

Mulakan dengan mencipta modul Go baharu untuk projek:

go mod init learn_queue_and_cron
Salin selepas log masuk

Buat cron.go

Fail cron.go bertanggungjawab untuk menjadualkan dan menjalankan tugas pada tertentu
selang waktu. Di bawah ialah pelaksanaannya:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
Salin selepas log masuk

Kod ini menjadualkan tugas untuk dijalankan setiap minit dan memastikan aplikasi terus berjalan
untuk memastikan penjadual berfungsi secara berterusan.

Buat queue.go

Fail queue.go menguruskan pemprosesan tugas menggunakan Asynq. Ini kodnya:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
Salin selepas log masuk

Penjelasan

  • Pengendali: e-melHandler dan reportHandler memproses tugas dengan menghuraikan muatan mereka dan melaksanakan tindakan masing-masing.
  • Baris Gilir Tugas: Tugasan seperti "send_email" dan "generate_report" ditakrifkan dan diproses melalui baris gilir tugas Asynq.

Cipta router.go

Fail router.go menyediakan titik akhir HTTP untuk menyusun tugasan:

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
Salin selepas log masuk

Kod ini menggunakan rangka kerja Gin untuk mendedahkan dua titik akhir untuk tugasan beratur.

Cipta main.go

Fail main.go menyepadukan semuanya bersama:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
Salin selepas log masuk

Fail ini menggabungkan baris gilir, cron, pelayan HTTP dan logik penutupan.

Pasang Ketergantungan

Pasang semua kebergantungan yang diperlukan:

go mod tidy
Salin selepas log masuk

Bina dan Jalankan Aplikasi

Bina dan jalankan aplikasi menggunakan:

go build -o run *.go && ./run
Salin selepas log masuk

Uji Aplikasi

Lawati titik akhir berikut untuk mengatur tugasan:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

Perhatikan terminal untuk log pelaksanaan tugas.

URL kanonik

Untuk maklumat lebih terperinci, lawati catatan asal di blog saya.

Atas ialah kandungan terperinci Redis Queue dan Cron in Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan