Meneroka Aliran Awan Musim Bunga Kafka Binder Pemintas Pengguna

WBOY
Lepaskan: 2024-08-06 19:20:50
asal
1021 orang telah melayarinya

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

pengenalan

Spring Cloud Stream ialah rangka kerja yang memudahkan pembangunan perkhidmatan mikro dipacu mesej dengan mengabstraksi broker mesej seperti Apache Kafka dan RabbitMQ. Salah satu ciri hebat Spring Cloud Stream ialah keupayaannya untuk menyepadukan dengan lancar dengan Kafka, membolehkan pembangun membina aplikasi yang didorong peristiwa yang teguh dan berskala. Pengikat Kafka dalam Spring Cloud Stream menyediakan cara untuk menyambung ke topik Kafka dengan mudah.

Dalam blog ini, kami akan menyelidiki cara menggunakan pemintas pengguna dengan Spring Cloud Stream Kafka Binder. Pemintas dalam Kafka menyediakan mekanisme untuk memintas dan mengubah rekod sebelum ia digunakan oleh aplikasi, menawarkan peluang untuk pengelogan, pengumpulan metrik dan manipulasi data.

Prasyarat

Sebelum menyelami butiran, pastikan anda mempunyai prasyarat berikut:

  • Kit Pembangunan Java (JDK) 8 atau lebih baru
  • Apache Kafka
  • But Musim Bunga 2.x atau lebih baru
  • Maven atau Gradle

Menyediakan Aplikasi Spring Boot

Mula-mula, mari kita sediakan projek Spring Boot yang ringkas dengan kebergantungan yang diperlukan untuk Spring Cloud Stream dan Kafka.

Maven pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Salin selepas log masuk

Gradle build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
    }
}
Salin selepas log masuk

Mengkonfigurasi Kafka Binder

Seterusnya, konfigurasikan pengikat Kafka dalam fail application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-group
          consumer:
            interceptor-classes: com.example.MyConsumerInterceptor
      kafka:
        binder:
          brokers: localhost:9092
Salin selepas log masuk

Mencipta Pemintas Pengguna Kafka

Untuk mencipta pemintas pengguna, laksanakan antara muka ConsumerInterceptor yang disediakan oleh Kafka. Antara muka ini membolehkan anda menentukan logik tersuai untuk memintas dan memproses rekod sebelum mereka sampai ke aplikasi.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value());
            // Add your custom logic here
        });
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        // Custom logic on commit
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration logic
    }
}
Salin selepas log masuk

Mencipta Aplikasi Pengguna

Buat aplikasi pengguna ringkas yang mendengar mesej daripada topik Kafka.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}
Salin selepas log masuk

Antara muka untuk Mengikat

Tentukan antara muka untuk mengikat saluran input ke topik Kafka.

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaProcessor {
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}
Salin selepas log masuk

Menjalankan Aplikasi

  1. Mulakan broker Kafka dan cipta topik yang diperlukan (topik saya).
  2. Jalankan aplikasi Spring Boot.

Apabila mesej dihasilkan kepada topik Kafka, MyConsumerInterceptor akan memintas rekod dan anda akan melihat mesej log yang dipintas.

Kesimpulan

Dalam blog ini, kami telah meneroka cara menggunakan pemintas pengguna dengan Spring Cloud Stream Kafka Binder. Pemintas menyediakan cara yang berkuasa untuk memproses, log dan memanipulasi rekod sebelum ia digunakan oleh aplikasi. Dengan menyepadukan pemintas tersuai, anda boleh meningkatkan kefungsian pengguna Kafka anda, menambahkan keupayaan berharga seperti pengelogan, pengumpulan metrik dan transformasi data.

Dengan mengikut langkah-langkah yang digariskan dalam panduan ini, anda seharusnya dapat melaksanakan dan mengkonfigurasi pemintas pengguna dalam aplikasi Spring Cloud Stream anda dengan lancar. Selamat mengekod!

Atas ialah kandungan terperinci Meneroka Aliran Awan Musim Bunga Kafka Binder Pemintas Pengguna. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!