Heim > Backend-Entwicklung > Golang > Transaktionen in Microservices: Teil SAGA-Muster mit Choreografie

Transaktionen in Microservices: Teil SAGA-Muster mit Choreografie

Barbara Streisand
Freigeben: 2025-01-23 02:05:08
Original
404 Leute haben es durchsucht

Im ersten Artikel dieser Serie haben wir das SAGA-Muster vorgestellt und gezeigt, wie eine minimale Orchestrierung verteilte Transaktionen mit einem zentralen Orchestrator verwalten kann.

Lass uns real werden! Dieses Mal befassen wir uns mit dem Choreografie-Ansatz, bei dem Dienste Arbeitsabläufe koordinieren, indem sie Ereignisse autonom aussenden und konsumieren.

Um dies praktisch zu machen, implementieren wir einen Multi-Service-Workflow für das Gesundheitswesen mit Go und RabbitMQ. Jeder Dienst verfügt über ein eigenes main.go, sodass er einfach unabhängig skaliert, getestet und ausgeführt werden kann.

Was ist SAGA-Choreographie?

Choreografie setzt auf dezentrale Kommunikation. Jeder Dienst wartet auf Ereignisse und löst nachfolgende Schritte aus, indem er neue Ereignisse ausgibt. Es gibt keinen zentralen Orchestrator; Der Fluss entsteht aus den Interaktionen einzelner Dienste.

Hauptvorteile:

  • Entkoppelte Dienste:Jeder Dienst arbeitet unabhängig.
  • Skalierbarkeit: Ereignisgesteuerte Systeme bewältigen hohe Lasten effizient.
  • Flexibilität: Das Hinzufügen neuer Dienste erfordert keine Änderung der Workflow-Logik.

Herausforderungen:

  • Komplexität des Debuggens: Das Verfolgen von Ereignissen über mehrere Dienste hinweg kann schwierig sein. (Ich werde einen Artikel zu diesem Thema schreiben, bleiben Sie dran!)
  • Einrichtung der Infrastruktur:Dienste erfordern einen robusten Nachrichtenbroker (z. B. RabbitMQ), um alle Punkte zu verbinden.
  • Ereignisstürme: Schlecht gestaltete Arbeitsabläufe können das System mit Ereignissen überschwemmen.

Praxisbeispiel: Arbeitsablauf im Gesundheitswesen

Lassen Sie uns unseren Arbeitsablauf im Gesundheitswesen vom ersten Artikel noch einmal Revue passieren lassen:

  1. Patientenservice:Überprüft Patientendaten und Versicherungsschutz.
  2. Planungsdienst:Plant den Vorgang.
  3. Inventarservice:Reserviert medizinische Versorgung.
  4. Abrechnungsdienst: Verarbeitet die Abrechnung.

Jeder Dienst wird:

  • Mit RabbitMQ auf bestimmte Ereignisse achten.
  • Neue Ereignisse ausgeben, um nachfolgende Schritte auszulösen.

RabbitMQ mit Docker einrichten

Wir verwenden RabbitMQ als Ereigniswarteschlange. Führen Sie es lokal mit Docker aus:

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Nach dem Login kopieren
Nach dem Login kopieren

Zugriff auf die RabbitMQ-Verwaltungsoberfläche unter http://localhost:15672 (Benutzername: Gast, Passwort: Gast).

Einrichtung von Austausch, Warteschlangen und Bindungen

Wir müssen RabbitMQ so konfigurieren, dass es unseren Veranstaltungen gerecht wird. Hier ist eine Beispieldatei init.go zum Einrichten der RabbitMQ-Infrastruktur:

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
Nach dem Login kopieren
Nach dem Login kopieren

Vollständiger Code hier!

Hinweis:In einer Produktionsumgebung möchten Sie dieses Setup möglicherweise mit einem GitOps-Ansatz verwalten (z. B. mit Terraform) oder jeden Dienst seine eigenen Warteschlangen dynamisch verwalten lassen.

Implementierung: Servicedateien

Jeder Dienst wird sein eigenes main.go haben. Wir werden auch Entschädigungsmaßnahmen für den ordnungsgemäßen Umgang mit Fehlern einbeziehen.

1. Patientenservice

Dieser Dienst überprüft Patientendaten und gibt ein PatientVerified-Ereignis aus. Es kompensiert auch, indem es den Patienten benachrichtigt, wenn ein Downstream-Fehler auftritt.

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Nach dem Login kopieren
Nach dem Login kopieren

2. Planerdienst

Dieser Dienst wartet auf PatientVerified und gibt ProcedureScheduled aus. Dies wird dadurch kompensiert, dass der Vorgang abgebrochen wird, wenn ein Downstream-Fehler auftritt.

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
Nach dem Login kopieren
Nach dem Login kopieren

Zusätzliche Dienstleistungen

Beziehen Sie die Implementierungen von Inventory Service und Billing Service ein und folgen Sie dabei der gleichen Struktur wie oben. Jeder Dienst wartet auf das vorherige Ereignis und gibt das nächste aus, um sicherzustellen, dass eine Kompensationslogik für Fehler vorhanden ist.

Vollständiger Codehier!


Ausführen des Workflows

RabbitMQ starten:

// patient/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[PatientService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled")
            if err := notifyProcedureScheduleCancellation(); err != nil {
                log.Fatalf("Failed to notify patient: %v", err)
            }
        }
    }()

    common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified")
    fmt.Println("[PatientService] Event published: PatientVerified")

    select {}
}

func notifyProcedureScheduleCancellation() error {
    fmt.Println("Compensation: Notify patient of procedure cancellation.")
    return nil
}
Nach dem Login kopieren

Jeden Dienst ausführen:
Öffnen Sie separate Terminals und führen Sie Folgendes aus:

