Spring Cloud Stream은 Apache Kafka 및 RabbitMQ와 같은 메시지 브로커를 추상화하여 메시지 기반 마이크로서비스 개발을 단순화하는 프레임워크입니다. Spring Cloud Stream의 강력한 기능 중 하나는 Kafka와 원활하게 통합하여 개발자가 강력하고 확장 가능한 이벤트 기반 애플리케이션을 구축할 수 있도록 하는 기능입니다. Spring Cloud Stream의 Kafka 바인더는 Kafka 주제에 쉽게 연결하는 방법을 제공합니다.
이번 블로그에서는 Spring Cloud Stream Kafka Binder와 함께 소비자 인터셉터를 사용하는 방법을 살펴보겠습니다. Kafka의 인터셉터는 레코드가 애플리케이션에서 사용되기 전에 가로채서 변경하는 메커니즘을 제공하여 로깅, 메트릭 수집 및 데이터 조작 기회를 제공합니다.
자세한 내용을 살펴보기 전에 다음 전제 조건을 충족하는지 확인하세요.
먼저 Spring Cloud Stream 및 Kafka에 필요한 종속성을 갖춘 간단한 Spring Boot 프로젝트를 설정해 보겠습니다.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR10</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
다음으로 application.yml 파일에서 Kafka 바인더를 구성합니다.
spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
소비자 인터셉터를 생성하려면 Kafka에서 제공하는 ConsumerInterceptor 인터페이스를 구현하세요. 이 인터페이스를 사용하면 기록이 애플리케이션에 도달하기 전에 가로채고 처리하기 위한 사용자 정의 논리를 정의할 수 있습니다.
package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map<String, ?> configs) { // Configuration logic } }
Kafka 주제의 메시지를 수신하는 간단한 소비자 애플리케이션을 만듭니다.
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Message<String> message) { System.out.println("Received message: " + message.getPayload()); } }
입력 채널을 Kafka 주제에 바인딩하기 위한 인터페이스를 정의합니다.
package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Kafka 주제에 대한 메시지가 생성되면 MyConsumerInterceptor가 레코드를 가로채고 가로채는 로그 메시지가 표시됩니다.
이 블로그에서는 Spring Cloud Stream Kafka Binder와 함께 소비자 인터셉터를 사용하는 방법을 살펴보았습니다. 인터셉터는 애플리케이션에서 레코드를 사용하기 전에 레코드를 처리, 기록 및 조작하는 강력한 방법을 제공합니다. 사용자 정의 인터셉터를 통합하면 로깅, 메트릭 수집, 데이터 변환과 같은 중요한 기능을 추가하여 Kafka 소비자의 기능을 향상할 수 있습니다.
이 가이드에 설명된 단계를 따르면 Spring Cloud Stream 애플리케이션에서 소비자 인터셉터를 원활하게 구현하고 구성할 수 있습니다. 즐거운 코딩하세요!
위 내용은 Spring Cloud Stream Kafka 바인더 소비자 인터셉터 탐색의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!