Spring Cloud Stream は、Apache Kafka や RabbitMQ などのメッセージ ブローカーを抽象化することで、メッセージ駆動型のマイクロサービスの開発を簡素化するフレームワークです。 Spring Cloud Stream の強力な機能の 1 つは、Kafka とシームレスに統合できることで、開発者は堅牢でスケーラブルなイベント駆動型アプリケーションを構築できます。 Spring Cloud Stream の Kafka バインダーは、Kafka トピックに簡単に接続する方法を提供します。
このブログでは、Spring Cloud Stream Kafka Binder でコンシューマ インターセプタを使用する方法を詳しく説明します。 Kafka のインターセプターは、アプリケーションによって使用される前にレコードをインターセプトして変更するメカニズムを提供し、ロギング、メトリクス収集、データ操作の機会を提供します。
詳細に入る前に、次の前提条件を満たしていることを確認してください。
まず、Spring Cloud Stream と Kafka に必要な依存関係を含む単純な Spring Boot プロジェクトをセットアップしましょう。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR10</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
次に、application.yml ファイルで Kafka バインダーを構成します。
spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
コンシューマ インターセプタを作成するには、Kafka が提供する ConsumerInterceptor インターフェイスを実装します。このインターフェイスを使用すると、アプリケーションに到達する前にレコードをインターセプトして処理するためのカスタム ロジックを定義できます。
package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map<String, ?> configs) { // Configuration logic } }
Kafka トピックからのメッセージをリッスンする単純なコンシューマ アプリケーションを作成します。
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Message<String> message) { System.out.println("Received message: " + message.getPayload()); } }
入力チャネルを Kafka トピックにバインドするためのインターフェイスを定義します。
package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Kafka トピックに対してメッセージが生成されると、MyConsumerInterceptor がレコードをインターセプトし、インターセプトされたログ メッセージが表示されるはずです。
このブログでは、Spring Cloud Stream Kafka Binder でコンシューマ インターセプタを使用する方法を検討してきました。インターセプターは、アプリケーションによってレコードが消費される前に、レコードを処理、ログ記録、および操作するための強力な方法を提供します。カスタム インターセプターを統合することで、Kafka コンシューマーの機能を強化し、ロギング、メトリクス収集、データ変換などの貴重な機能を追加できます。
このガイドで概説されている手順に従うことで、Spring Cloud Stream アプリケーションにコンシューマ インターセプタをシームレスに実装して構成できるようになります。コーディングを楽しんでください!
以上がSpring Cloud Stream Kafka Binder Consumer Interceptor の探索の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。