// scheduler/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[SchedulerService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "PatientVerified")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[SchedulerService] Processing event: PatientVerified")
            if err := scheduleProcedure(); err != nil {
                common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure")
                fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed")
            } else {
                common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully")
                fmt.Println("[SchedulerService] Event published: ProcedureScheduled")
            }
        }
    }()

    select {}
}

func scheduleProcedure() error {
    fmt.Println("Step 2: Scheduling procedure...")
    return nil // or simulate a failure
}
Nach dem Login kopieren

Ausgabe beobachten:
Jeder Dienst verarbeitet Ereignisse nacheinander und protokolliert den Workflow-Fortschritt.

Was ist passiert?

Lassen Sie es uns aufschlüsseln!

Zunächst einmal implementieren wir für die Zwecke dieses Artikels keine SuppliesReserveFailed und ProcedureScheduleFailed, um übermäßige Komplexität zu vermeiden.

Wir führen folgende Veranstaltungen durch

Schritte (oder Transaktionen):

  • T1: (init): PatientVerified
  • T2: ProcedureScheduled
  • T3: Vorräte reserviert
  • T4: Abrechnungerfolgreich

Vergütungen:

  • C4: Abrechnung fehlgeschlagen
  • C3: ReservedSuppliesReleased
  • C2: ProcedureScheduleCancelled
  • C1: NotifyFailureToUser (nicht implementiert)

Folgen Sie diesem Implementierungsdiagramm

high-level implementation flow

Dieses Diagramm stellt einen gängigen Ansatz zur Dokumentation von Choreografie dar. Allerdings finde ich es etwas schwierig zu verstehen und etwas frustrierend, insbesondere für diejenigen, die mit der Implementierung oder dem Muster nicht vertraut sind.

Lassen Sie es uns aufschlüsseln!

detailed implementation flow

Das obige Diagramm ist viel ausführlicher und schlüsselt jeden Schritt auf, sodass es leichter zu verstehen ist, was vor sich geht.

Kurz gesagt:

  1. Der Patientenservice überprüft die Patientendaten erfolgreich
  2. Der Patientenservice gibt PatientVerified ab
  3. Der Planerdienst verbraucht PatientVerified
  4. Der Terminplaner-Service plant den Termin erfolgreich
  5. Der Scheduler-Dienst gibt ProcedureScheduled aus
  6. Der Inventardienst verbraucht ProcedureScheduled
  7. Der Inventarservice reserviert die Vorräte erfolgreich
  8. Inventarservice gibt VorräteReserviert aus
  9. Abrechnungsservice verbraucht VorräteReserviert
  10. Der Abrechnungsdienst belastet den Kunden nicht und startet die Entschädigung
  11. Der Abrechnungsdienst gibt BillingFailed aus
  12. Der Inventardienst verbraucht BillingFailed
  13. Der Lagerdienst gibt die in Schritt 7 reservierten Vorräte frei
  14. Der Inventardienst gibt ReservedSuppliesReleased aus
  15. Der Scheduler-Dienst verbraucht ReservedSuppliesReleased
  16. Der Terminplanerdienst löscht den in Schritt 4 geplanten Termin
  17. Der Scheduler-Dienst gibt ProcedureScheduleCancelled aus
  18. Patientendienst verbraucht VerfahrensplanAbgesagt
  19. Der Patientenservice benachrichtigt den Kunden über den Fehler

Beachten Sie, dass wir der Kürze halber keine Fehler für die Schritte 1, 4 und 7 implementieren; Der Ansatz wäre jedoch derselbe. Jeder dieser Fehler würde ein Rollback der vorherigen Schritte auslösen.


Beobachtbarkeit

Beobachtbarkeit ist für das Debuggen und Überwachen verteilter Systeme unerlässlich. Durch die Implementierung von Protokollen, Metriken und Traces wird sichergestellt, dass Entwickler das Systemverhalten verstehen und Probleme effizient diagnostizieren können.

Protokollierung

  • Verwenden Sie strukturierte Protokollierung (z. B. JSON-Format), um Ereignisse und Metadaten zu erfassen.
  • Fügen Sie Korrelations-IDs in Protokolle ein, um Arbeitsabläufe dienstübergreifend zu verfolgen.

Metriken

  • Überwachen Sie Warteschlangengrößen und Ereignisverarbeitungszeiten.
  • Verwenden Sie Tools wie Prometheus, um Metriken zu sammeln und zu visualisieren.

Nachverfolgung

  • Implementieren Sie verteiltes Tracing (z. B. mit OpenTelemetry), um Ereignisse dienstübergreifend zu verfolgen.
  • Kommentieren Sie Spannen mit relevanten Daten (z. B. Ereignisnamen, Zeitstempel), um bessere Einblicke zu erhalten.

Wir werden uns später in dieser Serie mit der Beobachtbarkeit in der Chorografie befassen, bleiben Sie dran!


Wichtige Erkenntnisse

  • Dezentrale Steuerung: Choreografie ermöglicht autonome Zusammenarbeit.
  • Ereignisgesteuerte Einfachheit: RabbitMQ vereinfacht den Nachrichtenaustausch.
  • Skalierbare Architektur:Das Hinzufügen neuer Dienste erfolgt nahtlos.
  • Choerografie kann zunächst sehr überwältigend sein, aber wie immer gilt: Übung macht perfekt besser!

Bleiben Sie gespannt auf den nächsten Artikel, in dem wir uns mit der Orchestrierung befassen!

Sehen Sie sich hier das vollständige Repository für diese Serie an. Lasst uns in den Kommentaren diskutieren!

Das obige ist der detaillierte Inhalt vonTransaktionen in Microservices: Teil SAGA-Muster mit Choreografie. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage