目錄
kafka是什麼?
應用場景
首頁 Java java教程 SpringBoot怎麼整合Kafka工具類

SpringBoot怎麼整合Kafka工具類

May 13, 2023 pm 06:52 PM
springboot kafka

kafka是什麼?

Kafka是由Apache軟體基金會開發的開源串流處理平台,由Scala和Java編寫。 Kafka是一種高吞吐量的分散式發布訂閱訊息系統,它可以處理消費者在網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的關鍵因素。這些資料通常是由於吞吐量的要求而透過處理日誌和日誌聚合來解決。對於像Hadoop一樣的日誌資料和離線分析系統,但又要求即時處理的限制,這是一個可行的解決方案。 Kafka的目的是透過Hadoop的平行載入機制來統一線上和離線的訊息處理,也是為了透過叢集來提供即時的訊息。

應用場景

  • 訊息系統: Kafka 和傳統的訊息系統(也稱為訊息中間件)都具備系統解耦、冗餘儲存、流量削峰、緩衝、非同步通訊、擴充性、可恢復性等功能。同時,Kafka 也提供了大多數訊息系統難以實現的訊息順序性保障及回溯消費的功能。

  • 儲存系統: Kafka 把訊息持久化到磁碟,相較於其他基於記憶體儲存的系統而言,有效地降低了資料遺失的風險。也正是得益於Kafka 的訊息持久化功能和多副本機制,我們可以把Kafka 作為長期的資料儲存系統來使用,只需要把對應的資料保留策略設定為「永久」或啟用主題的日誌壓縮功能即可。

  • 流式處理平台: Kafka 不僅為每個流行的串流處理框架提供了可靠的資料來源,還提供了一個完整的串流處理類別庫,例如視窗、連接、變換和聚合等各類操作。

下面看下SpringBoot整合Kafka工具類別的詳細程式碼。

pom.xml

 <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>fastjson</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
登入後複製

工具類別

package com.bbl.demo.utils;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import com.alibaba.fastjson.JSONObject;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;


public class KafkaUtils {
    private static AdminClient admin;
    /**
     * 私有静态方法,创建Kafka生产者
     * @author o
     * @return KafkaProducer
     */
    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        //声明kafka的地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
        //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功
        props.put("acks", "all");
        //重试次数
        props.put("retries", Integer.MAX_VALUE);
        //批处理的字节数
        props.put("batch.size", 16384);
        //批处理的延迟时间,当批次数据未满之时等待的时间
        props.put("linger.ms", 1);
        //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB
        props.put("buffer.memory", 33554432);
        // properties.put("value.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        // properties.put("key.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    /**
     * 私有静态方法,创建Kafka消费者
     * @author o
     * @return KafkaConsumer
     */
    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        //声明kafka的地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
        //每个消费者分配独立的消费者组编号
        props.put("group.id", "111");
        //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "true");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "30000");
        //自动重置offset
        props.put("auto.offset.reset","earliest");
        // properties.put("value.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        // properties.put("key.serializer",
        // "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }
    /**
     * 私有静态方法,创建Kafka集群管理员对象
     * @author o
     */
    public static void createAdmin(String servers){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        admin = AdminClient.create(props);
    }

    /**
     * 私有静态方法,创建Kafka集群管理员对象
     * @author o
     * @return AdminClient
     */
    private static void createAdmin(){
        createAdmin("node01:9092,node02:9092,node03:9092");
    }

    /**
     * 传入kafka约定的topic,json格式字符串,发送给kafka集群
     * @author o
     * @param topic
     * @param jsonMessage
     */
    public static void sendMessage(String topic, String jsonMessage) {
        KafkaProducer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, jsonMessage));
        producer.close();
    }

    /**
     * 传入kafka约定的topic消费数据,用于测试,数据最终会输出到控制台上
     * @author o
     * @param topic
     */
    public static void consume(String topic) {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
    /**
     * 传入kafka约定的topic数组,消费数据
     * @author o
     * @param topics
     */
    public static void consume(String ... topics) {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topics));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
    /**
     * 传入kafka约定的topic,json格式字符串数组,发送给kafka集群
     * 用于批量发送消息,性能较高。
     * @author o
     * @param topic
     * @param jsonMessages
     * @throws InterruptedException
     */
    public static void sendMessage(String topic, String... jsonMessages) throws InterruptedException {
        KafkaProducer<String, String> producer = createProducer();
        for (String jsonMessage : jsonMessages) {
            producer.send(new ProducerRecord<String, String>(topic, jsonMessage));
        }
        producer.close();
    }

    /**
     * 传入kafka约定的topic,Map集合,内部转为json发送给kafka集群 <br>
     * 用于批量发送消息,性能较高。
     * @author o
     * @param topic
     * @param mapMessageToJSONForArray
     */
    public static void sendMessage(String topic, List<Map<Object, Object>> mapMessageToJSONForArray) {
        KafkaProducer<String, String> producer = createProducer();
        for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
            String array = JSONObject.toJSON(mapMessageToJSON).toString();
            producer.send(new ProducerRecord<String, String>(topic, array));
        }
        producer.close();
    }

    /**
     * 传入kafka约定的topic,Map,内部转为json发送给kafka集群
     * @author o
     * @param topic
     * @param mapMessageToJSON
     */
    public static void sendMessage(String topic, Map<Object, Object> mapMessageToJSON) {
        KafkaProducer<String, String> producer = createProducer();
        String array = JSONObject.toJSON(mapMessageToJSON).toString();
        producer.send(new ProducerRecord<String, String>(topic, array));
        producer.close();
    }

    /**
     * 创建主题
     * @author o
     * @param name 主题的名称
     * @param numPartitions 主题的分区数
     * @param replicationFactor 主题的每个分区的副本因子
     */
    public static void createTopic(String name,int numPartitions,int replicationFactor){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        CreateTopicsResult result = admin.createTopics(Arrays.asList(new NewTopic(name, numPartitions, (short) replicationFactor).configs(configs)));
        //以下内容用于判断创建主题的结果
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" created");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof TopicExistsException) {
                    System.out.println("topic "+entry.getKey()+" existed");
                }
            }
        }
    }

    /**
     * 删除主题
     * @author o
     * @param names 主题的名称
     */
    public static void deleteTopic(String name,String ... names){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        Collection<String> topics = Arrays.asList(names);
        topics.add(name);
        DeleteTopicsResult result = admin.deleteTopics(topics);
        //以下内容用于判断删除主题的结果
        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" deleted");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic "+entry.getKey()+" not exist");
                }
            }
        }
    }
    /**
     * 查看主题详情
     * @author o
     * @param names 主题的名称
     */
    public static void describeTopic(String name,String ... names){
        if(admin == null) {
            createAdmin();
        }
        Map<String, String> configs = new HashMap<>();
        Collection<String> topics = Arrays.asList(names);
        topics.add(name);
        DescribeTopicsResult result = admin.describeTopics(topics);
        //以下内容用于显示主题详情的结果
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                System.out.println("topic "+entry.getKey()+" describe");
                System.out.println("\t name: "+entry.getValue().get().name());
                System.out.println("\t partitions: ");
                entry.getValue().get().partitions().stream().forEach(p-> {
                    System.out.println("\t\t index: "+p.partition());
                    System.out.println("\t\t\t leader: "+p.leader());
                    System.out.println("\t\t\t replicas: "+p.replicas());
                    System.out.println("\t\t\t isr: "+p.isr());
                });
                System.out.println("\t internal: "+entry.getValue().get().isInternal());
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic "+entry.getKey()+" not exist");
                }
            }
        }
    }

    /**
     * 查看主题列表
     * @author o
     * @return Set<String> TopicList
     */
    public static Set<String> listTopic(){
        if(admin == null) {
            createAdmin();
        }
        ListTopicsResult result = admin.listTopics();
        try {
            result.names().get().stream().map(x->x+"\t").forEach(System.out::print);
            return result.names().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(listTopic());
    }
}
登入後複製

以上是SpringBoot怎麼整合Kafka工具類的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何使用PHP和Kafka實現即時股票分析 如何使用PHP和Kafka實現即時股票分析 Jun 28, 2023 am 10:04 AM

隨著網路和科技的發展,數位化投資已成為人們越來越關注的話題。許多投資人不斷探索研究投資策略,希望能獲得更高的投資報酬率。在股票交易中,即時的股票分析對決策非常重要,其中使用Kafka即時訊息隊列和PHP技術實現更是一種高效且實用的手段。一、Kafka介紹Kafka是由LinkedIn公司開發的一個高吞吐量的分散式發布、訂閱訊息系統。 Kafka的主要特點是

SpringBoot與SpringMVC的比較及差別分析 SpringBoot與SpringMVC的比較及差別分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

如何利用React與Apache Kafka建構即時資料處理應用 如何利用React與Apache Kafka建構即時資料處理應用 Sep 27, 2023 pm 02:25 PM

如何利用React和ApacheKafka來建立即時資料處理應用介紹:隨著大數據與即時資料處理的興起,建構即時資料處理應用成為了許多開發者的追求。 React作為一個流行的前端框架,與ApacheKafka作為一個高效能的分散式訊息系統的結合,可以幫助我們建立即時資料處理應用。本文將介紹如何利用React和ApacheKafka建構即時資料處理應用,並

SpringBoot+Dubbo+Nacos 開發實戰教程 SpringBoot+Dubbo+Nacos 開發實戰教程 Aug 15, 2023 pm 04:49 PM

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

五種選擇的可視化工具,用於探索Kafka 五種選擇的可視化工具,用於探索Kafka Feb 01, 2024 am 08:03 AM

Kafka視覺化工具的五種選擇ApacheKafka是一個分散式串流處理平台,能夠處理大量即時資料。它廣泛用於建立即時資料管道、訊息佇列和事件驅動的應用程式。 Kafka的視覺化工具可以幫助使用者監控和管理Kafka集群,並且更好地理解Kafka資料流。以下是對五種流行的Kafka視覺化工具的介紹:ConfluentControlCenterConfluent

kafka視覺化工具比較分析:如何選擇最適合的工具? kafka視覺化工具比較分析:如何選擇最適合的工具? Jan 05, 2024 pm 12:15 PM

如何選擇合適的Kafka視覺化工具?五款工具比較分析引言:Kafka是一種高效能、高吞吐量的分散式訊息佇列系統,被廣泛應用於大數據領域。隨著Kafka的流行,越來越多的企業和開發者需要一個視覺化工具來方便地監控和管理Kafka叢集。本文將介紹五款常用的Kafka視覺化工具,並比較它們的特色和功能,幫助讀者選擇適合自己需求的工具。一、KafkaManager

go-zero與Kafka+Avro的實踐:建構高效能的互動式資料處理系統 go-zero與Kafka+Avro的實踐:建構高效能的互動式資料處理系統 Jun 23, 2023 am 09:04 AM

近年來,隨著大數據的興起和活躍的開源社區,越來越多的企業開始尋找高效能的互動式資料處理系統來滿足日益增長的資料需求。在這場技術升級的浪潮中,go-zero和Kafka+Avro被越來越多的企業所關注和採用。 go-zero是一款基於Golang語言開發的微服務框架,具有高效能、易用、易於擴展、易於維護等特點,旨在幫助企業快速建立高效的微服務應用系統。它的快速成長得

如何在 Rocky Linux 上安裝 Apache Kafka? 如何在 Rocky Linux 上安裝 Apache Kafka? Mar 01, 2024 pm 10:37 PM

在RockyLinux上安裝ApacheKafka可以按照以下步驟進行操作:更新系統:首先,確保你的RockyLinux系統是最新的,執行以下命令更新系統軟體包:sudoyumupdate安裝Java:ApacheKafka依賴Java,因此需要先安裝JavaDevelopmentKit(JDKK )。可以透過以下指令安裝OpenJDK:sudoyuminstalljava-1.8.0-openjdk-devel下載和解壓縮:造訪ApacheKafka官方網站()下載最新的二進位套件。選擇一個穩定版本

See all articles