Maison > développement back-end > Golang > Implémentation d'un système de traitement des commandes : partie des flux de travail temporels avancés

Implémentation d'un système de traitement des commandes : partie des flux de travail temporels avancés

PHPz
Libérer: 2024-09-05 22:38:32
original
1098 Les gens l'ont consulté

Implementing an Order Processing System: Part  Advanced Temporal Workflows

1. Introduction et objectifs

Bienvenue dans notre série sur la mise en œuvre d'un système sophistiqué de traitement des commandes ! Dans notre article précédent, nous avons jeté les bases de notre projet, en configurant une API CRUD de base, en l'intégrant à une base de données Postgres et en implémentant un flux de travail temporel simple. Aujourd'hui, nous approfondissons le monde des flux de travail temporels pour créer un système de traitement des commandes robuste et évolutif.

Récapitulatif du post précédent

Dans la première partie, nous :

  • Mettre en place notre structure de projet
  • Implémentation d'une API CRUD de base utilisant Golang et Gin
  • Intégré à une base de données Postgres
  • Création d'un flux de travail temporel simple
  • Dockerisé notre application

Objectifs pour cet article

Dans cet article, nous élargirons considérablement notre utilisation de Temporal, en explorant des concepts avancés et en mettant en œuvre des flux de travail complexes. À la fin de cet article, vous pourrez :

  1. Concevoir et mettre en œuvre des workflows de traitement des commandes en plusieurs étapes
  2. Gérer efficacement les processus de longue durée
  3. Mettre en œuvre des mécanismes robustes de gestion des erreurs et de nouvelles tentatives
  4. Workflows de version pour des mises à jour sécurisées en production
  5. Implémenter des modèles de saga pour les transactions distribuées
  6. Configurer la surveillance et l'observabilité des flux de travail temporels

Plongeons-nous !

2. Contexte théorique et concepts

Avant de commencer à coder, passons en revue quelques concepts temporels clés qui seront cruciaux pour notre implémentation avancée.

Flux de travail et activités temporels

Dans Temporal, un Workflow est une fonction durable qui orchestre une logique métier à long terme. Les flux de travail sont tolérants aux pannes et peuvent survivre aux pannes de processus et de machines. Ils peuvent être considérés comme des mécanismes de coordination fiables pour les transitions d’état de votre application.

Les activités, quant à elles, sont les éléments constitutifs d'un flux de travail. Ils représentent une action ou une tâche unique et bien définie, comme effectuer un appel API, écrire dans une base de données ou envoyer un e-mail. Les activités peuvent être réessayées indépendamment du workflow qui les appelle.

Exécution du workflow, historique et gestion des états

Lorsqu'un workflow est exécuté, Temporal conserve un historique de tous les événements qui se produisent au cours de sa durée de vie. Cet historique est la source de vérité sur l’état du workflow. Si un travailleur de workflow échoue et redémarre, il peut reconstruire l’état du workflow en rejouant cet historique.

Cette approche de sourcing d'événements permet à Temporal de fournir de solides garanties de cohérence et d'activer des fonctionnalités telles que la gestion des versions du flux de travail et la continuité comme neuf.

Gestion des processus de longue durée

Temporal est conçu pour gérer des processus qui peuvent s'exécuter pendant des périodes prolongées - de quelques minutes à plusieurs jours, voire plusieurs mois. Il fournit des mécanismes tels que des battements de cœur pour les activités de longue durée et continue comme neuf pour les flux de travail qui génèrent des historiques volumineux.

Gestion des versions du flux de travail

À mesure que votre système évolue, vous devrez peut-être mettre à jour les définitions de flux de travail. Temporal fournit des fonctionnalités de gestion de versions qui vous permettent d'apporter des modifications ininterrompues aux flux de travail sans affecter les instances en cours d'exécution.

Modèle de saga pour les transactions distribuées

Le modèle Saga est un moyen de gérer la cohérence des données entre les microservices dans des scénarios de transactions distribuées. Ceci est particulièrement utile lorsque vous devez maintenir la cohérence entre plusieurs services sans utiliser de transactions ACID distribuées. Temporal fournit un excellent cadre pour mettre en œuvre des sagas.

Maintenant que nous avons abordé ces concepts, commençons à mettre en œuvre notre flux de travail avancé de traitement des commandes.

3. Mise en œuvre de flux de travail complexes de traitement des commandes

Concevons un workflow de traitement des commandes en plusieurs étapes qui comprend la validation des commandes, le traitement des paiements, la gestion des stocks et les modalités d'expédition. Nous mettrons en œuvre chacune de ces étapes sous forme d’activités distinctes coordonnées par un flux de travail.

Définissons d’abord nos activités :

// internal/workflow/activities.go

package workflow

import (
    "context"
    "errors"

    "go.temporal.io/sdk/activity"
    "github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
    queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
    return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
    // Implement order validation logic
    if order.TotalAmount <= 0 {
        return errors.New("invalid order amount")
    }
    // Add more validation as needed
    return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Implement payment processing logic
    // This could involve calling a payment gateway API
    activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
    // Simulate payment processing
    // In a real scenario, you'd integrate with a payment gateway here
    return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
    // Implement inventory update logic
    // This could involve updating stock levels in the database
    activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
    // Simulate inventory update
    // In a real scenario, you'd update your inventory management system here
    return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    // Implement shipping arrangement logic
    // This could involve calling a shipping provider's API
    activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
    // Simulate shipping arrangement
    // In a real scenario, you'd integrate with a shipping provider here
    return nil
}

Copier après la connexion

Maintenant, mettons en œuvre notre workflow complexe de traitement des commandes :

// internal/workflow/order_workflow.go

package workflow

import (
    "time"

    "go.temporal.io/sdk/workflow"
    "github.com/yourusername/order-processing-system/internal/db"
)

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Activity options
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval: time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval: time.Minute,
            MaximumAttempts: 5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Step 1: Validate Order
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
        // In case of inventory update failure, we might need to refund the payment
        // This is where the saga pattern becomes useful, which we'll cover later
        return err
    }

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
        // If shipping fails, we might need to revert inventory and refund payment
        return err
    }

    logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
    return nil
}

Copier après la connexion

Ce workflow coordonne plusieurs activités, chacune représentant une étape de notre traitement de commande. Notez comment nous utilisons workflow.ExecuteActivity pour exécuter chaque activité, en transmettant les données de commande si nécessaire.

Nous avons également mis en place des options d’activité avec une politique de nouvelle tentative. Cela signifie que si une activité échoue (par exemple, en raison d'un problème de réseau temporaire), Temporal la réessayera automatiquement en fonction de notre politique spécifiée.

Dans la section suivante, nous explorerons comment gérer les processus de longue durée au sein de cette structure de flux de travail.

4. Handling Long-Running Processes with Temporal

In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.

Heartbeats for Long-Running Activities

For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.

Let’s modify our ArrangeShipping activity to include heartbeats:

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Arranging shipping", "orderId", order.ID)

    // Simulate a long-running process
    for i := 0; i < 10; i++ {
        // Simulate work
        time.Sleep(time.Second)

        // Record heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Check if we need to cancel
        if activity.GetInfo(ctx).Attempt > 1 {
            logger.Info("Cancelling shipping arrangement due to retry", "orderId", order.ID)
            return nil
        }
    }

    logger.Info("Shipping arranged", "orderId", order.ID)
    return nil
}

Copier après la connexion

In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.

Using Continue-As-New for Very Long-Running Workflows

For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.

Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:

func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Set up a timer for how long we want this workflow execution to run
    timerFired := workflow.NewTimer(ctx, 24*time.Hour)

    // Set up a selector to wait for either the timer to fire or the order to be delivered
    selector := workflow.NewSelector(ctx)

    var orderDelivered bool
    selector.AddFuture(timerFired, func(f workflow.Future) {
        // Timer fired, we'll continue-as-new
        logger.Info("24 hours passed, continuing as new", "orderID", orderID)
        workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
    })

    selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderDelivered)
        logger.Info("Order delivered signal received", "orderID", orderID)
    })

    selector.Select(ctx)

    if orderDelivered {
        logger.Info("Order tracking completed, order delivered", "orderID", orderID)
        return nil
    }

    // If we reach here, it means we're continuing as new
    return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}

Copier après la connexion

In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.

By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.

In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.

5. Implementing Retry Logic and Error Handling

Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.

Configuring Retry Policies for Activities

Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define a retry policy
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval: time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval: time.Minute,
        MaximumAttempts: 5,
        NonRetryableErrorTypes: []string{"InvalidOrderError"},
    }

    // Activity options with retry policy
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Execute activities with retry policy
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        return handleOrderError(ctx, "ValidateOrder", err, order)
    }

    // ... (other activities)

    return nil
}

Copier après la connexion

In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.

Implementing Custom Retry Logic

While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:

