지난 몇 주 동안 저는 Kafka에 대해 알아보고 메모를 하면서 블로그 게시물을 구성하고 구조화하기로 결정했습니다. 개념과 팁 외에도 다음과 같은 실용적인 예제가 있습니다. NestJS 및 KafkaJS.
Apache Kafka는 실시간 이벤트를 처리하도록 설계된 분산 이벤트 스트리밍 플랫폼입니다. 대규모의 높은 처리량, 낮은 지연 시간의 데이터 스트림을 저장, 처리 및 검색할 수 있으므로 실시간 데이터 파이프라인 및 이벤트 기반 애플리케이션을 구축하는 데 적합합니다.
이러한 절충안은 Kafka의 성능을 최대화하기 위한 의도적인 디자인 선택이지만 더 큰 유연성이 필요한 사용 사례에는 문제가 될 수 있습니다.
Kafka는 대기열 및 게시-구독 메시징 모델의 기능을 모두 통합하여 소비자에게 각 접근 방식의 장점을 제공합니다.
Kafka는 분할된 로그 시스템을 사용하여 큐잉 및 게시-구독 모델의 이점을 결합합니다. 정렬된 레코드 시퀀스인 로그는 파티션으로 나누어지며, 각 파티션은 서로 다른 구독자(소비자)에게 할당됩니다. 이 설정을 사용하면 확장성을 유지하면서 여러 구독자가 주제를 공유할 수 있습니다.
Kafka가 실시간 이벤트를 처리하도록 설계된 플랫폼이라는 것을 살펴보았습니다. 이러한 이벤트가 처리되는 방식에 대해 이야기하기 전에 이에 대한 정의가 필요합니다.
이벤트는 결제, 웹사이트 클릭, 온도 측정 등 애플리케이션에 기록된 작업, 사건 또는 변경 사항입니다.
Kafka의이벤트는 키/값 쌍으로 모델링되며, 여기서 키와 값은 모두 바이트 시퀀스로 직렬화됩니다.
Kafka는 주제라는 순서가 지정된 로그로 이벤트를 구성합니다. 외부 시스템이 Kafka에 이벤트를 기록하면 해당 이벤트가 항목 끝에 추가됩니다. 메시지는 읽은 후에도 구성 가능한 기간 동안 주제에 남아 있습니다. 대기열과 달리 토픽은 내구성이 있고 복제되며 내결함성이 있어 이벤트 레코드를 효율적으로 저장합니다. 단, 로그는 순차적으로만 스캔할 수 있고 쿼리할 수는 없습니다.
주제는 디스크에 로그 파일로 저장되지만 디스크에는 유한한 크기와 I/O 등의 제한이 있습니다. 이를 극복하기 위해 Kafka에서는 주제를 파티션으로 나누어 단일 로그를 여러 서버에 분산할 수 있는 여러 로그로 나눌 수 있습니다. 이러한 파티셔닝을 통해 Kafka는 수평적으로 확장할 수 있어 대용량 이벤트와 높은 처리량을 처리할 수 있는 용량이 향상됩니다.
Kafka는 키가 있는지 여부에 따라 파티션에 메시지를 할당합니다.
Kafka는 브로커라는 노드를 사용하여 Kafka 클러스터를 집합적으로 구성하는 분산 데이터 인프라로 작동합니다. 브로커는 베어메탈 하드웨어, 클라우드 인스턴스, Kubernetes가 관리하는 컨테이너, 노트북의 Docker 또는 JVM 프로세스가 실행될 수 있는 모든 곳에서 실행될 수 있습니다.
브로커의 초점:
메시지 계산이나 주제 간 라우팅을 수행하지 않아 디자인을 단순하고 효율적으로 유지합니다.
Kafka는 여러 브로커에 걸쳐 파티션 데이터를 복제하여 데이터 내구성과 내결함성을 보장합니다. 파티션의 기본 복사본은 리더 복제본이고, 추가 복사본은 팔로워 복제본입니다. 데이터는 리더에 기록되며 업데이트는 자동으로 팔로어에게 복제됩니다.
이 복제 프로세스는 다음을 보장합니다.
Kafka가 복제를 투명하게 처리하므로 개발자는 복제를 직접 관리할 필요 없이 이러한 보장의 이점을 누릴 수 있습니다.
Kafka 프로듀서는 Kafka 주제에 데이터를 전송(게시)하는 클라이언트 애플리케이션입니다. Kafka 클러스터에 메시지(레코드)를 생성하고 전송하는 역할을 담당합니다. 생산자는 구성과 메시지 키의 존재 여부에 따라 메시지가 저장될 주제 및 파티션을 결정합니다. 제작자는 다음에 대한 책임이 있지만 이에 국한되지는 않습니다.
압축:
제작자는 메시지를 압축하여 네트워크 대역폭과 저장 공간 사용량을 줄일 수 있습니다.
Kafka 소비자는 Kafka 주제에서 메시지를 읽고 Kafka 파티션에서 메시지를 원하는 속도로 검색하는 클라이언트 애플리케이션으로, 데이터의 실시간 또는 일괄 처리가 가능합니다. . Kafka는 소비자에게 메시지를 푸시하지 않고 데이터를 요청하여 Kafka 파티션에서 메시지를 가져옵니다.
소비자는 자신이 처리한 오프셋도 추적합니다. 오프셋은 자동 또는 수동으로 커밋할 수 있으므로 소비자가 실패하더라도 데이터가 손실되지 않습니다. 이를 통해 오프셋을 재설정하여 메시지를 재생하는 등 유연한 소비가 가능합니다.
소비자 그룹은 일부 주제의 데이터를 소비하기 위해 협력하는 소비자 집합으로, 주제 메시지의 분산 처리가 가능합니다.
주제의 파티션은 그룹 내 소비자 간에 나누어지므로 각 메시지는 그룹 내 한 명의 소비자만 처리할 수 있습니다. 여러 소비자 그룹이 간섭 없이 동일한 주제를 독립적으로 소비할 수 있습니다.
새 소비자가 그룹에 가입하거나 기존 소비자가 실패하면 Kafka는 모든 파티션이 포함되도록 그룹 내 소비자 간에 파티션을 재할당합니다.
Kafka의 직렬화 및 역직렬화는 전송 및 저장을 위해 데이터를 원본 형식과 바이트 배열 간에 변환하여 생산자와 소비자가 효율적으로 통신할 수 있도록 하는 것입니다.
객체나 데이터 구조를 전송하거나 저장할 수 있도록 바이트 스트림으로 변환하는 프로세스입니다. 생산자는 데이터를 Kafka 주제로 보내기 전에 데이터(키와 값)를 바이트 배열로 직렬화합니다.
일반적인 직렬화 형식:
바이트 스트림이 원래 개체나 데이터 구조로 다시 변환되는 역 프로세스입니다. 소비자가 Kafka 주제에서 데이터를 읽으면 처리를 위해 바이트 배열을 다시 사용 가능한 형식으로 역직렬화합니다.
압축은 메시지를 저장하거나 전송하기 전에 메시지 크기를 줄이는 것입니다. 생산자, 브로커, 소비자 간에 더 작은 페이로드를 전송하여 스토리지 사용량을 최적화하고, 네트워크 대역폭 소비를 줄이며, 전반적인 성능을 향상시킵니다.
생산자가 Kafka 주제에 메시지를 보낼 때 전송 전에 메시지를 압축할 수 있습니다. 압축된 메시지는 있는 그대로 브로커에 저장되며 소비자가 메시지를 읽을 때 압축이 풀립니다.
압축을 사용하면 리소스가 절약되지만 CPU 사용량과 압축 이점 간의 균형을 유지하여 사용 사례에 적합한 코덱을 선택하는 것이 중요합니다.
Apache Kafka의 성능을 최적화하려면 처리량과 대기 시간의 균형을 효과적으로 맞추기 위해 다양한 구성 요소를 미세 조정해야 합니다. 이 기사는 이 주제의 표면적인 부분에 불과합니다. Kafka를 튜닝할 때 고려해야 할 몇 가지 측면은 다음과 같습니다.
파티션 관리:
제작자 구성:
소비자 구성:
방의 온도를 기록하고 이 데이터를 다른 애플리케이션이 처리하는 Kafka를 사용하여 전송하는 애플리케이션을 상상해 보세요. 단순화를 위해 생산자와 소비자가 모두 동일한 애플리케이션 내에서 구현되는 Kafka 측면에만 중점을 둘 것입니다. 이 시나리오에서 특정 순간에 기록된 각 온도는 이벤트를 나타냅니다.
{ temperature: 42, timeStamp: new Date(), };
모든 코드는 이 저장소에 있습니다.
먼저 Kafka 브로커가 필요하지만 머신에 Kafka를 설치하는 대신 이 Docker Kafka 이미지만 설치하겠습니다.
해당 이미지를 가져와서 시작하세요.
docker pull apache/kafka
그런 다음 Kafka가 수신하는 포트를 우리 컴퓨터의 동일한 포트에서 매핑하도록 실행합니다.
docker run -d -p 9092:9092 --name Broker apache/kafka:latest
그렇습니다. Kafka 브로커가 실행 중입니다. 계속하기 전에 주제를 생성하고 메시지를 보내고 소비하는 등의 작업을 수행할 수 있습니다. 이를 수행하려면 해당 이미지 페이지의 지침을 따르기만 하면 됩니다.
KafkaJS와 함께 NestJS를 사용하여 애플리케이션을 구축하려면 먼저 Nest CLI로 앱을 생성하세요.
nest new my-nest-project
프로젝트 폴더 안에 kafkajs를 설치하세요
npm과 kafkajs
그리고 다음 모듈을 생성합니다
nest g mo kafka
네스트지모 프로듀서
nest g mo 소비자
네스트 지모 온도
Kafka 모듈은 메시지 연결, 연결 끊기, 전송 및 수신을 위한 소비자 및 생산자 클래스 관리를 포함하여 모든 Kafka 관련 작업을 처리합니다. 이는 kafkajs 패키지와 직접 상호 작용하는 유일한 모듈이 될 것입니다.
생산자 및 소비자 모듈은 pub-sub 플랫폼(이 경우 Kafka)과 나머지 애플리케이션 간의 인터페이스 역할을 하여 플랫폼별 세부 정보를 추상화합니다.
온도 모듈이 이벤트를 관리합니다. 어떤 pub-sub 플랫폼이 사용되고 있는지 알 필요는 없으며 소비자와 생산자만 있으면 작동합니다.
모듈이 생성되면 src/interface 폴더를 만들고 그 안에 다음 인터페이스를 추가해 보겠습니다.
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
src/kafka/ 폴더에 해당 인터페이스를 구현하는 생산자 및 소비자 클래스를 추가합니다.
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
kafka.module.ts에서 이러한 클래스를 내보내는 것을 잊지 마세요
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
지금은 온도 모듈로 이동하여 Kafka 클래스를 인스턴스화하고 사용을 시작할 수 있습니다. 하지만 온도 모듈이 어떤 pub-sub 플랫폼을 사용하고 있는지 걱정할 필요가 없다면 더 좋을 것입니다. 대신, 기본 플랫폼에 관계없이 메시지 전송 및 수신에만 초점을 맞춰 주입된 생산자 및/또는 소비자와 함께 작동해야 합니다. 이렇게 하면 나중에 다른 pub-sub 플랫폼으로 전환하기로 결정하더라도 온도 모듈을 변경할 필요가 없습니다.
이러한 추상화를 달성하기 위해 Kafka의 생산자 및 소비자 구현의 세부 사항을 처리하는 생산자 및 소비자 클래스를 만들 수 있습니다. 프로듀서부터 시작해 보겠습니다.
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
이제 소비자는
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
이제 온도 모듈 구축에 집중할 수 있습니다. 온도.service.ts 파일에서 온도를 등록하는 메서드를 생성합니다. 이 예에서는 생성자를 사용하여 브로커에 온도 데이터를 전송합니다. 또한 데모 목적으로 들어오는 메시지를 처리하는 방법을 구현하겠습니다.
이러한 메소드는 다른 서비스나 컨트롤러에서 호출할 수 있습니다. 그러나 단순화를 위해 이 예에서는 애플리케이션이 시작될 때 onModuleInit 메소드를 활용하여 직접 호출하겠습니다.
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
그렇습니다! Docker 컨테이너에서 실행 중인 브로커를 사용하면 애플리케이션을 시작하여 메시지를 보내고 받을 수 있습니다. 또한 다음 명령을 사용하여 브로커 컨테이너 내부에서 셸을 열 수 있습니다.
docker exec --workdir /opt/kafka/bin/ -it Broker sh
여기에서 브로커와 직접 상호 작용하고 애플리케이션에 메시지를 보내고, 메시지를 받고, 새 주제를 만드는 등의 작업을 수행할 수 있습니다.
이 예제의 코드가 담긴 저장소입니다.
위 내용은 실제 사례를 통한 Kafka 기본 사항의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!