
Implementing an Order Processing System: Part 2 - Advanced Temporal Workflows

Sep 05, 2024 pm 10:38 PM


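1. Introduction and Goals

Welcome back to our series on implementing a sophisticated order processing system! In the previous post, we laid the foundation for the project: we set up a basic CRUD API, integrated it with a Postgres database, and implemented a simple Temporal workflow. Today, we'll dive deeper into Temporal workflows to build a robust, scalable order processing system.

Recap of the Previous Post

In Part 1, we:

  • Set up our project structure
  • Implemented a basic CRUD API using Golang and Gin
  • Integrated with a Postgres database
  • Created a simple Temporal workflow
  • Dockerized our application

Goals for This Post

In this post, we'll significantly expand our use of Temporal, exploring advanced concepts and implementing complex workflows. By the end of this article, you'll be able to:

  1. Design and implement multi-step order processing workflows
  2. Handle long-running processes effectively
  3. Implement robust error handling and retry mechanisms
  4. Version workflows for safe updates in production
  5. Implement the saga pattern for distributed transactions
  6. Set up monitoring and observability for Temporal workflows

Let's get started!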

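2. Theoretical Background and Concepts

Before we start coding, let's review some key Temporal concepts that are crucial for our advanced implementation.

Temporal Workflows and Activities

In Temporal, a workflow is a durable function that orchestrates long-running business logic. Workflows are fault-tolerant and survive process and machine failures. You can think of them as a reliable coordination mechanism for your application's state transitions.

Activities, on the other hand, are the building blocks of a workflow. Each one represents a single, well-defined action or task, such as making an API call, writing to a database, or sending an email. Activities can be retried independently of the workflow that invokes them.

Workflow Execution, History, and State Management

When a workflow executes, Temporal maintains a history of every event that occurs during its lifetime. This history is the source of truth for the workflow's state. If a workflow worker fails and restarts, it can rebuild the workflow's state by replaying this history.

This event-sourcing approach lets Temporal provide strong consistency guarantees and enables features such as workflow versioning and continue-as-new.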
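To make the replay idea concrete, here is a minimal plain-Go sketch that does not use the Temporal SDK. The `Event` and `OrderState` types and the event names are assumptions made up for illustration; Temporal's real history events carry far more detail.

```go
package main

import "fmt"

// Event is a hypothetical, simplified history entry.
type Event struct {
    Type string // e.g. "PaymentProcessed"
}

// OrderState is the workflow state we want to rebuild.
type OrderState struct {
    Validated bool
    Paid      bool
    Shipped   bool
}

// Replay rebuilds state by applying every recorded event in order.
// It has no side effects, so replaying the same history always
// yields the same state.
func Replay(history []Event) OrderState {
    var s OrderState
    for _, e := range history {
        switch e.Type {
        case "OrderValidated":
            s.Validated = true
        case "PaymentProcessed":
            s.Paid = true
        case "ShippingArranged":
            s.Shipped = true
        }
    }
    return s
}

func main() {
    history := []Event{{Type: "OrderValidated"}, {Type: "PaymentProcessed"}}
    // A restarted worker would replay the same history and reach the same state.
    fmt.Printf("%+v\n", Replay(history))
}
```

Because the state is derived only from recorded events, a worker that crashes after the payment step can pick up exactly where the previous one stopped.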

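Handling Long-Running Processes

Temporal is designed for processes that run for a long time, from minutes to days or even months. It provides heartbeats for long-running activities and continue-as-new for workflows that generate large histories.

Workflow Versioning

As your system evolves, you may need to update workflow definitions. Temporal provides versioning capabilities that let you make non-breaking changes to workflows without affecting running instances.

Saga Pattern for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It's particularly useful when you need to maintain consistency across multiple services without using distributed ACID transactions. Temporal provides an excellent framework for implementing sagas.

Now that we've covered these concepts, let's start implementing our advanced order processing workflow.

3. Implementing a Complex Order Processing Workflow

Let's design a multi-step order processing workflow that includes order validation, payment processing, inventory management, and shipping arrangement. We'll implement each step as a separate activity coordinated by the workflow.

First, let's define our activities: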

// internal/workflow/activities.go

package workflow

import (
    "context"
    "errors"

    "go.temporal.io/sdk/activity"
    "github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
    queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
    return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
    // Implement order validation logic
    if order.TotalAmount <= 0 {
        return errors.New("invalid order amount")
    }
    // Add more validation as needed
    return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Implement payment processing logic
    // This could involve calling a payment gateway API
    activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
    // Simulate payment processing
    // In a real scenario, you'd integrate with a payment gateway here
    return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
    // Implement inventory update logic
    // This could involve updating stock levels in the database
    activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
    // Simulate inventory update
    // In a real scenario, you'd update your inventory management system here
    return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    // Implement shipping arrangement logic
    // This could involve calling a shipping provider's API
    activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
    // Simulate shipping arrangement
    // In a real scenario, you'd integrate with a shipping provider here
    return nil
}


Now, let's implement our complex order processing workflow:

// internal/workflow/order_workflow.go

package workflow

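import (
    "time"

    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
    "github.com/yourusername/order-processing-system/internal/db"
)

// a is a nil *OrderActivities used only to reference activity methods by name
// in workflow.ExecuteActivity; the worker registers the real instance.
var a *OrderActivities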

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Activity options
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval: time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval: time.Minute,
            MaximumAttempts: 5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Step 1: Validate Order
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
        // In case of inventory update failure, we might need to refund the payment
        // This is where the saga pattern becomes useful, which we'll cover later
        return err
    }

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
        // If shipping fails, we might need to revert inventory and refund payment
        return err
    }

    logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
    return nil
}


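This workflow coordinates multiple activities, each representing one step in our order processing. Note how we use workflow.ExecuteActivity to run each activity, passing the order data as needed.

We've also set up activity options with a retry policy. This means that if an activity fails (for example, because of a temporary network issue), Temporal automatically retries it according to the policy we specified.

In the next section, we'll explore how to handle long-running processes within this workflow structure.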

4. Handling Long-Running Processes with Temporal

In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.

Heartbeats for Long-Running Activities

For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.

Let’s modify our ArrangeShipping activity to include heartbeats:

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Arranging shipping", "orderId", order.ID)

    // Simulate a long-running process
    for i := 0; i < 10; i++ {
        // Simulate work
        time.Sleep(time.Second)

        // Record heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Stop early if the activity was cancelled or timed out;
        // cancellation reaches the activity through its heartbeats
        select {
        case <-ctx.Done():
            logger.Info("Shipping arrangement cancelled", "orderId", order.ID)
            return ctx.Err()
        default:
        }
    }

    logger.Info("Shipping arranged", "orderId", order.ID)
    return nil
}


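In this example, we simulate a long-running process with a loop and record a heartbeat in each iteration, which lets Temporal track the activity's progress. Heartbeat monitoring only takes effect when the activity's ActivityOptions set a HeartbeatTimeout; the options shown earlier set only StartToCloseTimeout.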
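Heartbeats can also carry progress, so that a retried attempt resumes instead of starting over. In the Go SDK this is done with activity.RecordHeartbeat together with activity.HasHeartbeatDetails and activity.GetHeartbeatDetails. The sketch below shows only the resume logic in plain Go; `progressStore` is a hypothetical stand-in for the details Temporal keeps between attempts, and the simulated failure is an assumption for the demo.

```go
package main

import (
    "errors"
    "fmt"
)

// progressStore is a hypothetical stand-in for the heartbeat details that
// survive from one activity attempt to the next.
type progressStore struct {
    lastStep int
    hasValue bool
}

// record plays the role of a heartbeat that carries progress.
func (p *progressStore) record(step int) {
    p.lastStep = step
    p.hasValue = true
}

// arrangeShipping runs steps [0, total) and fails at step failAt (use -1 for
// no failure). On a later attempt it resumes after the last recorded step.
func arrangeShipping(p *progressStore, total, failAt int) (executed []int, err error) {
    start := 0
    if p.hasValue {
        start = p.lastStep + 1
    }
    for i := start; i < total; i++ {
        if i == failAt {
            return executed, errors.New("simulated worker crash")
        }
        executed = append(executed, i)
        p.record(i)
    }
    return executed, nil
}

func main() {
    store := &progressStore{}
    first, err := arrangeShipping(store, 5, 3) // fails before step 3
    fmt.Println(first, err)
    second, err := arrangeShipping(store, 5, -1) // retry resumes at step 3
    fmt.Println(second, err)
}
```

The second attempt only executes the steps the first one never completed, which matters when each step is slow or has side effects.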

Using Continue-As-New for Very Long-Running Workflows

For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.

Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:

func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Set up a timer for how long we want this workflow execution to run
    timerFired := workflow.NewTimer(ctx, 24*time.Hour)

    // Set up a selector to wait for either the timer to fire or the order to be delivered
    selector := workflow.NewSelector(ctx)

    var orderDelivered bool
    selector.AddFuture(timerFired, func(f workflow.Future) {
        // Timer fired; the continue-as-new error is returned after Select below
        logger.Info("24 hours passed, continuing as new", "orderID", orderID)
    })

    selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderDelivered)
        logger.Info("Order delivered signal received", "orderID", orderID)
    })

    selector.Select(ctx)

    if orderDelivered {
        logger.Info("Order tracking completed, order delivered", "orderID", orderID)
        return nil
    }

    // If we reach here, it means we're continuing as new
    return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}


In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.
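The mechanics can be sketched without the SDK: each run handles a bounded number of events, then hands its state to a fresh run. Everything below (`trackerState`, `runOnce`, `track`, the event names, and the `maxHistory` limit) is hypothetical and only illustrates the hand-off; in Temporal the trigger is usually a timer, as above, or the history size.

```go
package main

import "fmt"

// trackerState is the state carried from one run to the next.
type trackerState struct {
    OrderID string
    Checks  int // total status checks across all runs
}

// runOnce simulates one workflow run. It handles events until the order is
// delivered or the run has recorded maxHistory events, in which case it asks
// to be continued as new with the carried state and the remaining events.
func runOnce(state trackerState, events []string, maxHistory int) (trackerState, []string, bool) {
    if maxHistory < 1 {
        maxHistory = 1 // guard so every run makes progress
    }
    historyLen := 0
    for i, e := range events {
        if historyLen == maxHistory {
            return state, events[i:], true
        }
        historyLen++
        state.Checks++
        if e == "delivered" {
            return state, nil, false
        }
    }
    return state, nil, false
}

// track starts fresh runs until one finishes; it returns the final state
// and the number of runs that were needed.
func track(orderID string, events []string, maxHistory int) (trackerState, int) {
    state := trackerState{OrderID: orderID}
    runs := 0
    for {
        runs++
        var again bool
        state, events, again = runOnce(state, events, maxHistory)
        if !again {
            return state, runs
        }
    }
}

func main() {
    events := []string{"in_transit", "in_transit", "in_transit", "in_transit", "delivered"}
    state, runs := track("order-1", events, 2)
    fmt.Println(state.Checks, runs)
}
```

Each run starts with an empty history but the same logical state, which is what keeps replay cheap for workflows that live for months.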

By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.

In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.

5. Implementing Retry Logic and Error Handling

Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.

Configuring Retry Policies for Activities

Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define a retry policy
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval: time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval: time.Minute,
        MaximumAttempts: 5,
        NonRetryableErrorTypes: []string{"InvalidOrderError"},
    }

    // Activity options with retry policy
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Execute activities with retry policy
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        return handleOrderError(ctx, "ValidateOrder", err, order)
    }

    // ... (other activities)

    return nil
}


In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.
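To see what this policy means in practice, the following plain-Go sketch computes the wait before each retry. The `retryPolicy` type is a local illustration, not the SDK's temporal.RetryPolicy, and the sketch assumes a positive MaximumAttempts.

```go
package main

import (
    "fmt"
    "time"
)

// retryPolicy mirrors the fields used in the policy above; it is a local
// illustration type, not the SDK's temporal.RetryPolicy.
type retryPolicy struct {
    InitialInterval    time.Duration
    BackoffCoefficient float64
    MaximumInterval    time.Duration
    MaximumAttempts    int
}

// articlePolicy returns the values used in the workflow above.
func articlePolicy() retryPolicy {
    return retryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    5,
    }
}

// delays returns the wait before each retry (attempts 2, 3, ...).
// With MaximumAttempts = N there are at most N-1 retries.
func delays(p retryPolicy) []time.Duration {
    var out []time.Duration
    interval := p.InitialInterval
    for attempt := 2; attempt <= p.MaximumAttempts; attempt++ {
        if interval > p.MaximumInterval {
            interval = p.MaximumInterval
        }
        out = append(out, interval)
        interval = time.Duration(float64(interval) * p.BackoffCoefficient)
    }
    return out
}

func main() {
    fmt.Println(delays(articlePolicy()))
}
```

With five attempts the waits are 1s, 2s, 4s, and 8s, so the one-minute cap is never reached; it would only matter with a higher MaximumAttempts.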

Implementing Custom Retry Logic

While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:

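// PaymentDeclinedError marks a decline by the payment provider; retrying cannot help.
// (Assumed definition; the original snippet only references the type.)
type PaymentDeclinedError struct {
    Reason string
}

func (e *PaymentDeclinedError) Error() string {
    return "payment declined: " + e.Reason
}

func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        err = a.processPayment(ctx, order)
        if err == nil {
            return nil
        }

        // errors.As also matches a wrapped PaymentDeclinedError
        var declined *PaymentDeclinedError
        if errors.As(err, &declined) {
            // Payment was declined, no point in retrying
            return err
        }

        logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
    // Actual payment processing logic here
    // ...
    return nil
}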


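In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case. Keep in mind that Temporal's own retry policy still applies to the activity as a whole, so the two mechanisms multiply unless MaximumAttempts is limited for this activity.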

Handling and Propagating Errors

Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:

func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

    // Depending on the activity and error type, we might want to compensate.
    // CancelOrder and RefundPayment are assumed to be methods on OrderActivities.
    switch activityName {
    case "ProcessPayment":
        // If payment processing failed, we might need to cancel the order
        _ = workflow.ExecuteActivity(ctx, a.CancelOrder, order).Get(ctx, nil)
    case "UpdateInventory":
        // If inventory update failed after payment, we might need to refund
        _ = workflow.ExecuteActivity(ctx, a.RefundPayment, order).Get(ctx, nil)
    }

    // Create a customer-facing error; the second argument is the error type
    return temporal.NewApplicationError("Failed to process order due to: "+err.Error(), "OrderProcessingFailed")
}


This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.

6. Versioning Workflows for Safe Updates

As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.

Implementing Versioned Workflows

Here’s an example of how to implement versioning in our order processing workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Use GetVersion to handle workflow versioning
    v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

    if v == workflow.DefaultVersion {
        // Old version: process payment before updating inventory
        err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }
    } else {
        // New version: update inventory before processing payment
        err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }
    }

    // ... rest of the workflow

    return nil
}


In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.
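Why this is safe for running instances becomes clearer with a small model of the decision GetVersion makes. The sketch below is plain Go, not the SDK: `execution`, `newExecution`, and `getVersion` are hypothetical names, the real call also validates a minimum supported version, and `defaultVersion` stands in for workflow.DefaultVersion.

```go
package main

import "fmt"

// defaultVersion stands in for workflow.DefaultVersion.
const defaultVersion = -1

// execution is a hypothetical stand-in for one workflow execution's history:
// it remembers which version was chosen for each change ID.
type execution struct {
    markers map[string]int
}

func newExecution() *execution {
    return &execution{markers: map[string]int{}}
}

// getVersion sketches the decision:
//   - a version recorded in history wins, so replay always takes the same branch;
//   - when replaying history written by old code (no marker), the default
//     version is returned so the old branch is followed;
//   - otherwise the newest supported version is recorded and returned.
func getVersion(e *execution, changeID string, maxSupported int, replaying bool) int {
    if v, ok := e.markers[changeID]; ok {
        return v
    }
    if replaying {
        return defaultVersion
    }
    e.markers[changeID] = maxSupported
    return maxSupported
}

func main() {
    const changeID = "OrderWorkflow.PaymentProcessing"

    old := newExecution() // history written before the change was deployed
    fmt.Println(getVersion(old, changeID, 1, true))

    fresh := newExecution() // started after the deployment
    fmt.Println(getVersion(fresh, changeID, 1, false))
    fmt.Println(getVersion(fresh, changeID, 1, true)) // later replay of the same execution
}
```

An execution never switches branches halfway: whatever it chose the first time it passed this point is what every later replay sees.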

Strategies for Updating Workflows in Production

When updating workflows in a production environment, consider the following strategies:

  1. Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.

  2. Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.

  3. Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.

  4. Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.

  5. Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.

By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.

In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.

7. Implementing Saga Patterns for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.

Designing a Saga for Our Order Processing System

Let’s design a saga for our order processing system that includes the following steps:

  1. Reserve Inventory
  2. Process Payment
  3. Update Inventory
  4. Arrange Shipping

If any of these steps fail, we need to execute compensating actions for the steps that have already completed.

Here’s how we can implement this saga using Temporal:

func OrderSaga(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderSaga started", "OrderID", order.ID)

    // Activities need at least one timeout before they can be scheduled
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
    })

    // Saga compensations: each closure schedules one registered compensating activity
    var compensations []func(workflow.Context) error

    // Step 1: Reserve Inventory
    err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }
    compensations = append(compensations, func(ctx workflow.Context) error {
        return workflow.ExecuteActivity(ctx, a.ReleaseInventoryReservation, order).Get(ctx, nil)
    })

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
    }
    compensations = append(compensations, func(ctx workflow.Context) error {
        return workflow.ExecuteActivity(ctx, a.RefundPayment, order).Get(ctx, nil)
    })

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
    }
    // No separate compensation is registered for this step; this sketch assumes
    // that releasing the reservation also restores the stock level

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
    }

    logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
    return nil
}

func compensate(ctx workflow.Context, compensations []func(workflow.Context) error, err error) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Saga failed, executing compensations", "error", err)

    // Undo completed steps in reverse order
    for i := len(compensations) - 1; i >= 0; i-- {
        compensationErr := compensations[i](ctx)
        if compensationErr != nil {
            logger.Error("Compensation failed", "error", compensationErr)
            // In a real-world scenario, you might want to implement more sophisticated
            // error handling for failed compensations, such as retrying or alerting
        }
    }

    return err
}


In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.

This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.
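The compensation ordering can be checked in isolation with a plain-Go sketch that has no Temporal dependency. The `step` type, `runSaga`, and the step names below are hypothetical; the point is only that completed steps are undone last-in, first-out.

```go
package main

import (
    "errors"
    "fmt"
)

// step pairs an action with the compensation that undoes it.
type step struct {
    name       string
    action     func() error
    compensate func() error // nil if there is nothing to undo
}

// runSaga executes steps in order. On the first failure it runs the
// compensations of the completed steps in reverse order and returns the
// original error. trace records what ran, for illustration.
func runSaga(steps []step, trace *[]string) error {
    var done []step
    for _, s := range steps {
        if err := s.action(); err != nil {
            for i := len(done) - 1; i >= 0; i-- {
                if done[i].compensate == nil {
                    continue
                }
                *trace = append(*trace, "undo "+done[i].name)
                if cerr := done[i].compensate(); cerr != nil {
                    *trace = append(*trace, "undo failed "+done[i].name)
                }
            }
            return fmt.Errorf("%s failed: %w", s.name, err)
        }
        *trace = append(*trace, "do "+s.name)
        done = append(done, s)
    }
    return nil
}

func succeed() error { return nil }

// demoSaga builds a saga in which shipping fails after two steps completed.
func demoSaga() ([]string, error) {
    var trace []string
    steps := []step{
        {name: "reserve-inventory", action: succeed, compensate: succeed},
        {name: "process-payment", action: succeed, compensate: succeed},
        {name: "arrange-shipping", action: func() error { return errors.New("carrier unavailable") }},
    }
    err := runSaga(steps, &trace)
    return trace, err
}

func main() {
    trace, err := demoSaga()
    fmt.Println(trace)
    fmt.Println(err)
}
```

The payment is refunded before the reservation is released, mirroring the reverse loop in compensate above.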

8. Monitoring and Observability for Temporal Workflows

Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.

Implementing Custom Metrics

Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Record the elapsed time when the workflow function returns.
    // workflow.Now is used instead of time.Now so the value is replay-safe.
    startTime := workflow.Now(ctx)
    defer func() {
        duration := workflow.Now(ctx).Sub(startTime)
        workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
    }()

    // ... rest of the workflow implementation

    return nil
}


In this example, we’re recording the total time taken to process an order.

Integrating with Prometheus

To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // ... Temporal client setup (assumed to create the client c and the activities value a)

    // Create a worker
    w := worker.New(c, "order-processing-task-queue", worker.Options{})

    // Register workflows and activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(a.ValidateOrder)
    // ... register other activities

    // Start the worker
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            log.Fatalln("Unable to start worker", err)
        }
    }()

    // Expose Prometheus metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        err := http.ListenAndServe(":2112", nil)
        if err != nil {
            log.Fatalln("Unable to start metrics server", err)
        }
    }()

    // ... rest of your application
}


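This sets up a /metrics endpoint that Prometheus can scrape. promhttp.Handler() serves Prometheus's default registry, so our custom metrics and Temporal's built-in SDK metrics appear there only if the Temporal client is configured with a MetricsHandler that reports to that registry.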

Implementing Structured Logging

Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started",
        "OrderID", order.ID,
        "CustomerID", order.CustomerID,
        "TotalAmount", order.TotalAmount,
    )

    // ... workflow implementation

    logger.Info("OrderWorkflow completed",
        "OrderID", order.ID,
        "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
    )

    return nil
}


This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.
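To show what "searchable" means here, the following sketch writes the same key-value pairs as a JSON line. It uses Go's standard log/slog package (Go 1.21 or later) rather than Temporal's logger, and `logOrderStarted` and `hasField` are hypothetical helpers for the demo.

```go
package main

import (
    "bytes"
    "fmt"
    "log/slog"
    "strings"
)

// logOrderStarted writes one structured (JSON) log line to a buffer and
// returns it, so the resulting fields can be inspected.
func logOrderStarted(orderID, customerID int64, total float64) string {
    var buf bytes.Buffer
    logger := slog.New(slog.NewJSONHandler(&buf, nil))
    logger.Info("OrderWorkflow started",
        "OrderID", orderID,
        "CustomerID", customerID,
        "TotalAmount", total,
    )
    return buf.String()
}

// hasField reports whether the log line contains the given key/value fragment.
func hasField(line, fragment string) bool {
    return strings.Contains(line, fragment)
}

func main() {
    line := logOrderStarted(1, 100, 99.99)
    fmt.Print(line)
    fmt.Println(hasField(line, `"OrderID":1`))
}
```

Each key becomes its own JSON field, so a log aggregator can filter on OrderID or CustomerID without parsing free text.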

Setting Up Distributed Tracing

Distributed tracing can provide valuable insights into the flow of requests through our system. Temporal's Go SDK offers tracing interceptors in its contrib packages (including one for OpenTelemetry), and we can also create spans directly in our activities:

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "github.com/yourusername/order-processing-system/internal/db"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Keep the returned context so that downstream calls join this span
    ctx, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
    defer span.End()

    span.SetAttributes(
        attribute.Int64("order.id", int64(order.ID)),
        attribute.Float64("order.amount", order.TotalAmount),
    )

    // ... payment processing logic

    return nil
}


By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.

9. Testing and Validation

Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.

Unit Testing Workflows

Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:

func TestOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}


This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.

Testing Saga Compensations

It’s important to test that our saga compensations work correctly. Here’s an example test:

func TestOrderSagaCompensation(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
    env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.Error(t, env.GetWorkflowError())

    // Verify that compensation was called
    env.AssertExpectations(t)
}


This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.

10. Challenges and Considerations

As we implement and operate our advanced order processing system, there are several challenges and considerations to keep in mind:

  1. Workflow Complexity : As workflows grow more complex, they can become hard to understand and maintain. Regular refactoring and good documentation are crucial.

  2. Testing Long-Running Workflows : Testing workflows that may run for days or weeks can be difficult. Consider putting mechanisms in place to speed up time in your tests.

  3. Handling External Dependencies : External services can fail or become unavailable. Implement circuit breakers and fallback mechanisms to handle these scenarios.

  4. Monitoring and Alerting : Set up comprehensive monitoring and alerting to identify and respond quickly to problems in your workflows.

  5. Data Consistency : Make sure your saga implementations maintain data consistency across services, even in the event of failures.

  6. Performance Tuning : As your system scales, you may need to tune Temporal's performance settings, such as the number of workflow and activity workers.

  7. Workflow Versioning : Manage workflow versions carefully to ensure smooth updates without disrupting running instances.
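As an illustration of the circuit breaker mentioned in item 3, here is a minimal count-based sketch in plain Go. The `breaker` type, its threshold, and the error values are assumptions for the demo; a production breaker would also add a cool-down timer and a half-open state, both omitted here.

```go
package main

import (
    "errors"
    "fmt"
)

var (
    errOpen        = errors.New("circuit open: call skipped")
    errUnavailable = errors.New("service unavailable")
)

// breaker is a minimal count-based circuit breaker.
type breaker struct {
    failures  int // consecutive failures seen so far
    threshold int // failures needed to open the circuit
}

// call runs fn unless the breaker is open. A success resets the failure count.
func (b *breaker) call(fn func() error) error {
    if b.failures >= b.threshold {
        return errOpen
    }
    if err := fn(); err != nil {
        b.failures++
        return err
    }
    b.failures = 0
    return nil
}

// reset closes the breaker again, for example after a cool-down period.
func (b *breaker) reset() {
    b.failures = 0
}

func main() {
    b := &breaker{threshold: 2}
    down := func() error { return errUnavailable }
    for i := 0; i < 3; i++ {
        // The third call is skipped because two failures opened the circuit.
        fmt.Println(b.call(down))
    }
}
```

Inside an activity, an open circuit could be surfaced as a non-retryable error or answered with a fallback, so the external service is not hit again on every Temporal retry.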

11. Next Steps and Preview of Part 3

In this post, we've gone deep into advanced Temporal workflow concepts, implementing complex order processing logic, saga patterns, and robust error handling. We've also covered monitoring, observability, and testing strategies for our workflows.

In the next part of our series, we'll focus on advanced database operations with sqlc. We'll cover:

  1. Implementing complex database queries and transactions
  2. Optimizing database performance
  3. Implementing batch operations
  4. Handling database migrations in a production environment
  5. Implementing database sharding for scalability
  6. Ensuring data consistency in a distributed system

Stay tuned as we continue to build out our sophisticated order processing system!


Need Help?

Are you facing challenging problems, or do you need an external perspective on a new idea or project? I can help! Whether you're looking to build a technology proof of concept before making a larger investment, or you need guidance on difficult issues, I'm here to assist.

Services Offered:

  • Problem-Solving: Tackling complex issues with innovative solutions.
  • Consultation: Providing expert advice and fresh viewpoints on your projects.
  • Proof of Concept: Developing preliminary models to test and validate your ideas.

If you're interested in working with me, please reach out via email at hungaikevin@gmail.com.

Let's turn your challenges into opportunities!
