ホームページ > バックエンド開発 > Golang > マイクロサービスのトランザクション: コレオグラフィーを使用したパート SAGA パターン

マイクロサービスのトランザクション: コレオグラフィーを使用したパート SAGA パターン

Barbara Streisand
リリース: 2025-01-23 02:05:08
オリジナル
406 人が閲覧しました

このシリーズの最初の記事では、SAGA パターンを紹介し、最小限の オーケストレーション が中央オーケストレーターを使用して分散トランザクションを管理する方法を示しました。

実際にやってみましょう!今回は、サービスが自律的にイベントを発行および消費することでワークフローを調整する コレオグラフィー アプローチについて詳しく説明します。

これを実用化するために、Go と RabbitMQ を使用してマルチサービスのヘルスケア ワークフローを実装します。各サービスには独自の main.go があり、スケール、テスト、独立した実行が容易になります。

SAGAコレオグラフィーとは何ですか?

振り付けは分散型コミュニケーションに依存しています。各サービスはイベントをリッスンし、新しいイベントを発行することで後続のステップをトリガーします。中央のオーケストレーターは存在しません。フローは個々のサービスの相互作用から生まれます。

主な利点:

  • 分離されたサービス: 各サービスは独立して動作します。
  • スケーラビリティ: イベント駆動型システムは高負荷を効率的に処理します。
  • 柔軟性: 新しいサービスを追加する場合、ワークフロー ロジックを変更する必要はありません。

課題:

  • デバッグの複雑さ: 複数のサービスにわたるイベントの追跡は難しい場合があります。 (このトピックに特化した記事を書きます。お楽しみに!)
  • インフラストラクチャのセットアップ: サービスには、すべての点を接続するための堅牢なメッセージ ブローカー (RabbitMQ など) が必要です。
  • イベント ストーム: ワークフローの設計が不十分だと、システムがイベントで圧倒される可能性があります。

実践例: 医療ワークフロー

最初の記事のヘルスケア ワークフローをもう一度見てみましょう:

  1. 患者サービス: 患者の詳細と保険適用範囲を確認します。
  2. スケジューラ サービス: 手順をスケジュールします。
  3. 在庫サービス: 医療用品を予約します。
  4. 請求サービス: 請求を処理します。

各サービスは次のことを行います:

  • RabbitMQ を使用して特定のイベントをリッスンします。
  • 新しいイベントを発行して後続のステップをトリガーします。

Docker を使用した RabbitMQ のセットアップ

イベントキューとして RabbitMQ を使用します。 Docker を使用してローカルで実行します:

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
ログイン後にコピー
ログイン後にコピー

http://localhost:15672 で RabbitMQ 管理インターフェイスにアクセスします (ユーザー名: guest、パスワード: guest)。

交換、キュー、バインディングのセットアップ

イベントに対応できるように RabbitMQ を構成する必要があります。 RabbitMQ インフラストラクチャをセットアップするための init.go ファイルの例を次に示します。

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
ログイン後にコピー
ログイン後にコピー

完全なコードはここにあります!

注: 運用環境では、GitOps アプローチ (Terraform など) を使用してこの設定を管理するか、各サービスが独自のキューを動的に処理できるようにすることができます。

実装: サービス ファイル

各サービスには独自の main.go があります。障害を適切に処理するための補償アクションも含まれます。

1.患者サービス

このサービスは患者の詳細を確認し、PatientVerified イベントを生成します。また、下流側で障害が発生した場合には患者に通知することで補償します。

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
ログイン後にコピー
ログイン後にコピー

2.スケジューラーサービス

このサービスは、PatientVerified をリッスンし、ProcedureScheduled を発行します。ダウンストリーム障害が発生した場合、手順をキャンセルすることで補償します。

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
ログイン後にコピー
ログイン後にコピー

追加サービス

上記と同じ構造に従って、Inventory Service と Billing Service の実装を含めます。各サービスは前のイベントをリッスンして次のイベントを発行し、障害に対する補償ロジックが確実に導入されるようにします。

完全なコード ここにあります!


ワークフローの実行

RabbitMQ を開始します:

// patient/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[PatientService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled")
            if err := notifyProcedureScheduleCancellation(); err != nil {
                log.Fatalf("Failed to notify patient: %v", err)
            }
        }
    }()

    common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified")
    fmt.Println("[PatientService] Event published: PatientVerified")

    select {}
}

func notifyProcedureScheduleCancellation() error {
    fmt.Println("Compensation: Notify patient of procedure cancellation.")
    return nil
}
ログイン後にコピー

