> 웹 프론트엔드 > JS 튜토리얼 > 실제 사례를 통한 Kafka 기본 사항

실제 사례를 통한 Kafka 기본 사항

Linda Hamilton
풀어 주다: 2024-12-28 09:26:11
원래의
354명이 탐색했습니다.

지난 몇 주 동안 저는 Kafka에 대해 알아보고 메모를 하면서 블로그 게시물을 구성하고 구조화하기로 결정했습니다. 개념과 팁 외에도 다음과 같은 실용적인 예제가 있습니다. NestJS 및 KafkaJS.

카프카란 무엇인가?

Apache Kafka는 실시간 이벤트를 처리하도록 설계된 분산 이벤트 스트리밍 플랫폼입니다. 대규모의 높은 처리량, 낮은 지연 시간의 데이터 스트림을 저장, 처리 및 검색할 수 있으므로 실시간 데이터 파이프라인 및 이벤트 기반 애플리케이션을 구축하는 데 적합합니다.

주요 특징:

  • 이벤트 스트리밍: Kafka는 이벤트 로그를 정렬하는 주제로 데이터를 구성합니다.
  • 분산 아키텍처: Kafka는 확장성과 내결함성을 위해 구축되었습니다. 브로커라고 불리는 노드의 클러스터로 작동하며 여러 서버에 데이터를 배포할 수 있습니다.
  • 게시-구독 모델: 생산자주제에 메시지를 쓰고 소비자가 메시지를 읽습니다. Kafka는 여러 소비자를 지원하므로 다양한 애플리케이션이 동일한 데이터 스트림을 독립적으로 처리할 수 있습니다.
  • 고성능: Kafka는 높은 처리량에 최적화되어 짧은 대기 시간으로 초당 수백만 개의 메시지를 처리합니다.
  • 내구성 있는 스토리지: Kafka는 구성 가능한 보존 기간으로 디스크에 메시지를 저장하여 데이터 지속성과 안정성을 보장합니다.
  • 파티셔닝 및 복제: 확장성을 위해 주제를 파티션으로 나누고 내결함성을 위해 브로커 전체에 복제합니다.
  • 재생 가능성: 소비자는 오프셋을 재설정하여 메시지를 다시 읽을 수 있으며 데이터 재처리 또는 복구가 가능합니다.
  • 통합 및 생태계: Kafka는 다양한 시스템과 통합되며 Kafka Connect(데이터 통합용) 및 Kafka Streams(스트림 처리용)와 같은 도구를 갖추고 있습니다.

장점

  • 신뢰성: 데이터 분산, 복제, 파티셔닝을 통해 내결함성을 보장합니다.
  • 확장성: Kafka는 대규모 데이터 볼륨을 처리하고 다운타임 없이 수평으로 확장할 수 있습니다.
  • 지속성: 메시지가 즉시 저장되어 복원력과 데이터 지속성이 보장됩니다.
  • 성능: Kafka는 극심한 데이터 로드에서도 높은 성능을 유지하여 다운타임이나 데이터 손실 없이 대용량 데이터를 처리합니다.

단점

이러한 절충안은 Kafka의 성능을 최대화하기 위한 의도적인 디자인 선택이지만 더 큰 유연성이 필요한 사용 사례에는 문제가 될 수 있습니다.

  • 제한된 유연성: Kafka에는 보고서의 특정 데이터 필터링과 같은 확장 쿼리에 대한 지원이 부족합니다. Kafka는 메시지를 수신된 순서대로 오프셋으로 검색하므로 소비자는 이러한 작업을 처리해야 합니다.
  • 장기 저장용으로 설계되지 않음: Kafka는 스트리밍 데이터에는 탁월하지만 기록 데이터를 장기간 저장하는 데는 적합하지 않습니다. 데이터 중복으로 인해 대규모 데이터세트의 경우 저장 비용이 많이 들 수 있습니다.
  • 와일드카드 주제 지원 없음: Kafka는 와일드카드 패턴(예: log-2024-*)을 사용하여 여러 주제에서 소비하는 것을 허용하지 않습니다.

사용 사례

  • 실시간 분석: 데이터 스트림이 발생하는 대로 처리하고 분석합니다.
  • 이벤트 소싱: 애플리케이션 상태의 모든 변경 사항을 일련의 이벤트로 기록합니다.
  • 로그 집계: 분산 시스템에서 로그를 수집하고 관리합니다.
  • 데이터 파이프라인: 시스템 간 데이터를 안정적이고 효율적으로 스트리밍합니다.
  • IoT 애플리케이션: IoT 장치의 센서 데이터를 실시간으로 처리합니다.

카프카는 어떻게 작동하나요?

Kafka는 대기열 및 게시-구독 메시징 모델의 기능을 모두 통합하여 소비자에게 각 접근 방식의 장점을 제공합니다.

  • 대기열을 사용하면 작업을 여러 소비자 인스턴스에 분산하여 확장 가능한 데이터 처리가 가능하지만 기존 대기열은 여러 구독자를 지원하지 않습니다.
  • 게시-구독 모델은 여러 구독자를 지원하지만 각 메시지가 모든 구독자에게 전송되므로 여러 작업자 프로세스 간에 작업을 배포할 수는 없습니다.

Kafka는 분할된 로그 시스템을 사용하여 큐잉 및 게시-구독 모델의 이점을 결합합니다. 정렬된 레코드 시퀀스인 로그는 파티션으로 나누어지며, 각 파티션은 서로 다른 구독자(소비자)에게 할당됩니다. 이 설정을 사용하면 확장성을 유지하면서 여러 구독자가 주제를 공유할 수 있습니다.

Kafka fundamentals with a practical example

이벤트, 주제 및 파티션

Kafka가 실시간 이벤트를 처리하도록 설계된 플랫폼이라는 것을 살펴보았습니다. 이러한 이벤트가 처리되는 방식에 대해 이야기하기 전에 이에 대한 정의가 필요합니다.

이벤트는 결제, 웹사이트 클릭, 온도 측정 등 애플리케이션에 기록된 작업, 사건 또는 변경 사항입니다.

Kafka의

이벤트키/값 쌍으로 모델링되며, 여기서 키와 값은 모두 바이트 시퀀스로 직렬화됩니다.

  • 은 직렬화된 도메인 개체나 원시 입력(예: 센서 출력 또는 기타 애플리케이션 데이터)을 나타내는 경우가 많습니다. Kafka 이벤트에서 전송되는 핵심 정보를 캡슐화합니다.
  • 는 복잡한 도메인 객체일 수 있지만 문자열이나 정수와 같은 단순한 유형인 경우가 많습니다. 관계형 데이터베이스의 기본 키처럼 개별 이벤트를 고유하게 식별하는 대신 키는 일반적으로 특정 사용자, 주문 또는 연결된 장치와 같은 시스템 내의 엔터티를 식별합니다.

Kafka는 주제라는 순서가 지정된 로그로 이벤트를 구성합니다. 외부 시스템이 Kafka에 이벤트를 기록하면 해당 이벤트가 항목 끝에 추가됩니다. 메시지는 읽은 후에도 구성 가능한 기간 동안 주제에 남아 있습니다. 대기열과 달리 토픽은 내구성이 있고 복제되며 내결함성이 있어 이벤트 레코드를 효율적으로 저장합니다. 단, 로그는 순차적으로만 스캔할 수 있고 쿼리할 수는 없습니다.

주제는 디스크에 로그 파일로 저장되지만 디스크에는 유한한 크기와 I/O 등의 제한이 있습니다. 이를 극복하기 위해 Kafka에서는 주제를 파티션으로 나누어 단일 로그를 여러 서버에 분산할 수 있는 여러 로그로 나눌 수 있습니다. 이러한 파티셔닝을 통해 Kafka는 수평적으로 확장할 수 있어 대용량 이벤트와 높은 처리량을 처리할 수 있는 용량이 향상됩니다.

Kafka는 가 있는지 여부에 따라 파티션에 메시지를 할당합니다.

  • 키 없음: 메시지는 모든 파티션에 라운드 로빈 방식으로 분산되므로 데이터가 균등하게 분산되지만 메시지 순서는 유지되지 않습니다.
  • 키 포함: 파티션은 키를 해싱하여 결정되므로 동일한 키를 가진 메시지는 항상 동일한 파티션으로 이동하고 순서가 유지됩니다.

브로커

Kafka는 브로커라는 노드를 사용하여 Kafka 클러스터를 집합적으로 구성하는 분산 데이터 인프라로 작동합니다. 브로커는 베어메탈 하드웨어, 클라우드 인스턴스, Kubernetes가 관리하는 컨테이너, 노트북의 Docker 또는 JVM 프로세스가 실행될 수 있는 모든 곳에서 실행될 수 있습니다.

브로커의 초점:

  • 파티션에 새 이벤트 쓰기.
  • 파티션에서 읽기를 제공합니다.
  • 브로커 간 파티션 복제.

메시지 계산이나 주제 간 라우팅을 수행하지 않아 디자인을 단순하고 효율적으로 유지합니다.

복제

Kafka는 여러 브로커에 걸쳐 파티션 데이터를 복제하여 데이터 내구성과 내결함성을 보장합니다. 파티션의 기본 복사본은 리더 복제본이고, 추가 복사본은 팔로워 복제본입니다. 데이터는 리더에 기록되며 업데이트는 자동으로 팔로어에게 복제됩니다.

이 복제 프로세스는 다음을 보장합니다.

  • 브로커 또는 스토리지 장애가 발생하더라도 데이터 안전
  • 현재 리더에 장애가 발생할 경우 다른 복제본이 리더 역할을 대신하는 자동 장애 조치.

Kafka가 복제를 투명하게 처리하므로 개발자는 복제를 직접 관리할 필요 없이 이러한 보장의 이점을 누릴 수 있습니다.

생산자

Kafka 프로듀서Kafka 주제에 데이터를 전송(게시)하는 클라이언트 애플리케이션입니다. Kafka 클러스터에 메시지(레코드)를 생성하고 전송하는 역할을 담당합니다. 생산자는 구성과 메시지 키의 존재 여부에 따라 메시지가 저장될 주제파티션을 결정합니다. 제작자는 다음에 대한 책임이 있지만 이에 국한되지는 않습니다.

  • 메시지 구성:
    • 각 메시지는 키(선택 사항), 값(실제 데이터) 및 메타데이터로 구성됩니다.
    • 키는 메시지의 파티션을 결정하여 동일한 키를 가진 메시지의 순서를 보장합니다.
  • 파티션 할당:
    • 키가 제공되면 생산자는 해싱 알고리즘을 사용하여 파티션을 결정합니다.
    • 키가 없으면 로드 밸런싱을 위해 메시지가 라운드 로빈 방식으로 파티션 전체에 분산됩니다.
  • 압축:

    제작자는 메시지를 압축하여 네트워크 대역폭과 저장 공간 사용량을 줄일 수 있습니다.

소비자

Kafka 소비자Kafka 주제에서 메시지를 읽고 Kafka 파티션에서 메시지를 원하는 속도로 검색하는 클라이언트 애플리케이션으로, 데이터의 실시간 또는 일괄 처리가 가능합니다. . Kafka는 소비자에게 메시지를 푸시하지 않고 데이터를 요청하여 Kafka 파티션에서 메시지를 가져옵니다.

소비자는 자신이 처리한 오프셋도 추적합니다. 오프셋은 자동 또는 수동으로 커밋할 수 있으므로 소비자가 실패하더라도 데이터가 손실되지 않습니다. 이를 통해 오프셋을 재설정하여 메시지를 재생하는 등 유연한 소비가 가능합니다.

소비자 그룹

소비자 그룹은 일부 주제의 데이터를 소비하기 위해 협력하는 소비자 집합으로, 주제 메시지의 분산 처리가 가능합니다.

주제의 파티션은 그룹 내 소비자 간에 나누어지므로 각 메시지는 그룹 내 한 명의 소비자만 처리할 수 있습니다. 여러 소비자 그룹이 간섭 없이 동일한 주제를 독립적으로 소비할 수 있습니다.

새 소비자가 그룹에 가입하거나 기존 소비자가 실패하면 Kafka는 모든 파티션이 포함되도록 그룹 내 소비자 간에 파티션을 재할당합니다.

직렬화 및 역직렬화

Kafka의 직렬화 및 역직렬화는 전송 및 저장을 위해 데이터를 원본 형식과 바이트 배열 간에 변환하여 생산자와 소비자가 효율적으로 통신할 수 있도록 하는 것입니다.

직렬화

객체나 데이터 구조를 전송하거나 저장할 수 있도록 바이트 스트림으로 변환하는 프로세스입니다. 생산자는 데이터를 Kafka 주제로 보내기 전에 데이터(키와 값)를 바이트 배열로 직렬화합니다.

일반적인 직렬화 형식:

  • JSON: 사람이 읽을 수 있고 널리 호환됩니다.
  • Avro: 컴팩트하고 효율적이며 스키마 기반입니다.
  • Protobuf: 컴팩트하고 스키마 기반이며 언어에 구애받지 않습니다.
  • 문자열: 간단한 텍스트 기반 직렬화.
  • 사용자 정의 직렬화: 애플리케이션별 요구 사항에 적합합니다.

역직렬화

바이트 스트림이 원래 개체나 데이터 구조로 다시 변환되는 역 프로세스입니다. 소비자가 Kafka 주제에서 데이터를 읽으면 처리를 위해 바이트 배열을 다시 사용 가능한 형식으로 역직렬화합니다.