func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        err = a.processPayment(ctx, order)
        if err == nil {
            return nil
        }

        if _, ok := err.(*PaymentDeclinedError); ok {
            // Payment was declined, no point in retrying
            return err
        }

        logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
    // Actual payment processing logic here
    // ...
}

Copier après la connexion

In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.

Handling and Propagating Errors

Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:

func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

    // Depending on the activity and error type, we might want to compensate
    switch activityName {
    case "ProcessPayment":
        // If payment processing failed, we might need to cancel the order
        _ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil)
    case "UpdateInventory":
        // If inventory update failed after payment, we might need to refund
        _ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
    }

    // Create a customer-facing error message
    return workflow.NewCustomError("OrderProcessingFailed", "Failed to process order due to: "+err.Error())
}

Copier après la connexion

This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.

6. Versioning Workflows for Safe Updates

As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.

Implementing Versioned Workflows

Here’s an example of how to implement versioning in our order processing workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Use GetVersion to handle workflow versioning
    v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

    if v == workflow.DefaultVersion {
        // Old version: process payment before updating inventory
        err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }
    } else {
        // New version: update inventory before processing payment
        err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }
    }

    // ... rest of the workflow

    return nil
}

Copier après la connexion

In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.

Strategies for Updating Workflows in Production

When updating workflows in a production environment, consider the following strategies:

  1. Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.

  2. Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.

  3. Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.

  4. Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.

  5. Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.

By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.

In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.

7. Implementing Saga Patterns for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.

Designing a Saga for Our Order Processing System

Let’s design a saga for our order processing system that includes the following steps:

  1. Reserve Inventory
  2. Process Payment
  3. Update Inventory
  4. Arrange Shipping

If any of these steps fail, we need to execute compensating actions for the steps that have already completed.

Here’s how we can implement this saga using Temporal:

func OrderSaga(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderSaga started", "OrderID", order.ID)

    // Saga compensations
    var compensations []func(context.Context) error

    // Step 1: Reserve Inventory
    err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.ReleaseInventoryReservation(ctx, order)
    })

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.RefundPayment(ctx, order)
    })

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
    }
    // No compensation needed for this step, as we've already updated the inventory

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
    }

    logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
    return nil
}

func compensate(ctx workflow.Context, compensations []func(context.Context) error, err error) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Saga failed, executing compensations", "error", err)

    for i := len(compensations) - 1; i >= 0; i-- {
        compensationErr := workflow.ExecuteActivity(ctx, compensations[i]).Get(ctx, nil)
        if compensationErr != nil {
            logger.Error("Compensation failed", "error", compensationErr)
            // In a real-world scenario, you might want to implement more sophisticated
            // error handling for failed compensations, such as retrying or alerting
        }
    }

    return err
}

Copier après la connexion

In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.

This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.

8. Monitoring and Observability for Temporal Workflows

Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.

Implementing Custom Metrics

Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define metric
    orderProcessingTime := workflow.NewTimer(ctx, 0)
    defer func() {
        duration := orderProcessingTime.ElapsedTime()
        workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
    }()

    // ... rest of the workflow implementation

    return nil
}

Copier après la connexion

In this example, we’re recording the total time taken to process an order.

Integrating with Prometheus

To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // ... Temporal client setup

    // Create a worker
    w := worker.New(c, "order-processing-task-queue", worker.Options{})

    // Register workflows and activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(a.ValidateOrder)
    // ... register other activities

    // Start the worker
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            logger.Fatal("Unable to start worker", err)
        }
    }()

    // Expose Prometheus metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        err := http.ListenAndServe(":2112", nil)
        if err != nil {
            logger.Fatal("Unable to start metrics server", err)
        }
    }()

    // ... rest of your application
}

Copier après la connexion

This sets up a /metrics endpoint that Prometheus can scrape to collect our custom metrics along with the built-in Temporal metrics.

Implementing Structured Logging

Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started",
        "OrderID", order.ID,
        "CustomerID", order.CustomerID,
        "TotalAmount", order.TotalAmount,
    )

    // ... workflow implementation

    logger.Info("OrderWorkflow completed",
        "OrderID", order.ID,
        "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
    )

    return nil
}

Copier après la connexion

This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.

Setting Up Distributed Tracing

Distributed tracing can provide valuable insights into the flow of requests through our system. While Temporal doesn’t natively support distributed tracing, we can implement it in our activities:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    _, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
    defer span.End()

    span.SetAttributes(
        attribute.Int64("order.id", order.ID),
        attribute.Float64("order.amount", order.TotalAmount),
    )

    // ... payment processing logic

    return nil
}

