在本系列的第一篇文章中,我們介紹了SAGA 模式,並示範了最小的編排如何使用中央編排器管理分散式事務。
讓我們面對現實吧!這次,我們將深入探討編排方法,其中服務透過自主發出和消費事件來協調工作流程。
為了使其切實可行,我們將使用 Go 和 RabbitMQ 實現多服務醫療保健工作流程。每個服務都有自己的 main.go,使其易於擴展、測試和獨立運行。
編排依賴去中心化的溝通。每個服務都會偵聽事件並透過發出新事件來觸發後續步驟。沒有中央協調者;流程源自於各個服務的互動。
讓我們回顧一下第一篇文章中的醫療保健工作流程:
每項服務將:
我們將使用 RabbitMQ 作為事件佇列。使用 Docker 在本地運行它:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
存取RabbitMQ管理介面:http://localhost:15672(使用者名稱:guest,密碼:guest)。
我們需要設定 RabbitMQ 來適應我們的事件。以下是用於設定 RabbitMQ 基礎架構的 init.go 檔案範例:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
完整程式碼在這裡!
注意:在生產環境中,您可能想要使用 GitOps 方法(例如,使用 Terraform)來管理此設置,或讓每個服務動態處理自己的佇列。
每個服務都有自己的 main.go。我們還將包括優雅地處理失敗的補償措施。
此服務驗證患者詳細資料並發出 PatientVerified 事件。如果發生下游故障,它也會透過通知患者進行補償。
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
此服務偵聽 PatientVerified 並發出 procedureScheduled。如果發生下游故障,它會透過取消程序來進行補償。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
包括庫存服務和計費服務實現,遵循與上面相同的結構。每個服務都會監聽前一個事件並發出下一個事件,確保針對失敗的補償邏輯到位。
完整程式碼在這裡!
啟動 RabbitMQ:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
執行每個服務:
打開單獨的終端並運行:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
觀察輸出:
每個服務依序處理事件,記錄工作流程進度。
讓我們來分解一下吧!
首先,就本文而言,我們不會實作SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的複雜性。
我們正在實施以下活動
步驟(或交易):
補償:
按照這個實現圖
該圖代表了記錄編排的常見方法。然而,我發現它有點難以理解並且有點令人沮喪,特別是對於那些不熟悉實現或模式的人。
讓我們來分解一下吧!
上圖更加冗長,它分解了每個步驟,使您更容易理解發生了什麼。
簡而言之:
請注意,為了簡潔起見,我們沒有實現步驟 1、4 和 7 的失敗;然而,方法是相同的。每一次失敗都會觸發前面步驟的回滾。
可觀察性對於除錯和監控分散式系統至關重要。實作日誌、指標和追蹤可確保開發人員能夠了解系統行為並有效診斷問題。
我們將在本系列後面深入探討編舞中的可觀察性,敬請期待!
請繼續關注下一篇文章,我們將探索編排!
在此處查看本系列的完整儲存庫。評論裡一起討論吧!
以上是微服務中的事務:SAGA 模式與編排的一部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!