
Introduction
Spring Cloud Stream est un framework qui simplifie le développement de microservices basés sur des messages en faisant abstraction des courtiers de messages tels qu'Apache Kafka et RabbitMQ. L'une des fonctionnalités puissantes de Spring Cloud Stream est sa capacité à s'intégrer de manière transparente à Kafka, permettant aux développeurs de créer des applications événementielles robustes et évolutives. Le classeur Kafka dans Spring Cloud Stream offre un moyen de se connecter facilement aux sujets Kafka.
Dans ce blog, nous verrons comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs de Kafka fournissent un mécanisme permettant d'intercepter et de modifier les enregistrements avant qu'ils ne soient consommés par l'application, offrant ainsi des opportunités de journalisation, de collecte de métriques et de manipulation de données.
Conditions préalables
Avant de plonger dans les détails, assurez-vous d'avoir les prérequis suivants :
- Kit de développement Java (JDK) 8 ou version ultérieure
- Apache Kafka
- Spring Boot 2.x ou version ultérieure
- Maven ou Gradle
Configuration de l'application Spring Boot
Tout d'abord, configurons un projet Spring Boot simple avec les dépendances nécessaires pour Spring Cloud Stream et Kafka.
Maven pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR10</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
|
Copier après la connexion
Gradle build.gradle
1 2 3 4 5 6 7 8 9 10 11 | dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
}
}
|
Copier après la connexion
Configuration de Kafka Binder
Ensuite, configurez le classeur Kafka dans le fichier application.yml.
1 2 3 4 5 6 7 8 9 10 11 12 | spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-group
consumer:
interceptor-classes: com.example.MyConsumerInterceptor
kafka:
binder:
brokers: localhost:9092
|
Copier après la connexion
Création d'un intercepteur de consommateur Kafka
Pour créer un intercepteur consommateur, implémentez l'interface ConsumerInterceptor fournie par Kafka. Cette interface vous permet de définir une logique personnalisée pour intercepter et traiter les enregistrements avant qu'ils n'atteignent l'application.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | package com.example;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {
private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor. class );
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
records.forEach(record -> {
logger.info( "Intercepted record: key = {}, value = {}" , record.key(), record.value());
});
return records;
}
@Override
public void onCommit(Map offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
|
Copier après la connexion
Création de l'application consommateur
Créez une application grand public simple qui écoute les messages d'un sujet Kafka.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
@SpringBootApplication
@EnableBinding(KafkaProcessor. class )
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication. class , args);
}
@StreamListener( "input" )
public void handle(Message<String> message) {
System.out.println( "Received message: " + message.getPayload());
}
}
|
Copier après la connexion
Interface pour la liaison
Définissez une interface pour lier le canal d'entrée au sujet Kafka.
1 2 3 4 5 6 7 8 9 10 11 | package com.example;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface KafkaProcessor {
String INPUT = "input" ;
@Input(INPUT)
SubscribableChannel input();
}
|
Copier après la connexion
Exécution de l'application
- Démarrez le courtier Kafka et créez le sujet requis (my-topic).
- Exécutez l'application Spring Boot.
Lorsque des messages sont envoyés au sujet Kafka, MyConsumerInterceptor interceptera les enregistrements et vous devriez voir les messages de journal interceptés.
Conclusion
Dans ce blog, nous avons exploré comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs constituent un moyen puissant de traiter, enregistrer et manipuler les enregistrements avant qu'ils ne soient consommés par l'application. En intégrant des intercepteurs personnalisés, vous pouvez améliorer les fonctionnalités de vos consommateurs Kafka, en ajoutant des fonctionnalités précieuses telles que la journalisation, la collecte de métriques et la transformation des données.
En suivant les étapes décrites dans ce guide, vous devriez être en mesure d'implémenter et de configurer des intercepteurs grand public dans vos applications Spring Cloud Stream de manière transparente. Bon codage !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!