Copier après la connexion

By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.

9. Testing and Validation

Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.

Unit Testing Workflows

Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:

func TestOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

Copier après la connexion

This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.

Testing Saga Compensations

It’s important to test that our saga compensations work correctly. Here’s an example test:

func TestOrderSagaCompensation(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
    env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.Error(t, env.GetWorkflowError())

    // Verify that compensation was called
    env.AssertExpectations(t)
}

Copier après la connexion

This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.

10. Herausforderungen und Überlegungen

Bei der Implementierung und dem Betrieb unseres fortschrittlichen Auftragsverarbeitungssystems müssen mehrere Herausforderungen und Überlegungen berücksichtigt werden:

  1. Workflow-Komplexität: Je komplexer Arbeitsabläufe werden, desto schwieriger kann es werden, sie zu verstehen und aufrechtzuerhalten. Regelmäßiges Refactoring und eine gute Dokumentation sind entscheidend.

  2. Testen lang laufender Workflows: Das Testen von Workflows, die möglicherweise Tage oder Wochen dauern, kann eine Herausforderung sein. Erwägen Sie die Implementierung von Mechanismen, um die Zeit bei Ihren Tests zu verkürzen.

  3. Umgang mit externen Abhängigkeiten: Externe Dienste können ausfallen oder nicht mehr verfügbar sein. Implementieren Sie Leistungsschalter und Fallback-Mechanismen, um diese Szenarien zu bewältigen.

  4. Überwachung und Alarmierung: Richten Sie eine umfassende Überwachung und Alarmierung ein, um Probleme in Ihren Arbeitsabläufen schnell zu erkennen und darauf zu reagieren.

  5. Datenkonsistenz: Stellen Sie sicher, dass Ihre Saga-Implementierungen die Datenkonsistenz über alle Dienste hinweg aufrechterhalten, auch bei Ausfällen.

  6. Leistungsoptimierung: Wenn Ihr System skaliert, müssen Sie möglicherweise die Leistungseinstellungen von Temporal optimieren, z. B. die Anzahl der Workflow- und Aktivitätsarbeiter.

  7. Workflow-Versionierung: Verwalten Sie Workflow-Versionen sorgfältig, um reibungslose Aktualisierungen sicherzustellen, ohne laufende Instanzen zu unterbrechen.

11. Nächste Schritte und Vorschau auf Teil 3

In diesem Beitrag haben wir uns eingehend mit fortgeschrittenen Temporal-Workflow-Konzepten befasst und komplexe Auftragsabwicklungslogik, Saga-Muster und robuste Fehlerbehandlung implementiert. Wir haben auch Überwachungs-, Beobachtbarkeits- und Teststrategien für unsere Arbeitsabläufe behandelt.

Im nächsten Teil unserer Serie konzentrieren wir uns auf erweiterte Datenbankoperationen mit SQLC. Wir behandeln Folgendes:

  1. Implementierung komplexer Datenbankabfragen und Transaktionen
  2. Optimierung der Datenbankleistung
  3. Batch-Operationen implementieren
  4. Abwicklung von Datenbankmigrationen in einer Produktionsumgebung
  5. Implementieren von Datenbank-Sharding für Skalierbarkeit
  6. Sicherstellung der Datenkonsistenz in einem verteilten System

Bleiben Sie dran, während wir unser ausgefeiltes Auftragsabwicklungssystem weiter ausbauen!


Brauchen Sie Hilfe?

Stehen Sie vor herausfordernden Problemen oder benötigen Sie eine externe Perspektive auf eine neue Idee oder ein neues Projekt? Ich kann helfen! Ganz gleich, ob Sie einen Technologie-Proof of Concept erstellen möchten, bevor Sie eine größere Investition tätigen, oder ob Sie Beratung bei schwierigen Themen benötigen, ich bin hier, um Ihnen zu helfen.

Angebotene Dienstleistungen:

  • Problemlösung:Komplexe Probleme mit innovativen Lösungen angehen.
  • Beratung: Bereitstellung fachkundiger Beratung und neuer Standpunkte zu Ihren Projekten.
  • Proof of Concept: Entwicklung vorläufiger Modelle zum Testen und Validieren Ihrer Ideen.

Wenn Sie an einer Zusammenarbeit mit mir interessiert sind, wenden Sie sich bitte per E-Mail an hungaikevin@gmail.com.

Lassen Sie uns Ihre Herausforderungen in Chancen verwandeln!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal