Jadual Kandungan
Apakah itu kafka?
Senario aplikasi
Rumah Java javaTutorial Bagaimana SpringBoot menyepadukan kelas alat Kafka

Bagaimana SpringBoot menyepadukan kelas alat Kafka

May 13, 2023 pm 06:52 PM
springboot kafka

Apakah itu kafka?

Kafka ialah platform pemprosesan aliran sumber terbuka yang dibangunkan oleh Yayasan Perisian Apache dan ditulis dalam Scala dan Java. Kafka ialah sistem pemesejan terbitan-langganan teragih berdaya tinggi yang boleh memproses semua data penstriman tindakan pengguna dalam tapak web. Tindakan sedemikian (semakan imbas web, carian dan tindakan pengguna lain) merupakan faktor utama dalam banyak fungsi sosial di web moden. Data ini biasanya ditangani dengan memproses log dan pengagregatan log disebabkan keperluan pemprosesan. Ini adalah penyelesaian yang berdaya maju untuk data log dan sistem analisis luar talian seperti Hadoop, tetapi dengan kekangan pemprosesan masa nyata. Tujuan Kafka adalah untuk menyatukan pemprosesan mesej dalam talian dan luar talian melalui mekanisme pemuatan selari Hadoop, dan untuk menyediakan mesej masa nyata melalui kluster.

Senario aplikasi

  • Sistem mesej: Kafka dan sistem pemesejan tradisional (juga dipanggil perisian tengah mesej) kedua-duanya mempunyai penyahgandingan sistem, storan berlebihan dan pengurangan puncak trafik , penimbalan, tak segerak komunikasi, kebolehskalaan, kebolehpulihan dan fungsi lain. Pada masa yang sama, Kafka juga menyediakan jaminan urutan mesej dan fungsi penggunaan retroaktif yang sukar dicapai dalam kebanyakan sistem pemesejan.

  • Sistem storan: Kafka meneruskan mesej ke cakera, dengan berkesan mengurangkan risiko kehilangan data berbanding sistem berasaskan storan memori yang lain. Terima kasih kepada fungsi kegigihan mesej Kafka dan mekanisme berbilang salinan yang kami boleh menggunakan Kafka sebagai sistem penyimpanan data jangka panjang Kami hanya perlu menetapkan dasar pengekalan data yang sepadan kepada "kekal" atau mendayakan fungsi pemampatan log topik. Itu sahaja.

  • Platform pemprosesan penstriman: Kafka bukan sahaja menyediakan sumber data yang boleh dipercayai untuk setiap rangka kerja penstriman popular, tetapi juga menyediakan perpustakaan kelas penstriman yang lengkap, seperti tingkap, Pelbagai operasi seperti gabungan, transformasi dan agregasi.

Mari kita lihat kod terperinci kelas alat Kafka yang menyepadukan SpringBoot.

pom.xml

 <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>fastjson</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
Salin selepas log masuk

Kelas alat

package com.bbl.demo.utils;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import com.alibaba.fastjson.JSONObject;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;


public class KafkaUtils {
    private static AdminClient admin;
    /**
     * 私有静态方法,创建Kafka生产者
     * @author o
     * @return KafkaProducer
     */
    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        //声明kafka的地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
        //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功
        props.put("acks", "all");
        //重试次数
        props.put("retries", Integer.MAX_VALUE);
        //批处理的字节数
        props.put("batch.size", 16384);
        //批处理的延迟时间,当批次数据未满之时等待的时间
        props.put("linger.ms", 1);
        //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB
        props.put("buffer.memory", 33554432);
        // properties.put("value.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        // properties.put("key.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    /**
     * 私有静态方法,创建Kafka消费者
     * @author o
     * @return KafkaConsumer
     */
    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        //声明kafka的地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
        //每个消费者分配独立的消费者组编号
        props.put("group.id", "111");
        //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "true");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "30000");
        //自动重置offset
        props.put("auto.offset.reset","earliest");
        // properties.put("value.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        // properties.put("key.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }
    /**
     * 私有静态方法,创建Kafka集群管理员对象
     * @author o
     */
    public static void createAdmin(String servers){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        admin = AdminClient.create(props);
    }