압축

압축은 메시지를 저장하거나 전송하기 전에 메시지 크기를 줄이는 것입니다. 생산자, 브로커, 소비자 간에 더 작은 페이로드를 전송하여 스토리지 사용량을 최적화하고, 네트워크 대역폭 소비를 줄이며, 전반적인 성능을 향상시킵니다.

생산자가 Kafka 주제에 메시지를 보낼 때 전송 전에 메시지를 압축할 수 있습니다. 압축된 메시지는 있는 그대로 브로커에 저장되며 소비자가 메시지를 읽을 때 압축이 풀립니다.

장점

  • 감소된 네트워크 대역폭: 페이로드가 작을수록 네트워크를 통해 전송되는 데이터의 양이 줄어듭니다.
  • 낮은 저장 공간 요구 사항: 압축된 메시지는 디스크 공간을 덜 차지합니다.
  • 처리량 향상: 메시지가 작을수록 데이터 전송 및 처리 속도가 빨라집니다.

언제 사용하나요?

  • 대형 메시지 크기의 사용 사례: 압축하면 데이터 크기가 크게 줄어듭니다.
  • 고처리량 시스템: 네트워크 및 스토리지 리소스에 대한 부담을 줄입니다.
  • 일괄 처리: 생산자가 여러 메시지를 일괄 처리할 때 압축이 가장 잘 작동합니다.

압축을 사용하면 리소스가 절약되지만 CPU 사용량과 압축 이점 간의 균형을 유지하여 사용 사례에 적합한 코덱을 선택하는 것이 중요합니다.

지원되는 압축 유형

  • 없음: 압축하지 않습니다(기본값).
  • Gzip: 압축률이 높지만 CPU 사용량이 높습니다.
  • Snappy: 균형 잡힌 압축 속도와 CPU 사용량으로 실시간 사용 사례에 적합합니다.
  • LZ4: 압축 및 압축 해제 속도가 빨라지고 대기 시간이 짧은 시스템에 최적화되었습니다.
  • Zstd: Gzip보다 성능이 뛰어난 높은 압축률, 최신 Kafka 버전에서 지원됩니다.

동조

Apache Kafka의 성능을 최적화하려면 처리량과 대기 시간의 균형을 효과적으로 맞추기 위해 다양한 구성 요소를 미세 조정해야 합니다. 이 기사는 이 주제의 표면적인 부분에 불과합니다. Kafka를 튜닝할 때 고려해야 할 몇 가지 측면은 다음과 같습니다.

  • 파티션 관리:

    • 파티션 수: 병렬성과 처리량을 향상하려면 파티션 수를 늘립니다. 그러나 관리 오버헤드를 방지하려면 과도한 파티션을 피하십시오. 소비자 능력과 원하는 소비율에 맞게 파티션 수를 조정하세요.
  • 제작자 구성:

    • 일괄 처리: 효율적인 메시지 일괄 처리를 활성화하여 요청 수를 줄이고 처리량을 향상시키려면 Batch.size 및 linger.ms를 구성하세요.
    • 압축: 메시지 크기를 줄이고 네트워크 및 저장소 사용량을 줄이려면 압축(예: 압축.유형=snappy)을 구현하세요. 압축으로 인해 추가 CPU 오버헤드가 발생한다는 점에 유의하세요.
  • 소비자 구성:

    • 가져오기 설정: fetch.min.bytes 및 fetch.max.wait.ms를 조정하여 소비자가 메시지를 검색하는 방법을 제어하고 애플리케이션 요구 사항에 따라 대기 시간과 처리량의 균형을 맞추세요.

실제 사례

방의 온도를 기록하고 이 데이터를 다른 애플리케이션이 처리하는 Kafka를 사용하여 전송하는 애플리케이션을 상상해 보세요. 단순화를 위해 생산자와 소비자가 모두 동일한 애플리케이션 내에서 구현되는 Kafka 측면에만 중점을 둘 것입니다. 이 시나리오에서 특정 순간에 기록된 각 온도는 이벤트를 나타냅니다.

{
  temperature: 42,
  timeStamp: new Date(),
};
로그인 후 복사
로그인 후 복사
로그인 후 복사

모든 코드는 이 저장소에 있습니다.

먼저 Kafka 브로커가 필요하지만 머신에 Kafka를 설치하는 대신 이 Docker Kafka 이미지만 설치하겠습니다.

해당 이미지를 가져와서 시작하세요.

docker pull apache/kafka

그런 다음 Kafka가 수신하는 포트를 우리 컴퓨터의 동일한 포트에서 매핑하도록 실행합니다.

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

그렇습니다. Kafka 브로커가 실행 중입니다. 계속하기 전에 주제를 생성하고 메시지를 보내고 소비하는 등의 작업을 수행할 수 있습니다. 이를 수행하려면 해당 이미지 페이지의 지침을 따르기만 하면 됩니다.

KafkaJS와 함께 NestJS를 사용하여 애플리케이션을 구축하려면 먼저 Nest CLI로 앱을 생성하세요.

nest new my-nest-project

프로젝트 폴더 안에 kafkajs를 설치하세요

npm과 kafkajs

그리고 다음 모듈을 생성합니다

nest g mo kafka

네스트지모 프로듀서

nest g mo 소비자

네스트 지모 온도

Kafka 모듈은 메시지 연결, 연결 끊기, 전송 및 수신을 위한 소비자 및 생산자 클래스 관리를 포함하여 모든 Kafka 관련 작업을 처리합니다. 이는 kafkajs 패키지와 직접 상호 작용하는 유일한 모듈이 될 것입니다.

생산자 및 소비자 모듈은 pub-sub 플랫폼(이 경우 Kafka)과 나머지 애플리케이션 간의 인터페이스 역할을 하여 플랫폼별 세부 정보를 추상화합니다.

온도 모듈이 이벤트를 관리합니다. 어떤 pub-sub 플랫폼이 사용되고 있는지 알 필요는 없으며 소비자와 생산자만 있으면 작동합니다.

모듈이 생성되면 src/interface 폴더를 만들고 그 안에 다음 인터페이스를 추가해 보겠습니다.

{
  temperature: 42,
  timeStamp: new Date(),
};
로그인 후 복사
로그인 후 복사
로그인 후 복사
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
로그인 후 복사
로그인 후 복사

src/kafka/ 폴더에 해당 인터페이스를 구현하는 생산자 및 소비자 클래스를 추가합니다.

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
로그인 후 복사
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

로그인 후 복사

kafka.module.ts에서 이러한 클래스를 내보내는 것을 잊지 마세요

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
로그인 후 복사

지금은 온도 모듈로 이동하여 Kafka 클래스를 인스턴스화하고 사용을 시작할 수 있습니다. 하지만 온도 모듈이 어떤 pub-sub 플랫폼을 사용하고 있는지 걱정할 필요가 없다면 더 좋을 것입니다. 대신, 기본 플랫폼에 관계없이 메시지 전송 및 수신에만 초점을 맞춰 주입된 생산자 및/또는 소비자와 함께 작동해야 합니다. 이렇게 하면 나중에 다른 pub-sub 플랫폼으로 전환하기로 결정하더라도 온도 모듈을 변경할 필요가 없습니다.

이러한 추상화를 달성하기 위해 Kafka의 생산자 및 소비자 구현의 세부 사항을 처리하는 생산자 및 소비자 클래스를 만들 수 있습니다. 프로듀서부터 시작해 보겠습니다.

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
로그인 후 복사
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
로그인 후 복사

이제 소비자는

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
로그인 후 복사
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
로그인 후 복사

이제 온도 모듈 구축에 집중할 수 있습니다. 온도.service.ts 파일에서 온도를 등록하는 메서드를 생성합니다. 이 예에서는 생성자를 사용하여 브로커에 온도 데이터를 전송합니다. 또한 데모 목적으로 들어오는 메시지를 처리하는 방법을 구현하겠습니다.

이러한 메소드는 다른 서비스나 컨트롤러에서 호출할 수 있습니다. 그러나 단순화를 위해 이 예에서는 애플리케이션이 시작될 때 onModuleInit 메소드를 활용하여 직접 호출하겠습니다.

{
  temperature: 42,
  timeStamp: new Date(),
};
로그인 후 복사
로그인 후 복사
로그인 후 복사
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
로그인 후 복사
로그인 후 복사

그렇습니다! Docker 컨테이너에서 실행 중인 브로커를 사용하면 애플리케이션을 시작하여 메시지를 보내고 받을 수 있습니다. 또한 다음 명령을 사용하여 브로커 컨테이너 내부에서 셸을 열 수 있습니다.

docker exec --workdir /opt/kafka/bin/ -it Broker sh

여기에서 브로커와 직접 상호 작용하고 애플리케이션에 메시지를 보내고, 메시지를 받고, 새 주제를 만드는 등의 작업을 수행할 수 있습니다.

이 예제의 코드가 담긴 저장소입니다.

위 내용은 실제 사례를 통한 Kafka 기본 사항의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
저자별 최신 기사
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