Spring Cloud Stream ialah rangka kerja yang memudahkan pembangunan perkhidmatan mikro dipacu mesej dengan mengabstraksi broker mesej seperti Apache Kafka dan RabbitMQ. Salah satu ciri hebat Spring Cloud Stream ialah keupayaannya untuk menyepadukan dengan lancar dengan Kafka, membolehkan pembangun membina aplikasi yang didorong peristiwa yang teguh dan berskala. Pengikat Kafka dalam Spring Cloud Stream menyediakan cara untuk menyambung ke topik Kafka dengan mudah.
Dalam blog ini, kami akan menyelidiki cara menggunakan pemintas pengguna dengan Spring Cloud Stream Kafka Binder. Pemintas dalam Kafka menyediakan mekanisme untuk memintas dan mengubah rekod sebelum ia digunakan oleh aplikasi, menawarkan peluang untuk pengelogan, pengumpulan metrik dan manipulasi data.
Sebelum menyelami butiran, pastikan anda mempunyai prasyarat berikut:
Mula-mula, mari kita sediakan projek Spring Boot yang ringkas dengan kebergantungan yang diperlukan untuk Spring Cloud Stream dan Kafka.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR10</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10" } }
Seterusnya, konfigurasikan pengikat Kafka dalam fail application.yml.
spring: cloud: stream: bindings: input: destination: my-topic group: my-group consumer: interceptor-classes: com.example.MyConsumerInterceptor kafka: binder: brokers: localhost:9092
Untuk mencipta pemintas pengguna, laksanakan antara muka ConsumerInterceptor yang disediakan oleh Kafka. Antara muka ini membolehkan anda menentukan logik tersuai untuk memintas dan memproses rekod sebelum mereka sampai ke aplikasi.
package com.example; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable { private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class); @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { records.forEach(record -> { logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value()); // Add your custom logic here }); return records; } @Override public void onCommit(Map offsets) { // Custom logic on commit } @Override public void close() { // Cleanup resources if necessary } @Override public void configure(Map<String, ?> configs) { // Configuration logic } }
Buat aplikasi pengguna ringkas yang mendengar mesej daripada topik Kafka.
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @SpringBootApplication @EnableBinding(KafkaProcessor.class) public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @StreamListener("input") public void handle(Message<String> message) { System.out.println("Received message: " + message.getPayload()); } }
Tentukan antara muka untuk mengikat saluran input ke topik Kafka.
package com.example; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface KafkaProcessor { String INPUT = "input"; @Input(INPUT) SubscribableChannel input(); }
Apabila mesej dihasilkan kepada topik Kafka, MyConsumerInterceptor akan memintas rekod dan anda akan melihat mesej log yang dipintas.
Dalam blog ini, kami telah meneroka cara menggunakan pemintas pengguna dengan Spring Cloud Stream Kafka Binder. Pemintas menyediakan cara yang berkuasa untuk memproses, log dan memanipulasi rekod sebelum ia digunakan oleh aplikasi. Dengan menyepadukan pemintas tersuai, anda boleh meningkatkan kefungsian pengguna Kafka anda, menambahkan keupayaan berharga seperti pengelogan, pengumpulan metrik dan transformasi data.
Dengan mengikut langkah-langkah yang digariskan dalam panduan ini, anda seharusnya dapat melaksanakan dan mengkonfigurasi pemintas pengguna dalam aplikasi Spring Cloud Stream anda dengan lancar. Selamat mengekod!
Atas ialah kandungan terperinci Meneroka Aliran Awan Musim Bunga Kafka Binder Pemintas Pengguna. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!