> 백엔드 개발 > Golang > Go의 Redis 대기열 및 Cron

Go의 Redis 대기열 및 Cron

DDD
풀어 주다: 2024-12-31 04:40:23
원래의
874명이 탐색했습니다.

Redis Queue and Cron in Go

원본은 여기

이 튜토리얼에서는 대기열과 상호 작용하여 Redis 서버에 넣습니다.
github.com/hibiken/asynq 패키지를 사용하여
github.com/robfig/cron 패키지를 사용하여 예약된 작업. 이 단계별
가이드에서는 대기열 설정, 작업 예약, 원활한 처리 방법을 설명합니다
종료됩니다.

모듈 초기화

프로젝트를 위한 새 Go 모듈을 만드는 것부터 시작하세요.

go mod init learn_queue_and_cron
로그인 후 복사

cron.go 생성

cron.go 파일은 특정 작업의 예약 및 실행을 담당합니다
간격. 구현은 다음과 같습니다.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
로그인 후 복사

이 코드는 1분마다 실행할 작업을 예약하고 애플리케이션을 계속 실행합니다
스케줄러가 지속적으로 작동하도록 합니다.

queue.go 생성

queue.go 파일은 Asynq를 사용하여 작업 처리를 관리합니다. 코드는 다음과 같습니다.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
로그인 후 복사

설명

  • 핸들러: emailHandler 및 ReportHandler는 구문 분석을 통해 작업을 처리합니다. 페이로드를 전송하고 해당 작업을 실행합니다.
  • 작업 대기열: "send_email" 및 "generate_report"와 같은 작업이 정의됩니다. Asynq의 작업 대기열을 통해 처리됩니다.

router.go 생성

router.go 파일은 작업을 대기열에 추가하기 위해 HTTP 엔드포인트를 설정합니다.

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
로그인 후 복사

이 코드는 Gin 프레임워크를 사용하여 대기열에 추가하는 작업을 위한 두 개의 엔드포인트를 노출합니다.

main.go 생성

main.go 파일은 모든 것을 하나로 통합합니다.

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
로그인 후 복사

이 파일은 큐, 크론, HTTP 서버 및 종료 로직을 ​​결합합니다.

종속성 설치

필요한 모든 종속성을 설치합니다.

go mod tidy
로그인 후 복사

애플리케이션 빌드 및 실행

다음을 사용하여 애플리케이션을 빌드하고 실행합니다.

go build -o run *.go && ./run
로그인 후 복사

애플리케이션 테스트

작업을 대기열에 추가하려면 다음 엔드포인트를 방문하세요.

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

작업 실행 로그를 터미널에서 확인하세요.

표준 URL

자세한 내용은 제 블로그 원문을 확인해주세요.

위 내용은 Go의 Redis 대기열 및 Cron의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