在本系列的第一篇文章中,我们介绍了SAGA 模式,并演示了最小的编排如何使用中央编排器管理分布式事务。
让我们面对现实吧!这次,我们将深入探讨编排方法,其中服务通过自主发出和消费事件来协调工作流程。
为了使其切实可行,我们将使用 Go 和 RabbitMQ 实现多服务医疗保健工作流程。每个服务都有自己的 main.go,使其易于扩展、测试和独立运行。
编排依赖于去中心化的沟通。每个服务都会侦听事件并通过发出新事件来触发后续步骤。没有中央协调者;该流程源自各个服务的交互。
让我们回顾一下第一篇文章中的医疗保健工作流程:
每项服务将:
我们将使用 RabbitMQ 作为事件队列。使用 Docker 在本地运行它:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
访问RabbitMQ管理界面:http://localhost:15672(用户名:guest,密码:guest)。
我们需要配置 RabbitMQ 来适应我们的事件。以下是用于设置 RabbitMQ 基础设施的 init.go 文件示例:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
完整代码在这里!
注意:在生产环境中,您可能希望使用 GitOps 方法(例如,使用 Terraform)来管理此设置,或者让每个服务动态处理自己的队列。
每个服务都有自己的 main.go。我们还将包括优雅地处理失败的补偿措施。
该服务验证患者详细信息并发出 PatientVerified 事件。如果发生下游故障,它还会通过通知患者进行补偿。
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
此服务侦听 PatientVerified 并发出 procedureScheduled。如果发生下游故障,它会通过取消程序来进行补偿。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
包括库存服务和计费服务实现,遵循与上面相同的结构。每个服务都会监听前一个事件并发出下一个事件,确保针对失败的补偿逻辑到位。
完整代码在这里!
启动 RabbitMQ:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
运行每个服务:
打开单独的终端并运行:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
观察输出:
每个服务按顺序处理事件,记录工作流程进度。
让我们来分解一下!
首先,就本文而言,我们不会实现SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的复杂性。
我们正在实施以下活动
步骤(或交易):
补偿:
按照这个实现图
该图代表了记录编排的常见方法。然而,我发现它有点难以理解并且有点令人沮丧,特别是对于那些不熟悉实现或模式的人。
让我们来分解一下!
上图更加冗长,它分解了每个步骤,使您更容易理解发生了什么。
简而言之:
请注意,为了简洁起见,我们没有实现步骤 1、4 和 7 的失败;然而,方法是相同的。每一次失败都会触发前面步骤的回滚。
可观察性对于调试和监控分布式系统至关重要。实施日志、指标和跟踪可确保开发人员能够了解系统行为并有效诊断问题。
我们将在本系列后面深入探讨编舞中的可观察性,敬请期待!
请继续关注下一篇文章,我们将探索编排!
在此处查看本系列的完整存储库。评论里一起讨论吧!
以上是微服务中的事务:SAGA 模式与编排的一部分的详细内容。更多信息请关注PHP中文网其他相关文章!