    /**
     * 私有静态方法,创建Kafka集群管理员对象
     * @author o
     * @return AdminClient
     */
    private static void createAdmin(){
        createAdmin("node01:9092,node02:9092,node03:9092");
    }

    /**
     * 传入kafka约定的topic,json格式字符串,发送给kafka集群
     * @author o
     * @param topic
     * @param jsonMessage
     */
    public static void sendMessage(String topic, String jsonMessage) {
        KafkaProducer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, jsonMessage));
        producer.close();
    }

    /**
     * 传入kafka约定的topic消费数据,用于测试,数据最终会输出到控制台上
     * @author o
     * @param topic
     */
    public static void consume(String topic) {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
    /**
     * 传入kafka约定的topic数组,消费数据
     * @author o
     * @param topics
     */
    public static void consume(String ... topics) {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topics));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
    /**
     * 传入kafka约定的topic,json格式字符串数组,发送给kafka集群
     * 用于批量发送消息,性能较高。
     * @author o
     * @param topic
     * @param jsonMessages
     * @throws InterruptedException
     */
    public static void sendMessage(String topic, String... jsonMessages) throws InterruptedException {
        KafkaProducer<String, String> producer = createProducer();
        for (String jsonMessage : jsonMessages) {
            producer.send(new ProducerRecord<String, String>(topic, jsonMessage));
        }
        producer.close();
    }

    /**
     * 传入kafka约定的topic,Map集合,内部转为json发送给kafka集群 <br>
     * 用于批量发送消息,性能较高。
     * @author o
     * @param topic
     * @param mapMessageToJSONForArray
     */
    public static void sendMessage(String topic, List<Map<Object, Object>> mapMessageToJSONForArray) {
        KafkaProducer<String, String> producer = createProducer();
        for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
            String array = JSONObject.toJSON(mapMessageToJSON).toString();
            producer.send(new ProducerRecord<String, String>(topic, array));
        }
        producer.close();
    }

    /**
     * 传入kafka约定的topic,Map,内部转为json发送给kafka集群
     * @author o
     * @param topic
     * @param mapMessageToJSON
     */
    public static void sendMessage(String topic, Map<Object, Object> mapMessageToJSON) {
        KafkaProducer<String, String> producer = createProducer();
        String array = JSONObject.toJSON(mapMessageToJSON).toString();
        producer.send(new ProducerRecord<String, String>(topic, array));
        producer.close();
    }

    /**
     * 创建主题
     * @author o
     * @param name 主题的名称
     * @param numPartitions 主题的分区数
     * @param replicationFactor 主题的每个分区的副本因子
     */
    public static void createTopic(String name,int numPartitions,int replicationFactor){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        CreateTopicsResult result = admin.createTopics(Arrays.asList(new NewTopic(name, numPartitions, (short) replicationFactor).configs(configs)));
        //以下内容用于判断创建主题的结果
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" created");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof TopicExistsException) {
                    System.out.println("topic "+entry.getKey()+" existed");
                }
            }
        }
    }

    /**
     * 删除主题
     * @author o
     * @param names 主题的名称
     */
    public static void deleteTopic(String name,String ... names){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        Collection<String> topics = Arrays.asList(names);
        topics.add(name);
        DeleteTopicsResult result = admin.deleteTopics(topics);
        //以下内容用于判断删除主题的结果
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" deleted");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic "+entry.getKey()+" not exist");
                }
            }
        }
    }
    /**
     * 查看主题详情
     * @author o
     * @param names 主题的名称
     */
    public static void describeTopic(String name,String ... names){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        Collection<String> topics = Arrays.asList(names);
        topics.add(name);
        DescribeTopicsResult result = admin.describeTopics(topics);
        //以下内容用于显示主题详情的结果
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" describe");
                System.out.println("\t name: "+entry.getValue().get().name());
                System.out.println("\t partitions: ");
                entry.getValue().get().partitions().stream().forEach(p-> {
                    System.out.println("\t\t index: "+p.partition());
                    System.out.println("\t\t\t leader: "+p.leader());
                    System.out.println("\t\t\t replicas: "+p.replicas());
                    System.out.println("\t\t\t isr: "+p.isr());
                });
                System.out.println("\t internal: "+entry.getValue().get().isInternal());
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic "+entry.getKey()+" not exist");
                }
            }
        }
    }

    /**
     * 查看主题列表
     * @author o
     * @return Set<String> TopicList
     */
    public static Set<String> listTopic(){
        if(admin == null) {
            createAdmin();
        }
        ListTopicsResult result = admin.listTopics();
        try {
            result.names().get().stream().map(x->x+"\t").forEach(System.out::print);
            return result.names().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(listTopic());
    }
}
Salin selepas log masuk

