このシリーズの最初の記事では、SAGA パターンを紹介し、最小限の オーケストレーション が中央オーケストレーターを使用して分散トランザクションを管理する方法を示しました。
実際にやってみましょう!今回は、サービスが自律的にイベントを発行および消費することでワークフローを調整する コレオグラフィー アプローチについて詳しく説明します。
これを実用化するために、Go と RabbitMQ を使用してマルチサービスのヘルスケア ワークフローを実装します。各サービスには独自の main.go があり、スケール、テスト、独立した実行が容易になります。
振り付けは分散型コミュニケーションに依存しています。各サービスはイベントをリッスンし、新しいイベントを発行することで後続のステップをトリガーします。中央のオーケストレーターは存在しません。フローは個々のサービスの相互作用から生まれます。
最初の記事のヘルスケア ワークフローをもう一度見てみましょう:
各サービスは次のことを行います:
イベントキューとして RabbitMQ を使用します。 Docker を使用してローカルで実行します:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
http://localhost:15672 で RabbitMQ 管理インターフェイスにアクセスします (ユーザー名: guest、パスワード: guest)。
イベントに対応できるように RabbitMQ を構成する必要があります。 RabbitMQ インフラストラクチャをセットアップするための init.go ファイルの例を次に示します。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
完全なコードはここにあります!
注: 運用環境では、GitOps アプローチ (Terraform など) を使用してこの設定を管理するか、各サービスが独自のキューを動的に処理できるようにすることができます。
各サービスには独自の main.go があります。障害を適切に処理するための補償アクションも含まれます。
このサービスは患者の詳細を確認し、PatientVerified イベントを生成します。また、下流側で障害が発生した場合には患者に通知することで補償します。
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
このサービスは、PatientVerified をリッスンし、ProcedureScheduled を発行します。ダウンストリーム障害が発生した場合、手順をキャンセルすることで補償します。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
上記と同じ構造に従って、Inventory Service と Billing Service の実装を含めます。各サービスは前のイベントをリッスンして次のイベントを発行し、障害に対する補償ロジックが確実に導入されるようにします。
完全なコード ここにあります!
RabbitMQ を開始します:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
各サービスを実行します:
別のターミナルを開いて次を実行します:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
出力の観察:
各サービスはイベントを順番に処理し、ワークフローの進行状況を記録します。
詳しく見てみましょう!
まず第一に、この記事では、過度の複雑さを避けるために SuppliesReserveFailed と ProcedureScheduleFailed,l を実装しません。
以下のイベントを実施します
ステップ (またはトランザクション):
報酬:
この実装図に従う
この図は、振り付けを文書化するための一般的なアプローチを表しています。ただし、特に実装やパターンに慣れていない人にとっては、理解するのがやや難しく、少しイライラすることもあると思います。
詳しく見てみましょう!
上の図はさらに冗長で、各ステップが分解されているため、何が起こっているかを理解しやすくなっています。
一言で言えば:
簡潔にするために、ステップ 1、4、および 7 の失敗を実装していないことに注意してください。ただし、アプローチは同じです。これらの失敗のそれぞれが、前のステップのロールバックをトリガーします。
分散システムのデバッグと監視には可観測性が不可欠です。 ログ、メトリクス、トレースを実装すると、開発者はシステムの動作を理解し、問題を効率的に診断できるようになります。
振り付けにおける可観測性については、このシリーズの後半で詳しく説明します。お楽しみに!
次回の記事では、オーケストレーションについて詳しく説明しますので、お楽しみに!
このシリーズの完全なリポジトリはここで確認してください。コメントで話し合いましょう!
以上がマイクロサービスのトランザクション: コレオグラフィーを使用したパート SAGA パターンの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。