Spring Cloud Stream ist ein Framework, das die Entwicklung nachrichtengesteuerter Mikrodienste durch die Abstraktion von Nachrichtenbrokern wie Apache Kafka und RabbitMQ vereinfacht. Eine der leistungsstarken Funktionen von Spring Cloud Stream ist die Fähigkeit zur nahtlosen Integration mit Kafka, sodass Entwickler robuste und skalierbare ereignisgesteuerte Anwendungen erstellen können. Der Kafka-Ordner in Spring Cloud Stream bietet eine Möglichkeit, einfach eine Verbindung zu Kafka-Themen herzustellen.
In diesem Blog befassen wir uns mit der Verwendung eines Consumer-Interceptors mit Spring Cloud Stream Kafka Binder. Interceptors in Kafka bieten einen Mechanismus zum Abfangen und Ändern von Datensätzen, bevor sie von der Anwendung verwendet werden, und bieten Möglichkeiten zur Protokollierung, Metrikerfassung und Datenmanipulation.
Bevor Sie in die Details eintauchen, stellen Sie sicher, dass Sie die folgenden Voraussetzungen erfüllen:
Zuerst richten wir ein einfaches Spring Boot-Projekt mit den notwendigen Abhängigkeiten für Spring Cloud Stream und Kafka ein.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR10</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
Als nächstes konfigurieren Sie den Kafka-Ordner in der Datei application.yml.
spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
Um einen Consumer-Interceptor zu erstellen, implementieren Sie die von Kafka bereitgestellte ConsumerInterceptor-Schnittstelle. Mit dieser Schnittstelle können Sie eine benutzerdefinierte Logik zum Abfangen und Verarbeiten von Datensätzen definieren, bevor sie die Anwendung erreichen.
package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map<String, ?> configs) { // Configuration logic } }
Erstellen Sie eine einfache Verbraucheranwendung, die Nachrichten aus einem Kafka-Thema abhört.
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Message<String> message) { System.out.println("Received message: " + message.getPayload()); } }
Definieren Sie eine Schnittstelle zum Binden des Eingabekanals an das Kafka-Thema.
package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Wenn Nachrichten zum Kafka-Thema erstellt werden, fängt der MyConsumerInterceptor die Datensätze ab und Sie sollten die abgefangenen Protokollnachrichten sehen.
In diesem Blog haben wir untersucht, wie man einen Consumer-Interceptor mit Spring Cloud Stream Kafka Binder verwendet. Interceptors bieten eine leistungsstarke Möglichkeit, Datensätze zu verarbeiten, zu protokollieren und zu manipulieren, bevor sie von der Anwendung verwendet werden. Durch die Integration benutzerdefinierter Interceptoren können Sie die Funktionalität Ihrer Kafka-Konsumenten verbessern und wertvolle Funktionen wie Protokollierung, Metrikerfassung und Datentransformation hinzufügen.
Wenn Sie die in diesem Leitfaden beschriebenen Schritte befolgen, sollten Sie in der Lage sein, Consumer-Interceptors nahtlos in Ihren Spring Cloud Stream-Anwendungen zu implementieren und zu konfigurieren. Viel Spaß beim Codieren!
Das obige ist der detaillierte Inhalt vonErkundung des Spring Cloud Stream Kafka Binder Consumer Interceptor. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!