Atas ialah kandungan terperinci Bagaimana SpringBoot menyepadukan kelas alat Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

Video Face Swap

Video Face Swap

Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1

Hantar Studio 13.0.1

Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6

Dreamweaver CS6

Alat pembangunan web visual

SublimeText3 versi Mac

SublimeText3 versi Mac

Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Bagaimana untuk melaksanakan analisis saham masa nyata menggunakan PHP dan Kafka Bagaimana untuk melaksanakan analisis saham masa nyata menggunakan PHP dan Kafka Jun 28, 2023 am 10:04 AM

Dengan perkembangan Internet dan teknologi, pelaburan digital telah menjadi topik yang semakin membimbangkan. Ramai pelabur terus meneroka dan mengkaji strategi pelaburan, dengan harapan memperoleh pulangan pelaburan yang lebih tinggi. Dalam perdagangan saham, analisis saham masa nyata adalah sangat penting untuk membuat keputusan, dan penggunaan baris gilir mesej masa nyata Kafka dan teknologi PHP adalah cara yang cekap dan praktikal. 1. Pengenalan kepada Kafka Kafka ialah sistem pemesejan terbitan dan langgan yang diedarkan tinggi yang dibangunkan oleh LinkedIn. Ciri-ciri utama Kafka ialah

Analisis perbandingan dan perbezaan antara SpringBoot dan SpringMVC Analisis perbandingan dan perbezaan antara SpringBoot dan SpringMVC Dec 29, 2023 am 11:02 AM

SpringBoot dan SpringMVC adalah kedua-dua rangka kerja yang biasa digunakan dalam pembangunan Java, tetapi terdapat beberapa perbezaan yang jelas antara mereka. Artikel ini akan meneroka ciri dan penggunaan kedua-dua rangka kerja ini dan membandingkan perbezaannya. Mula-mula, mari belajar tentang SpringBoot. SpringBoot telah dibangunkan oleh pasukan Pivotal untuk memudahkan penciptaan dan penggunaan aplikasi berdasarkan rangka kerja Spring. Ia menyediakan cara yang pantas dan ringan untuk membina bersendirian, boleh dilaksanakan

Cara membina aplikasi pemprosesan data masa nyata menggunakan React dan Apache Kafka Cara membina aplikasi pemprosesan data masa nyata menggunakan React dan Apache Kafka Sep 27, 2023 pm 02:25 PM

Cara menggunakan React dan Apache Kafka untuk membina aplikasi pemprosesan data masa nyata Pengenalan: Dengan peningkatan data besar dan pemprosesan data masa nyata, membina aplikasi pemprosesan data masa nyata telah menjadi usaha ramai pembangun. Gabungan React, rangka kerja bahagian hadapan yang popular dan Apache Kafka, sistem pemesejan teragih berprestasi tinggi, boleh membantu kami membina aplikasi pemprosesan data masa nyata. Artikel ini akan memperkenalkan cara menggunakan React dan Apache Kafka untuk membina aplikasi pemprosesan data masa nyata, dan

Tutorial praktikal pembangunan SpringBoot+Dubbo+Nacos Tutorial praktikal pembangunan SpringBoot+Dubbo+Nacos Aug 15, 2023 pm 04:49 PM

Artikel ini akan menulis contoh terperinci untuk bercakap tentang perkembangan sebenar dubbo+nacos+Spring Boot. Artikel ini tidak akan merangkumi terlalu banyak pengetahuan teori, tetapi akan menulis contoh paling mudah untuk menggambarkan bagaimana dubbo boleh disepadukan dengan nacos untuk membina persekitaran pembangunan dengan cepat.

Lima pilihan alat visualisasi untuk meneroka Kafka Lima pilihan alat visualisasi untuk meneroka Kafka Feb 01, 2024 am 08:03 AM

Lima pilihan untuk alat visualisasi Kafka ApacheKafka ialah platform pemprosesan strim teragih yang mampu memproses sejumlah besar data masa nyata. Ia digunakan secara meluas untuk membina saluran paip data masa nyata, baris gilir mesej dan aplikasi dipacu peristiwa. Alat visualisasi Kafka boleh membantu pengguna memantau dan mengurus kelompok Kafka serta lebih memahami aliran data Kafka. Berikut ialah pengenalan kepada lima alat visualisasi Kafka yang popular: ConfluentControlCenterConfluent

Analisis perbandingan alat visualisasi kafka: Bagaimana untuk memilih alat yang paling sesuai? Analisis perbandingan alat visualisasi kafka: Bagaimana untuk memilih alat yang paling sesuai? Jan 05, 2024 pm 12:15 PM

Bagaimana untuk memilih alat visualisasi Kafka yang betul? Analisis perbandingan lima alat Pengenalan: Kafka ialah sistem baris gilir mesej teragih berprestasi tinggi dan tinggi yang digunakan secara meluas dalam bidang data besar. Dengan populariti Kafka, semakin banyak perusahaan dan pembangun memerlukan alat visual untuk memantau dan mengurus kelompok Kafka dengan mudah. Artikel ini akan memperkenalkan lima alat visualisasi Kafka yang biasa digunakan dan membandingkan ciri serta fungsinya untuk membantu pembaca memilih alat yang sesuai dengan keperluan mereka. 1. KafkaManager

Amalan go-zero dan Kafka+Avro: membina sistem pemprosesan data interaktif berprestasi tinggi Amalan go-zero dan Kafka+Avro: membina sistem pemprosesan data interaktif berprestasi tinggi Jun 23, 2023 am 09:04 AM

Dalam tahun-tahun kebelakangan ini, dengan peningkatan data besar dan komuniti sumber terbuka yang aktif, semakin banyak perusahaan telah mula mencari sistem pemprosesan data interaktif berprestasi tinggi untuk memenuhi keperluan data yang semakin meningkat. Dalam gelombang peningkatan teknologi ini, go-zero dan Kafka+Avro sedang diberi perhatian dan diterima pakai oleh semakin banyak perusahaan. go-zero ialah rangka kerja mikroperkhidmatan yang dibangunkan berdasarkan bahasa Golang Ia mempunyai ciri-ciri prestasi tinggi, kemudahan penggunaan, pengembangan mudah dan penyelenggaraan yang mudah. ​​Ia direka untuk membantu perusahaan membina sistem aplikasi perkhidmatan mikro yang cekap. pertumbuhannya yang pesat

Bagaimana untuk memasang Apache Kafka pada Rocky Linux? Bagaimana untuk memasang Apache Kafka pada Rocky Linux? Mar 01, 2024 pm 10:37 PM

Untuk memasang ApacheKafka pada RockyLinux, anda boleh mengikuti langkah di bawah: Kemas kini sistem: Pertama, pastikan sistem RockyLinux anda dikemas kini, laksanakan arahan berikut untuk mengemas kini pakej sistem: sudoyumupdate Pasang Java: ApacheKafka bergantung pada Java, jadi anda perlu memasang Java Development Kit (JDK) terlebih dahulu ). OpenJDK boleh dipasang melalui arahan berikut: sudoyuminstalljava-1.8.0-openjdk-devel Muat turun dan nyahmampat: Lawati laman web rasmi ApacheKafka () untuk memuat turun pakej binari terkini. Pilih versi yang stabil

See all articles