首页 > 后端开发 > Golang > 微服务中的事务:SAGA 模式与编排的一部分

微服务中的事务:SAGA 模式与编排的一部分

Barbara Streisand
发布: 2025-01-23 02:05:08
原创
435 人浏览过

在本系列的第一篇文章中,我们介绍了SAGA 模式,并演示了最小的编排如何使用中央编排器管理分布式事务。

让我们面对现实吧!这次,我们将深入探讨编排方法,其中服务通过自主发出和消费事件来协调工作流程。

为了使其切实可行,我们将使用 Go 和 RabbitMQ 实现多服务医疗保健工作流程。每个服务都有自己的 main.go,使其易于扩展、测试和独立运行。

什么是SAGA编排?

编排依赖于去中心化的沟通。每个服务都会侦听事件并通过发出新事件来触发后续步骤。没有中央协调者;该流程源自各个服务的交互。

主要优点:

  • 解耦服务:每个服务独立运行。
  • 可扩展性:事件驱动系统有效地处理高负载。
  • 灵活性:添加新服务不需要更改工作流程逻辑。

挑战:

  • 调试复杂性:跨多个服务跟踪事件可能很棘手。 (我会写一篇专门讨论这个话题的文章,敬请期待!
  • 基础设施设置:服务需要强大的消息代理(例如 RabbitMQ)来连接所有点。
  • 事件风暴:设计不当的工作流程可能会导致系统因事件而不堪重负。

实例:医疗保健工作流程

让我们回顾一下第一篇文章中的医疗保健工作流程:

  1. 患者服务:验证患者详细信息和保险范围。
  2. 调度服务: 安排程序。
  3. 库存服务:储备医疗用品。
  4. 计费服务:处理计费。

每项服务将:

  • 使用 RabbitMQ 监听特定事件。
  • 发出新事件来触发后续步骤。

使用 Docker 设置 RabbitMQ

我们将使用 RabbitMQ 作为事件队列。使用 Docker 在本地运行它:

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
登录后复制
登录后复制

访问RabbitMQ管理界面:http://localhost:15672(用户名:guest,密码:guest)。

交换、队列和绑定设置

我们需要配置 RabbitMQ 来适应我们的事件。以下是用于设置 RabbitMQ 基础设施的 init.go 文件示例:

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
登录后复制
登录后复制

完整代码在这里!

注意:在生产环境中,您可能希望使用 GitOps 方法(例如,使用 Terraform)来管理此设置,或者让每个服务动态处理自己的队列。

实施:服务文件

每个服务都有自己的 main.go。我们还将包括优雅地处理失败的补偿措施。

1.病人服务

该服务验证患者详细信息并发出 PatientVerified 事件。如果发生下游故障,它还会通过通知患者进行补偿。

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
登录后复制
登录后复制

2.调度服务

此服务侦听 PatientVerified 并发出 procedureScheduled。如果发生下游故障,它会通过取消程序来进行补偿。

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}
登录后复制
登录后复制

附加服务

包括库存服务和计费服务实现,遵循与上面相同的结构。每个服务都会监听前一个事件并发出下一个事件,确保针对失败的补偿逻辑到位。

完整代码在这里!


运行工作流程

启动 RabbitMQ:

// patient/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[PatientService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled")
            if err := notifyProcedureScheduleCancellation(); err != nil {
                log.Fatalf("Failed to notify patient: %v", err)
            }
        }
    }()

    common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified")
    fmt.Println("[PatientService] Event published: PatientVerified")

    select {}
}

func notifyProcedureScheduleCancellation() error {
    fmt.Println("Compensation: Notify patient of procedure cancellation.")
    return nil
}
登录后复制

运行每个服务:
打开单独的终端并运行:

// scheduler/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[SchedulerService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "PatientVerified")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[SchedulerService] Processing event: PatientVerified")
            if err := scheduleProcedure(); err != nil {
                common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure")
                fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed")
            } else {
                common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully")
                fmt.Println("[SchedulerService] Event published: ProcedureScheduled")
            }
        }
    }()

    select {}
}

func scheduleProcedure() error {
    fmt.Println("Step 2: Scheduling procedure...")
    return nil // or simulate a failure
}
登录后复制

观察输出:
每个服务按顺序处理事件,记录工作流程进度。

发生了什么?

让我们来分解一下!

首先,就本文而言,我们不会实现SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的复杂性。

我们正在实施以下活动

步骤(或交易):

  • T1:(初始化):患者已验证
  • T2:已安排程序
  • T3:补给预留
  • T4:计费成功

补偿:

  • C4:计费失败
  • C3:保留供应品已释放
  • C2:程序安排已取消
  • C1:NotifyFailureToUser(未实现)

按照这个实现图

high-level implementation flow

该图代表了记录编排的常见方法。然而,我发现它有点难以理解并且有点令人沮丧,特别是对于那些不熟悉实现或模式的人。

让我们来分解一下!

detailed implementation flow

上图更加冗长,它分解了每个步骤,使您更容易理解发生了什么。

简而言之:

  1. 患者服务已成功验证患者详细信息
  2. 患者服务发出PatientVerified
  3. 调度程序服务消耗PatientVerified
  4. 预约服务预约成功
  5. 调度程序服务发出ProcedureScheduled
  6. 库存服务消耗ProcedureScheduled
  7. 库存服务成功储备货源
  8. 库存服务发出供应品保留
  9. 计费服务消耗SuppliesReserved
  10. 计费服务无法向客户收费并开始补偿
  11. 计费服务发出 BillingFailed
  12. 库存服务消耗 BillingFailed
  13. 库存服务释放第 7 步中保留的物资
  14. 库存服务发出ReservedSuppliesReleased
  15. 调度程序服务消耗ReservedSuppliesReleased
  16. 调度程序服务删除步骤 4 中安排的约会
  17. 调度程序服务发出ProcedureScheduleCancelled
  18. 患者服务消耗ProcedureScheduleCancelled
  19. 患者服务人员通知客户错误

请注意,为了简洁起见,我们没有实现步骤 1、4 和 7 的失败;然而,方法是相同的。每一次失败都会触发前面步骤的回滚。


可观察性

可观察性对于调试和监控分布式系统至关重要。实施日志、指标和跟踪可确保开发人员能够了解系统行为并有效诊断问题。

记录

  • 使用结构化日志记录(例如 JSON 格式)来捕获事件和元数据。
  • 在日志中包含相关 ID 以跟踪跨服务的工作流程。

指标

  • 监控队列大小和事件处理时间。
  • 使用 Prometheus 等工具来收集和可视化指标。

追踪

  • 实施分布式跟踪(例如,使用 OpenTelemetry)来跟踪跨服务的事件。
  • 使用相关数据(例如事件名称、时间戳)对范围进行注释,以获得更好的见解。

我们将在本系列后面深入探讨编舞中的可观察性,敬请期待!


要点

  • 分散控制:编排可实现自主协作。
  • 事件驱动的简单性: RabbitMQ 简化了消息交换。
  • 可扩展架构:无缝添加新服务。
  • 编舞一开始可能会让人不知所措,但一如既往:练习会让你完美更好!

请继续关注下一篇文章,我们将探索编排

在此处查看本系列的完整存储库。评论里一起讨论吧!

以上是微服务中的事务:SAGA 模式与编排的一部分的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板