在过去的几周里,我一直在深入研究 Kafka 并一路做笔记,我决定组织和构建一篇博客文章,在上面,除了概念和技巧之外,还有一个使用构建的实际示例NestJS 和 KafkaJs。
Apache Kafka 是一个分布式事件流平台,旨在处理实时事件。它能够存储、处理和检索大规模、高吞吐量、低延迟的数据流,使其适合构建实时数据管道和事件驱动的应用程序。
这些权衡是有意的设计选择,旨在最大限度地提高 Kafka 的性能,但可能会给需要更大灵活性的用例带来挑战:
Kafka 集成了队列和发布-订阅消息传递模型的功能,为消费者提供每种方法的优势。
Kafka 采用分区日志系统来结合队列和发布-订阅模型的优点。日志是有序的记录序列,被分为多个分区,每个分区分配给不同的订阅者(消费者)。此设置使多个订阅者能够共享一个主题,同时保持可扩展性。
我们已经看到 Kafka 是一个旨在处理实时事件的平台,在讨论如何处理这些事件之前,我们需要对它们进行定义:
事件是记录应用程序的操作、事件或更改,例如付款、网站点击或温度读数。
Kafka 中的事件被建模为键/值对,其中键和值都被序列化为字节序列。
Kafka 将事件组织成有序日志,称为主题。当外部系统将事件写入 Kafka 时,它会被附加到主题的末尾。即使在阅读后,消息也会在主题中保留可配置的持续时间。与队列不同,主题具有持久性、可复制性和容错性,可以有效地存储事件记录。但日志只能顺序扫描,不能查询。
主题作为日志文件存储在磁盘上,但是磁盘具有有限的大小和 I/O 等限制。为了克服这个问题,Kafka 允许主题分为分区,将单个日志分解为多个可以分布在不同服务器上的日志。这种分区使 Kafka 能够水平扩展,增强其处理大量事件和高吞吐量的能力。
Kafka 根据分区是否有 key:
将消息分配给分区Kafka 使用名为 brokers 的节点作为分布式数据基础设施运行,这些节点共同形成 Kafka 集群。代理可以在裸机硬件、云实例、Kubernetes 管理的容器中、笔记本电脑上的 Docker 中或任何可以运行 JVM 进程的地方运行。
经纪商关注:
它们不执行消息计算或主题到主题的路由,从而保持设计简单高效。
Kafka 通过跨多个代理复制分区数据来确保数据的持久性和容错性。分区的主要副本是领导副本,而其他副本是跟随副本。数据被写入领导者,领导者自动将更新复制到追随者。
此复制过程可确保:
开发人员可以从这些保证中受益,而无需直接管理复制,因为 Kafka 会透明地处理它。
Kafka 生产者 是一个客户端应用程序,它将数据发送(发布)到 Kafka 主题。它负责创建消息(记录)并将其传输到 Kafka 集群。生产者根据其配置和消息密钥的存在来确定存储消息的主题和分区。生产者负责但不限于:
压缩:
生产者可以压缩消息以减少网络带宽和存储使用。
Kafka 消费者 是一个客户端应用程序,它从 Kafka 主题读取消息, 它按照自己的节奏从 Kafka 分区检索消息,允许实时或批量处理数据。请注意,Kafka 不会将消息推送给消费者,而是通过请求数据从 Kafka 分区中拉取消息。
消费者还可以跟踪他们已处理的抵消额。偏移量可以自动或手动提交,确保消费者失败时数据不会丢失。这允许灵活的消费,包括通过重置偏移量来重放消息。
消费者组是一组消费者,它们合作消费来自某些主题的数据,从而允许分布式处理主题的消息。
主题的分区在组内的消费者之间划分,确保每条消息仅由组内的一个消费者处理。多个消费组可以独立消费同一个主题,互不干扰。
当新的消费者加入组或现有消费者失败时,Kafka 会在组中的消费者之间重新分配分区,以确保覆盖所有分区。
Kafka中的序列化和反序列化是将数据在其原始格式和字节数组之间进行转换以进行传输和存储,从而使生产者和消费者能够高效地进行通信。
是将对象或数据结构转换为字节流以便传输或存储的过程。在生产者将数据发送到 Kafka 主题之前,它将数据(键和值)序列化为字节数组。
常见序列化格式:
是相反的过程,其中字节流被转换回其原始对象或数据结构。当消费者从 Kafka 主题读取数据时,它将字节数组反序列化回可用的格式进行处理。
压缩是指在存储或传输消息之前减小消息的大小。它通过在生产者、代理和消费者之间发送较小的有效负载来优化存储使用、减少网络带宽消耗并提高整体性能。
当生产者向 Kafka 主题发送消息时,它可以在传输之前对消息进行压缩。压缩的消息按原样存储在代理上,并由消费者在读取消息时解压缩。
虽然压缩可以节省资源,但必须平衡 CPU 使用率和压缩优势之间的权衡,选择适合您的用例的编解码器。
优化 Apache Kafka 的性能涉及微调各个组件以有效平衡吞吐量和延迟。本文仅触及该主题的表面,以下是调优 Kafka 时需要考虑的一些方面:
分区管理:
生产者配置:
消费者配置:
想象一个应用程序记录房间内的温度并使用 Kafka 传输该数据,然后另一个应用程序处理该数据。为简单起见,我们将仅关注 Kafka 方面,生产者和消费者都在同一应用程序中实现。在这种情况下,特定时刻记录的每个温度都代表一个事件:
{ temperature: 42, timeStamp: new Date(), };
所有代码都将在此存储库中。
首先,我们需要一个 Kafka 代理,但我们不需要在我们的机器中安装 Kafka,只需使用这个 Docker Kafka 镜像即可。
首先拉取该图像:
docker 拉 apache/kafka
然后运行它,映射 Kafka 在我们机器上的同一端口上侦听的端口:
docker run -d -p 9092:9092 --name Broker apache/kafka:latest
就是这样,我们有一个正在运行的 Kafka 代理,在继续之前,您可能想通过创建主题、发送和使用消息来尝试一下它,只需按照该图像页面上的说明进行操作即可。
为了构建我们的应用程序,我们将结合使用 NestJS 和 KafkaJS,首先使用 Nest CLI 创建应用程序
嵌套新的我的巢项目
在项目文件夹内安装kafkajs
npm 我卡夫卡
并生成以下模块
巢g莫卡夫卡
nest g mo 制作人
巢 g mo 消费者
巢穴温度
Kafka 模块 将处理所有 Kafka 特定的操作,包括管理用于连接、断开连接、发送和接收消息的消费者和生产者类。这将是唯一直接与 kafkajs 包交互的模块。
生产者和消费者模块将充当发布-订阅平台(在本例中为 Kafka)与应用程序其余部分之间的接口,抽象平台特定的详细信息。
温度模块将管理事件。它不需要知道正在使用哪个发布-订阅平台,只需要消费者和生产者即可运行。
创建模块后,我们还创建一个文件夹 src/interface 并在其中添加以下接口:
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
在 src/kafka/ 文件夹中添加实现这些接口的生产者和消费者类:
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
不要忘记在 kafka.module.ts 中导出这些类
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
现在我们可以转到温度模块并实例化这些 Kafka 类并开始使用它们。然而,如果温度模块不必担心它正在使用哪个 pub-sub 平台,那就更好了。相反,它应该简单地与注入的生产者和/或消费者一起工作,只专注于发送和接收消息,而不管底层平台如何。这样,如果我们决定将来切换到不同的 pub-sub 平台,我们不需要对温度模块进行任何更改。
为了实现这种抽象,我们可以创建 Producer 和 Consumer 类来处理 Kafka Producer 和 Consumer 实现的细节。让我们从制作人开始:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
现在,消费者:
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
现在,我们可以专注于构建温度模块。在Temperature.service.ts 文件中,我们将创建一个方法来注册温度,在本例中,该方法将简单地使用生产者将温度数据发送到代理。此外,我们将实现一种方法来处理传入消息以用于演示目的。
这些方法可以由另一个服务或控制器调用。但是,为了简单起见,在本示例中,我们将在应用程序启动时利用 onModuleInit 方法直接调用它们。
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
就是这样!通过在 Docker 容器中运行代理,您可以启动应用程序来发送和接收消息。此外,您可以使用以下命令在代理容器内打开 shell:
docker exec --workdir /opt/kafka/bin/ -it Broker sh
从那里,您可以直接与代理交互并向应用程序发送消息、从中接收消息、创建新主题等。
这是包含本示例代码的存储库。
以上是Kafka 基础知识和实际示例的详细内容。更多信息请关注PHP中文网其他相关文章!