首頁 > 後端開發 > Golang > 微服務中的事務:SAGA 模式與編排的一部分

微服務中的事務:SAGA 模式與編排的一部分

Barbara Streisand
發布: 2025-01-23 02:05:08
原創
406 人瀏覽過

在本系列的第一篇文章中,我們介紹了SAGA 模式,並示範了最小的編排如何使用中央編排器管理分散式事務。

讓我們面對現實吧!這次,我們將深入探討編排方法,其中服務透過自主發出和消費事件來協調工作流程。

為了使其切實可行,我們將使用 Go 和 RabbitMQ 實現多服務醫療保健工作流程。每個服務都有自己的 main.go,使其易於擴展、測試和獨立運行。

什麼是SAGA編排?

編排依賴去中心化的溝通。每個服務都會偵聽事件並透過發出新事件來觸發後續步驟。沒有中央協調者;流程源自於各個服務的互動。

主要優點:

  • 解耦服務:每個服務獨立運作。
  • 可擴充性:事件驅動系統有效處理高負載。
  • 彈性:新增服務不需要更改工作流程邏輯。

挑戰:

  • 偵錯複雜性:跨多個服務追蹤事件可能很棘手。 (我會寫一篇專門討論這個主題的文章,敬請期待!
  • 基礎架構設定:服務需要強大的訊息代理程式(例如 RabbitMQ)來連接所有點。
  • 事件風暴:設計不當的工作流程可能會導致系統因事件而不堪負荷。

實例:醫療保健工作流程

讓我們回顧一下第一篇文章中的醫療保健工作流程:

  1. 病患服務:驗證病患詳細資料和保險範圍。
  2. 調度服務: 安排程序。
  3. 庫存服務:儲備醫療用品。
  4. 計費服務:處理計費。

每項服務將:

  • 使用 RabbitMQ 監聽特定事件。
  • 發出新事件來觸發後續步驟。

使用 Docker 設定 RabbitMQ

我們將使用 RabbitMQ 作為事件佇列。使用 Docker 在本地運行它:

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
登入後複製
登入後複製

存取RabbitMQ管理介面:http://localhost:15672(使用者名稱:guest,密碼:guest)。

交換、佇列和綁定設置

我們需要設定 RabbitMQ 來適應我們的事件。以下是用於設定 RabbitMQ 基礎架構的 init.go 檔案範例:

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
登入後複製
登入後複製

完整程式碼在這裡!

注意:在生產環境中,您可能想要使用 GitOps 方法(例如,使用 Terraform)來管理此設置,或讓每個服務動態處理自己的佇列。

實施:服務文件

每個服務都有自己的 main.go。我們還將包括優雅地處理失敗的補償措施。

1.病人服務

此服務驗證患者詳細資料並發出 PatientVerified 事件。如果發生下游故障,它也會透過通知患者進行補償。

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
登入後複製
登入後複製

2.調度服務

此服務偵聽 PatientVerified 並發出 procedureScheduled。如果發生下游故障,它會透過取消程序來進行補償。

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
登入後複製
登入後複製

附加服務

包括庫存服務和計費服務實現,遵循與上面相同的結構。每個服務都會監聽前一個事件並發出下一個事件,確保針對失敗的補償邏輯到位。

完整程式碼在這裡!


運行工作流程

啟動 RabbitMQ:

// patient/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[PatientService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled")
            if err := notifyProcedureScheduleCancellation(); err != nil {
                log.Fatalf("Failed to notify patient: %v", err)
            }
        }
    }()

    common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified")
    fmt.Println("[PatientService] Event published: PatientVerified")

    select {}
}

func notifyProcedureScheduleCancellation() error {
    fmt.Println("Compensation: Notify patient of procedure cancellation.")
    return nil
}
登入後複製

執行每個服務:
打開單獨的終端並運行:

// scheduler/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[SchedulerService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "PatientVerified")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[SchedulerService] Processing event: PatientVerified")
            if err := scheduleProcedure(); err != nil {
                common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure")
                fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed")
            } else {
                common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully")
                fmt.Println("[SchedulerService] Event published: ProcedureScheduled")
            }
        }
    }()

    select {}
}

func scheduleProcedure() error {
    fmt.Println("Step 2: Scheduling procedure...")
    return nil // or simulate a failure
}
登入後複製

觀察輸出:
每個服務依序處理事件,記錄工作流程進度。

發生了什麼事?

讓我們來分解一下吧!

首先,就本文而言,我們不會實作SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的複雜性。

我們正在實施以下活動

步驟(或交易):

  • T1:(初始化):患者已驗證
  • T2:已排定程序
  • T3:補給預留
  • T4:計費成功

補償:

  • C4:計費失敗
  • C3:保留供應品已釋放
  • C2:程式安排已取消
  • C1:NotifyFailureToUser(未實作)

按照這個實現圖

high-level implementation flow

該圖代表了記錄編排的常見方法。然而,我發現它有點難以理解並且有點令人沮喪,特別是對於那些不熟悉實現或模式的人。

讓我們來分解一下吧!

detailed implementation flow

上圖更加冗長,它分解了每個步驟,使您更容易理解發生了什麼。

簡而言之:

  1. 患者服務已成功驗證患者詳細資訊
  2. 病患服務發出PatientVerified
  3. 調度程序服務消耗PatientVerified
  4. 預約服務預約成功
  5. 調度程序服務發出ProcedureScheduled
  6. 庫存服務消耗ProcedureScheduled
  7. 庫存服務成功儲備貨源
  8. 庫存服務發出供應品保留
  9. 計費服務消耗SuppliesReserved
  10. 計費服務無法向客戶收費並開始補償
  11. 計費服務發出 BillingFailed
  12. 庫存服務消耗 BillingFailed
  13. 庫存服務釋放第 7 步中保留的物資
  14. 庫存服務發出ReservedSuppliesReleased
  15. 調度程序服務消耗ReservedSuppliesReleased
  16. 調度程序服務刪除步驟 4 中安排的約會
  17. 調度程序服務發出ProcedureScheduleCancelled
  18. 病患服務消耗ProcedureScheduleCancelled
  19. 病患服務人員通知客戶錯誤

請注意,為了簡潔起見,我們沒有實現步驟 1、4 和 7 的失敗;然而,方法是相同的。每一次失敗都會觸發前面步驟的回滾。


可觀察性

可觀察性對於除錯和監控分散式系統至關重要。實作日誌、指標和追蹤可確保開發人員能夠了解系統行為並有效診斷問題。

記錄

  • 使用結構化日誌記錄(例如 JSON 格式)來擷取事件和元資料。
  • 在日誌中包含相關 ID 以追蹤跨服務的工作流程。

指標

  • 監控佇列大小和事件處理時間。
  • 使用 Prometheus 等工具來收集和視覺化指標。

追蹤

  • 實作分散式追蹤(例如,使用 OpenTelemetry)來追蹤跨服務的事件。
  • 使用相關資料(例如事件名稱、時間戳記)對範圍進行註釋,以獲得更好的見解。

我們將在本系列後面深入探討編舞中的可觀察性,敬請期待!


重點

  • 分散控制:編排可自主協作。
  • 事件驅動的簡單性: RabbitMQ 簡化了訊息交換。
  • 可擴充架構:無縫新增服務。
  • 編舞一開始可能會讓人不知所措,但一如既往:練習會讓你完美更好!

請繼續關注下一篇文章,我們將探索編排

在此處查看本系列的完整儲存庫。評論裡一起討論吧!

以上是微服務中的事務:SAGA 模式與編排的一部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板