ホームページ > バックエンド開発 > Golang > Go でスケーラブルな SQS コンシューマを構築する

Go でスケーラブルな SQS コンシューマを構築する

Barbara Streisand
リリース: 2024-12-11 12:39:09
オリジナル
960 人が閲覧しました

Building a Scalable SQS Consumer in Go

導入

分散システムを構築する場合、Amazon SQS などのメッセージキューは非同期ワークロードの処理において重要な役割を果たします。この投稿では、Keycloak のユーザー登録イベントを処理する堅牢な SQS コンシューマーを Go に実装した私の経験を共有します。このソリューションは、ファンアウト/ファンイン同時実行パターンを使用して、システム リソースを圧迫することなくメッセージを効率的に処理します。

挑戦

私は興味深い問題に直面しました。それは、Keycloak にユーザーを登録するために毎日約 50,000 の SQS イベントを処理することです。単純なアプローチでは、メッセージごとに新しい goroutine が生成される可能性がありますが、これはすぐにリソースの枯渇につながる可能性があります。同時実行に対して、より制御されたアプローチが必要でした。

なぜファンアウト/ファンインなのか?

ファンアウト/ファンイン パターンは、次の理由からこのユースケースに最適です。

  • ワーカーゴルーチンの固定プールを維持します
  • 従業員間で作業を均等に分配します
  • リソースの枯渇を防止します
  • 同時操作をより適切に制御できます

実装の詳細

1. 消費者の構造

まず、基本的な消費者の構造を見てみましょう:

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

ログイン後にコピー
ログイン後にコピー

2. メッセージ処理パイプライン

実装は 3 つの主要コンポーネントで構成されます:

  1. メッセージ受信者: 新しいメッセージについて SQS を継続的にポーリングします
  2. ワーカープール: メッセージを処理するゴルーチンの固定数
  3. メッセージ チャネル: 受信者をワーカーに接続します

コンシューマを開始する方法は次のとおりです:

func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        MaxNumberOfMessages: 10,
        QueueUrl:           aws.String(consumer.QueueName),
        WaitTimeSeconds:    20,
        VisibilityTimeout:  30,
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    }

    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}

ログイン後にコピー

3. 主要な設定パラメータ

重要な SQS 構成パラメータを調べてみましょう:

  • MaxNumberOfMessages (10): 各ポーリングのバッチ サイズ
  • WaitTimeSeconds (20): 長いポーリング期間
  • VisibilityTimeout (30): メッセージ処理の猶予期間

4. ワーカープールの実装

ワーカー プールは、ファンアウト パターンが作用する場所です。

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
    }
}

ログイン後にコピー

5. 重複メッセージの処理

重複メッセージの処理を防ぐために sync.Map を使用します。

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

ログイン後にコピー
ログイン後にコピー

ベストプラクティスと学習

  1. エラー処理: 常にエラーを適切に処理し、適切にログに記録します
  2. メッセージのクリーンアップ: 処理が成功した場合にのみメッセージを削除します
  3. 正常なシャットダウン: コンテキストを使用して適切なシャットダウン メカニズムを実装します
  4. モニタリング: 可観測性のために重要なポイントにログを追加します

パフォーマンスに関する考慮事項

  • ワーカー数: ワークロードと利用可能なリソースに基づいて選択します
  • バッチサイズ: スループットと処理時間のバランス
  • 可視性タイムアウト: 平均処理時間に応じて設定します

今後の改善点

  1. 動的ワーカー スケーリング: キューの深さに基づいてワーカー数を調整します
  2. サーキット ブレーカー: ダウンストリーム サービス用のサーキット ブレーカーを追加します
  3. メトリクス コレクション: モニタリング用の Prometheus メトリクスを追加します
  4. デッドレターキュー: 失敗したメッセージに対する DLQ 処理を実装します
  5. 再試行: 一時的な障害に対して指数バックオフを追加します

結論

ファンアウト/ファンイン パターンは、Go で大量の SQS メッセージを処理するための洗練されたソリューションを提供します。固定ワーカー プールを維持することで、効率的なメッセージ処理を確保しながら、無制限の goroutine 作成の落とし穴を回避します。

このようなパターンを実装するときは、常に特定のユースケースを考慮することを忘れないでください。ここに表示される構成値 (ワーカー数、タイムアウト値など) は、要件とリソースの制約に基づいて調整する必要があります。


ソースコード: [利用可能な場合はリポジトリへのリンク]

タグ: #golang #aws #sqs #concurrency #distributed-systems

以上がGo でスケーラブルな SQS コンシューマを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート