Dans le premier article de cette série, nous avons présenté le modèle SAGA et démontré comment une Orchestration minimale peut gérer des transactions distribuées avec un orchestrateur central.
Soyons réalistes ! Cette fois, nous allons plonger dans l'Approche chorégraphique, où les services coordonnent les flux de travail en émettant et en consommant des événements de manière autonome.
Pour rendre cela pratique, nous allons mettre en œuvre un flux de travail de soins de santé multiservice à l'aide de Go et RabbitMQ. Chaque service aura son propre main.go, ce qui facilitera sa mise à l'échelle, son test et son exécution indépendante.
La chorégraphie repose sur une communication décentralisée. Chaque service écoute les événements et déclenche les étapes suivantes en émettant de nouveaux événements. Il n’y a pas d’orchestrateur central ; le flux émerge des interactions des services individuels.
Revoyons notre workflow de soins de santé dès le premier article :
Chaque service :
Nous utiliserons RabbitMQ comme file d'attente des événements. Exécutez-le localement à l'aide de Docker :
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Accédez à l'interface de gestion RabbitMQ sur http://localhost:15672 (nom d'utilisateur : invité, mot de passe : invité).
Nous devons configurer RabbitMQ pour accueillir nos événements. Voici un exemple de fichier init.go pour configurer l'infrastructure RabbitMQ :
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Code complet ici !
Remarque : Dans un environnement de production, vous souhaiterez peut-être gérer cette configuration à l'aide d'une approche GitOps (par exemple, avec Terraform) ou laisser chaque service gérer ses propres files d'attente de manière dynamique.
Chaque service aura son propre main.go. Nous inclurons également des actions de compensation pour gérer les échecs avec élégance.
Ce service vérifie les détails du patient et émet un événement PatientVerified. Il compense également en avertissant le patient si une panne en aval survient.
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Ce service écoute PatientVerified et émet ProcedureScheduled. Il compense en annulant la procédure si une panne en aval survient.
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Inclure les implémentations du service d'inventaire et du service de facturation, en suivant la même structure que ci-dessus. Chaque service écoute l'événement précédent et émet le suivant, garantissant ainsi la logique de compensation en cas d'échec.
Code complet ici !
Démarrez RabbitMQ :
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
Exécuter chaque service :
Ouvrez des terminaux séparés et exécutez :
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
Observer la sortie :
Chaque service traite les événements dans l'ordre, enregistrant la progression du flux de travail.
Décomposons-le !
Tout d'abord, aux fins de cet article, nous n'implémentons pas SuppliesReserveFailed et ProcedureScheduleFailed, pour éviter une complexité inutile.
Nous mettons en œuvre les événements suivants
Étapes (ou transactions) :
Rémunérations :
En suivant ce schéma de mise en œuvre
Ce diagramme représente une approche courante pour documenter la chorégraphie. Cependant, je trouve cela quelque peu difficile à comprendre et un peu frustrant, en particulier pour ceux qui ne connaissent pas l'implémentation ou le modèle.
Décomposons-le !
Le diagramme ci-dessus est beaucoup plus détaillé et décompose chaque étape, ce qui permet de mieux comprendre ce qui se passe.
En un mot :
Notez que nous n'implémentons pas les échecs pour les étapes 1, 4 et 7 par souci de concision ; cependant, l'approche serait la même. Chacun de ces échecs déclencherait un retour en arrière des étapes précédentes.
L'observabilité est essentielle pour le débogage et la surveillance des systèmes distribués. La mise en œuvre de journaux, métriques et traces garantit que les développeurs peuvent comprendre le comportement du système et diagnostiquer efficacement les problèmes.
Nous aborderons l'observabilité en choérographie plus tard dans cette série, restez à l'écoute !
Restez à l’écoute pour le prochain article, où nous explorerons l’Orchestration !
Consultez le référentiel complet de cette série ici. Discutons-en dans les commentaires !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!