> 백엔드 개발 > Golang > Go에서 확장 가능한 SQS 소비자 구축

Go에서 확장 가능한 SQS 소비자 구축

Barbara Streisand
풀어 주다: 2024-12-11 12:39:09
원래의
960명이 탐색했습니다.

Building a Scalable SQS Consumer in Go

소개

분산 시스템을 구축할 때 Amazon SQS와 같은 메시지 대기열은 비동기식 워크로드를 처리하는 데 중요한 역할을 합니다. 이 게시물에서는 Keycloak에 대한 사용자 등록 이벤트를 처리하는 강력한 SQS 소비자를 Go에서 구현한 경험을 공유하겠습니다. 이 솔루션은 팬아웃/팬인 동시성 패턴을 사용하여 시스템 리소스를 과도하게 사용하지 않고도 효율적으로 메시지를 처리합니다.

도전

저는 흥미로운 문제에 직면했습니다. Keycloak에 사용자를 등록하기 위해 매일 약 50,000개의 SQS 이벤트를 처리합니다. 순진한 접근 방식은 각 메시지에 대해 새로운 고루틴을 생성할 수 있지만 이로 인해 리소스가 빠르게 고갈될 수 있습니다. 동시성에 대해 보다 통제된 접근 방식이 필요했습니다.

왜 팬아웃/팬인인가?

팬아웃/팬인 패턴은 다음과 같은 이유로 이 사용 사례에 적합합니다.

  • 고정된 작업자 고루틴 풀을 유지합니다
  • 직원들에게 업무를 균등하게 분배합니다
  • 자원 고갈 방지
  • 동시 작업에 대한 더 나은 제어 제공

구현 심층 분석

1. 소비자 구조

먼저 기본 소비자 구조를 살펴보겠습니다.

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

로그인 후 복사
로그인 후 복사

2. 메시지 처리 파이프라인

구현은 세 가지 주요 구성요소로 구성됩니다.

  1. 메시지 수신자: 새 메시지에 대해 지속적으로 SQS를 폴링합니다
  2. 작업자 풀: 메시지를 처리하는 고정된 수의 고루틴
  3. 메시지 채널: 수신자를 작업자와 연결

소비자를 시작하는 방법은 다음과 같습니다.

func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        MaxNumberOfMessages: 10,
        QueueUrl:           aws.String(consumer.QueueName),
        WaitTimeSeconds:    20,
        VisibilityTimeout:  30,
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    }

    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}

로그인 후 복사

3. 주요 구성 매개변수

중요한 SQS 구성 매개변수를 살펴보겠습니다.

  • MaxNumberOfMessages (10): 각 설문조사의 배치 크기
  • WaitTimeSeconds (20): 긴 폴링 기간
  • VisibilityTimeout (30): 메시지 처리 유예 기간

4. 작업자 풀 구현

작업자 풀은 팬아웃 패턴이 작용하는 곳입니다.

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
    }
}

로그인 후 복사

5. 중복 메시지 처리

중복 메시지 처리를 방지하기 위해 sync.Map을 사용합니다.

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

로그인 후 복사
로그인 후 복사

모범 사례 및 학습

  1. 오류 처리: 항상 오류를 적절하게 처리하고 적절하게 기록합니다
  2. 메시지 정리: 성공적으로 처리된 후에만 메시지를 삭제합니다
  3. 우아한 종료: 컨텍스트를 사용하여 적절한 종료 메커니즘 구현
  4. 모니터링: 관찰 가능성을 위해 주요 지점에 로깅 추가

성능 고려 사항

  • 작업자 수: 작업량과 사용 가능한 리소스를 기준으로 선택하세요
  • 배치 크기: 처리량과 처리 시간 간의 균형
  • 가시성 시간 초과: 평균 처리 시간에 따라 설정

향후 개선 사항

  1. 동적 작업자 확장: 대기열 깊이에 따라 작업자 수 조정
  2. 회로 차단기: 다운스트림 서비스에 대한 회로 차단기 추가
  3. 메트릭 컬렉션: 모니터링을 위한 Prometheus 메트릭 추가
  4. 배달 못한 편지 대기열: 실패한 메시지에 대한 DLQ 처리 구현
  5. 재시도: 일시적인 오류에 대한 지수 백오프 추가

결론

팬아웃/팬인 패턴은 Go에서 대용량 SQS 메시지를 처리하기 위한 우아한 솔루션을 제공합니다. 고정된 작업자 풀을 유지함으로써 효율적인 메시지 처리를 보장하는 동시에 무제한 고루틴 생성의 함정을 피합니다.

이러한 패턴을 구현할 때는 항상 구체적인 사용 사례를 고려해야 한다는 점을 기억하세요. 여기에 표시된 구성 값(작업자 수, 제한 시간 값 등)은 요구 사항 및 리소스 제약 조건에 따라 조정되어야 합니다.


소스 코드: [사용 가능한 경우 저장소 링크]

태그: #golang #aws #sqs #concurrency #분산 시스템

위 내용은 Go에서 확장 가능한 SQS 소비자 구축의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
저자별 최신 기사
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