各サービスを実行します:
別のターミナルを開いて次を実行します:

// scheduler/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[SchedulerService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "PatientVerified")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[SchedulerService] Processing event: PatientVerified")
            if err := scheduleProcedure(); err != nil {
                common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure")
                fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed")
            } else {
                common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully")
                fmt.Println("[SchedulerService] Event published: ProcedureScheduled")
            }
        }
    }()

    select {}
}

func scheduleProcedure() error {
    fmt.Println("Step 2: Scheduling procedure...")
    return nil // or simulate a failure
}
ログイン後にコピー

出力の観察:
各サービスはイベントを順番に処理し、ワークフローの進行状況を記録します。

どうしたの?

詳しく見てみましょう!

まず第一に、この記事では、過度の複雑さを避けるために SuppliesReserveFailed と ProcedureScheduleFailed,l を実装しません

以下のイベントを実施します

ステップ (またはトランザクション):

  • T1: (初期化): 患者検証済み
  • T2: プロシージャスケジュール済み
  • T3: 供給品は予約済み
  • T4: 請求成功

報酬:

  • C4: 請求失敗
  • C3: 予約済み供給品リリース済み
  • C2: ProcedureScheduleCancelled
  • C1: NotifyFailureToUser (未実装)

この実装図に従う

high-level implementation flow

この図は、振り付けを文書化するための一般的なアプローチを表しています。ただし、特に実装やパターンに慣れていない人にとっては、理解するのがやや難しく、少しイライラすることもあると思います。

詳しく見てみましょう!

detailed implementation flow

上の図はさらに冗長で、各ステップが分解されているため、何が起こっているかを理解しやすくなっています。

一言で言えば:

  1. 患者サービスが患者の詳細を正常に確認しました
  2. 患者サービスが放出
  3. スケジューラ サービスはを消費します
  4. スケジューラ サービスが予定を正常にスケジュールしました
  5. スケジューラ サービスがエミットを実行します。
  6. インベントリ サービスは消費します。
  7. 在庫サービスは供給品を正常に予約します
  8. 在庫サービスは予約済みの供給品を排出します
  9. 請求サービスは予約済みの消耗品を消費します
  10. 請求サービスは顧客への請求に失敗し 補償を開始します
  11. 請求サービスが発行 BillingFailed
  12. インベントリ サービスが 消費 請求失敗
  13. 在庫サービスは、ステップ 7 で予約した供給品をリリースします
  14. 在庫サービスが放出予約済み供給品リリース済み
  15. スケジューラー サービスが消費 予約済み供給リリース済み
  16. スケジューラー サービスは、手順 4 でスケジュールされた予定を削除します
  17. スケジューラ サービスが送信します。
  18. 患者サービスが消費します。ProcedureScheduleCancelled
  19. 患者サービスは顧客にエラーを通知します

簡潔にするために、ステップ 1、4、および 7 の失敗を実装していないことに注意してください。ただし、アプローチは同じです。これらの失敗のそれぞれが、前のステップのロールバックをトリガーします。


可観測性

分散システムのデバッグと監視には可観測性が不可欠です。 ログ、メトリクス、トレースを実装すると、開発者はシステムの動作を理解し、問題を効率的に診断できるようになります。

ロギング

  • 構造化ログ (JSON 形式など) を使用してイベントとメタデータをキャプチャします。
  • サービス間のワークフローを追跡するには、ログに相関 ID を含めます。

メトリクス

  • キューのサイズとイベントの処理時間を監視します。
  • Prometheus などのツールを使用してメトリクスを収集し、視覚化します。

トレース

  • 分散トレースを (OpenTelemetry などを使用して) 実装して、サービス全体でイベントを追跡します。
  • より良い洞察を得るために、関連データ (イベント名、タイムスタンプなど) でスパンに注釈を付けます。

振り付けにおける可観測性については、このシリーズの後半で詳しく説明します。お楽しみに!


重要なポイント

  • 分散制御: コレオグラフィーにより自律的なコラボレーションが可能になります。
  • イベント駆動型のシンプルさ: RabbitMQ はメッセージ交換を簡素化します。
  • スケーラブルなアーキテクチャ: 新しいサービスの追加はシームレスです。
  • 振り付けは、最初はとても大歓迎かもしれませんが、いつものように、練習すれば完璧になります!

次回の記事では、オーケストレーションについて詳しく説明しますので、お楽しみに!

このシリーズの完全なリポジトリはここで確認してください。コメントで話し合いましょう!

以上がマイクロサービスのトランザクション: コレオグラフィーを使用したパート SAGA パターンの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート