Maison > développement back-end > Golang > Créer un consommateur SQS évolutif dans Go

Créer un consommateur SQS évolutif dans Go

Barbara Streisand
Libérer: 2024-12-11 12:39:09
original
960 Les gens l'ont consulté

Building a Scalable SQS Consumer in Go

Introduction

Lors de la création de systèmes distribués, les files d'attente de messages comme Amazon SQS jouent un rôle crucial dans la gestion des charges de travail asynchrones. Dans cet article, je partagerai mon expérience dans la mise en œuvre d'un consommateur SQS robuste dans Go qui gère les événements d'enregistrement des utilisateurs pour Keycloak. La solution utilise le modèle de simultanéité fan-out/fan-in pour traiter les messages efficacement sans surcharger les ressources système.

Le défi

J'ai été confronté à un problème intéressant : traiter quotidiennement environ 50 000 événements SQS pour enregistrer les utilisateurs dans Keycloak. Une approche naïve pourrait générer une nouvelle goroutine pour chaque message, mais cela pourrait rapidement conduire à un épuisement des ressources. Nous avions besoin d'une approche plus contrôlée de la concurrence.

Pourquoi Fan-out/Fan-in ?

Le modèle fan-out/fan-in est parfait pour ce cas d'utilisation car il :

  • Maintient un pool fixe de goroutines de travail
  • Répartit le travail uniformément entre les travailleurs
  • Empêche l'épuisement des ressources
  • Fournit un meilleur contrôle sur les opérations simultanées

Analyse approfondie de la mise en œuvre

1. La structure du consommateur

Tout d'abord, regardons notre structure de consommation de base :

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

Copier après la connexion
Copier après la connexion

2. Pipeline de traitement des messages

La mise en œuvre se compose de trois éléments principaux :

  1. Récepteur de messages : interroge en permanence SQS pour les nouveaux messages
  2. Worker Pool : nombre fixe de goroutines traitant les messages
  3. Canal de messages : connecte le récepteur aux travailleurs

Voici comment nous démarrons le consommateur :

func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        MaxNumberOfMessages: 10,
        QueueUrl:           aws.String(consumer.QueueName),
        WaitTimeSeconds:    20,
        VisibilityTimeout:  30,
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    }

    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}

Copier après la connexion

3. Paramètres de configuration clés

Examinons les paramètres de configuration SQS cruciaux :

  • MaxNumberOfMessages (10) : Taille du lot pour chaque sondage
  • WaitTimeSeconds (20) : Longue durée d'interrogation
  • VisibilityTimeout (30) : Délai de grâce pour le traitement des messages

4. Mise en œuvre du pool de travailleurs

Le pool de travailleurs est l'endroit où le modèle de répartition entre en jeu :

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
    }
}

Copier après la connexion

5. Gestion des messages en double

Nous utilisons un sync.Map pour éviter de traiter les messages en double :

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

Copier après la connexion
Copier après la connexion

Meilleures pratiques et enseignements

  1. Gestion des erreurs : gérez toujours les erreurs avec élégance et enregistrez-les de manière appropriée
  2. Nettoyage des messages : supprimez les messages uniquement après un traitement réussi
  3. Graceful Shutdown : implémentez des mécanismes d'arrêt appropriés à l'aide du contexte
  4. Surveillance : Ajoutez une journalisation aux points clés pour l'observabilité

Considérations relatives aux performances

  • Nombre de travailleurs : choisissez en fonction de votre charge de travail et des ressources disponibles
  • Taille du lot : équilibre entre le débit et le temps de traitement
  • Délai de visibilité : défini en fonction de votre temps de traitement moyen

Améliorations futures

  1. Dynamic Worker Scaling : ajustez le nombre de travailleurs en fonction de la profondeur de la file d'attente
  2. Disjoncteur : Ajouter un disjoncteur pour les services en aval
  3. Collection de métriques : ajoutez des métriques Prometheus pour la surveillance
  4. File d'attente des lettres mortes : implémentez la gestion DLQ pour les messages ayant échoué
  5. Nouvelles tentatives : ajout d'un délai exponentiel pour les échecs transitoires

Conclusion

Le modèle fan-out/fan-in fournit une solution élégante pour traiter des messages SQS à volume élevé dans Go. En maintenant un pool de travailleurs fixe, nous évitons les pièges de la création illimitée de goroutines tout en garantissant un traitement efficace des messages.

N'oubliez pas de toujours tenir compte de votre cas d'utilisation spécifique lors de la mise en œuvre de tels modèles. Les valeurs de configuration affichées ici (nombre de travailleurs, valeurs de délai d'attente, etc.) doivent être ajustées en fonction de vos besoins et des contraintes de ressources.


Code source : [Lien vers votre référentiel si disponible]

Balises : #golang #aws #sqs #concurrency #distributed-systems

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal