Wie SpringBoot Kafka-Toolklassen integriert
Was ist Kafka?
Kafka ist eine Open-Source-Stream-Verarbeitungsplattform, die von der Apache Software Foundation entwickelt und in Scala und Java geschrieben wurde. Kafka ist ein verteiltes Publish-Subscribe-Messagingsystem mit hohem Durchsatz, das alle Aktions-Streaming-Daten von Verbrauchern auf der Website verarbeiten kann. Solche Aktionen (Websurfen, Suchen und andere Benutzeraktionen) sind ein Schlüsselfaktor für viele soziale Funktionen im modernen Web. Aufgrund der Durchsatzanforderungen werden diese Daten in der Regel durch die Verarbeitung von Protokollen und die Protokollaggregation verarbeitet. Dies ist eine praktikable Lösung für Protokolldaten und Offline-Analysesysteme wie Hadoop, erfordert jedoch Einschränkungen bei der Echtzeitverarbeitung. Der Zweck von Kafka besteht darin, die Online- und Offline-Nachrichtenverarbeitung durch den parallelen Lademechanismus von Hadoop zu vereinheitlichen und Echtzeitnachrichten über den Cluster bereitzustellen.
Anwendungsszenarien
Nachrichtensystem: Kafka und traditionelle Nachrichtensysteme (auch Nachrichten-Middleware genannt) verfügen beide über Systementkopplung und Redundanz. Es bietet Funktionen wie z B. Ersatzspeicher, Begrenzung von Verkehrsspitzen, Pufferung, asynchrone Kommunikation, Skalierbarkeit und Wiederherstellbarkeit. Gleichzeitig bietet Kafka auch Funktionen zur Garantie der Nachrichtensequenz und zum rückwirkenden Verbrauch, die in den meisten Messaging-Systemen nur schwer zu erreichen sind.
Speichersystem: Kafka speichert Nachrichten auf der Festplatte, wodurch das Risiko eines Datenverlusts im Vergleich zu anderen speicherbasierten Systemen effektiv verringert wird. Dank der Nachrichtenpersistenzfunktion und des Mehrfachkopiemechanismus von Kafka können wir Kafka als langfristiges Datenspeichersystem verwenden. Wir müssen lediglich die entsprechende Datenaufbewahrungsrichtlinie auf „permanent“ setzen oder die Protokollkomprimierungsfunktion des Themas aktivieren. Das ist es.
Streaming-Verarbeitungsplattform: Kafka bietet nicht nur eine zuverlässige Datenquelle für jedes gängige Streaming-Framework, sondern auch eine vollständige Streaming-Klassenbibliothek, z. B. verschiedene Operationen wie Windows , Verbindungen, Transformationen und Aggregationen.
Werfen wir einen Blick auf den detaillierten Code von SpringBoot, der die Kafka-Toolklasse integriert.
pom.xml
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>fastjson</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency>
Tools
package com.bbl.demo.utils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import com.alibaba.fastjson.JSONObject; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; public class KafkaUtils { private static AdminClient admin; /** * 私有静态方法,创建Kafka生产者 * @author o * @return KafkaProducer */ private static KafkaProducer<String, String> createProducer() { Properties props = new Properties(); //声明kafka的地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092"); //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功 props.put("acks", "all"); //重试次数 props.put("retries", Integer.MAX_VALUE); //批处理的字节数 props.put("batch.size", 16384); //批处理的延迟时间,当批次数据未满之时等待的时间 props.put("linger.ms", 1); //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB props.put("buffer.memory", 33554432); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<String, String>(props); } /** * 私有静态方法,创建Kafka消费者 * @author o * @return KafkaConsumer */ private static KafkaConsumer<String, String> createConsumer() { Properties props = new Properties(); //声明kafka的地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092"); //每个消费者分配独立的消费者组编号 props.put("group.id", "111"); //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); //自动重置offset props.put("auto.offset.reset","earliest"); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return new KafkaConsumer<String, String>(props); } /** * 私有静态方法,创建Kafka集群管理员对象 * @author o */ public static void createAdmin(String servers){ Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers); admin = AdminClient.create(props); } /** * 私有静态方法,创建Kafka集群管理员对象 * @author o * @return AdminClient */ private static void createAdmin(){ createAdmin("node01:9092,node02:9092,node03:9092"); } /** * 传入kafka约定的topic,json格式字符串,发送给kafka集群 * @author o * @param topic * @param jsonMessage */ public static void sendMessage(String topic, String jsonMessage) { KafkaProducer<String, String> producer = createProducer(); producer.send(new ProducerRecord<String, String>(topic, jsonMessage)); producer.close(); } /** * 传入kafka约定的topic消费数据,用于测试,数据最终会输出到控制台上 * @author o * @param topic */ public static void consume(String topic) { KafkaConsumer<String, String> consumer = createConsumer(); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value()); System.out.println(); } } } /** * 传入kafka约定的topic数组,消费数据 * @author o * @param topics */ public static void consume(String ... topics) { KafkaConsumer<String, String> consumer = createConsumer(); consumer.subscribe(Arrays.asList(topics)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value()); System.out.println(); } } } /** * 传入kafka约定的topic,json格式字符串数组,发送给kafka集群 * 用于批量发送消息,性能较高。 * @author o * @param topic * @param jsonMessages * @throws InterruptedException */ public static void sendMessage(String topic, String... jsonMessages) throws InterruptedException { KafkaProducer<String, String> producer = createProducer(); for (String jsonMessage : jsonMessages) { producer.send(new ProducerRecord<String, String>(topic, jsonMessage)); } producer.close(); } /** * 传入kafka约定的topic,Map集合,内部转为json发送给kafka集群 <br> * 用于批量发送消息,性能较高。 * @author o * @param topic * @param mapMessageToJSONForArray */ public static void sendMessage(String topic, List<Map<Object, Object>> mapMessageToJSONForArray) { KafkaProducer<String, String> producer = createProducer(); for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) { String array = JSONObject.toJSON(mapMessageToJSON).toString(); producer.send(new ProducerRecord<String, String>(topic, array)); } producer.close(); } /** * 传入kafka约定的topic,Map,内部转为json发送给kafka集群 * @author o * @param topic * @param mapMessageToJSON */ public static void sendMessage(String topic, Map<Object, Object> mapMessageToJSON) { KafkaProducer<String, String> producer = createProducer(); String array = JSONObject.toJSON(mapMessageToJSON).toString(); producer.send(new ProducerRecord<String, String>(topic, array)); producer.close(); } /** * 创建主题 * @author o * @param name 主题的名称 * @param numPartitions 主题的分区数 * @param replicationFactor 主题的每个分区的副本因子 */ public static void createTopic(String name,int numPartitions,int replicationFactor){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); CreateTopicsResult result = admin.createTopics(Arrays.asList(new NewTopic(name, numPartitions, (short) replicationFactor).configs(configs))); //以下内容用于判断创建主题的结果 for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" created"); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof TopicExistsException) { System.out.println("topic "+entry.getKey()+" existed"); } } } } /** * 删除主题 * @author o * @param names 主题的名称 */ public static void deleteTopic(String name,String ... names){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); Collection<String> topics = Arrays.asList(names); topics.add(name); DeleteTopicsResult result = admin.deleteTopics(topics); //以下内容用于判断删除主题的结果 for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" deleted"); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { System.out.println("topic "+entry.getKey()+" not exist"); } } } } /** * 查看主题详情 * @author o * @param names 主题的名称 */ public static void describeTopic(String name,String ... names){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); Collection<String> topics = Arrays.asList(names); topics.add(name); DescribeTopicsResult result = admin.describeTopics(topics); //以下内容用于显示主题详情的结果 for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" describe"); System.out.println("\t name: "+entry.getValue().get().name()); System.out.println("\t partitions: "); entry.getValue().get().partitions().stream().forEach(p-> { System.out.println("\t\t index: "+p.partition()); System.out.println("\t\t\t leader: "+p.leader()); System.out.println("\t\t\t replicas: "+p.replicas()); System.out.println("\t\t\t isr: "+p.isr()); }); System.out.println("\t internal: "+entry.getValue().get().isInternal()); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { System.out.println("topic "+entry.getKey()+" not exist"); } } } } /** * 查看主题列表 * @author o * @return Set<String> TopicList */ public static Set<String> listTopic(){ if(admin == null) { createAdmin(); } ListTopicsResult result = admin.listTopics(); try { result.names().get().stream().map(x->x+"\t").forEach(System.out::print); return result.names().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return null; } } public static void main(String[] args) { System.out.println(listTopic()); } }
Das obige ist der detaillierte Inhalt vonWie SpringBoot Kafka-Toolklassen integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

AI Hentai Generator
Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen

Mit der Entwicklung des Internets und der Technologie sind digitale Investitionen zu einem Thema mit zunehmender Besorgnis geworden. Viele Anleger erforschen und studieren weiterhin Anlagestrategien in der Hoffnung, eine höhere Kapitalrendite zu erzielen. Im Aktienhandel ist die Aktienanalyse in Echtzeit für die Entscheidungsfindung sehr wichtig, und der Einsatz der Kafka-Echtzeit-Nachrichtenwarteschlange und der PHP-Technologie ist ein effizientes und praktisches Mittel. 1. Einführung in Kafka Kafka ist ein von LinkedIn entwickeltes verteiltes Publish- und Subscribe-Messagingsystem mit hohem Durchsatz. Die Hauptmerkmale von Kafka sind

SpringBoot und SpringMVC sind beide häufig verwendete Frameworks in der Java-Entwicklung, es gibt jedoch einige offensichtliche Unterschiede zwischen ihnen. In diesem Artikel werden die Funktionen und Verwendungsmöglichkeiten dieser beiden Frameworks untersucht und ihre Unterschiede verglichen. Lassen Sie uns zunächst etwas über SpringBoot lernen. SpringBoot wurde vom Pivotal-Team entwickelt, um die Erstellung und Bereitstellung von Anwendungen auf Basis des Spring-Frameworks zu vereinfachen. Es bietet eine schnelle und einfache Möglichkeit, eigenständige, ausführbare Dateien zu erstellen

Wie wählt man das richtige Kafka-Visualisierungstool aus? Vergleichende Analyse von fünf Tools Einführung: Kafka ist ein leistungsstarkes verteiltes Nachrichtenwarteschlangensystem mit hohem Durchsatz, das im Bereich Big Data weit verbreitet ist. Mit der Popularität von Kafka benötigen immer mehr Unternehmen und Entwickler ein visuelles Tool zur einfachen Überwachung und Verwaltung von Kafka-Clustern. In diesem Artikel werden fünf häufig verwendete Kafka-Visualisierungstools vorgestellt und ihre Merkmale und Funktionen verglichen, um den Lesern bei der Auswahl des Tools zu helfen, das ihren Anforderungen entspricht. 1. KafkaManager

Fünf Optionen für Kafka-Visualisierungstools ApacheKafka ist eine verteilte Stream-Verarbeitungsplattform, die große Mengen an Echtzeitdaten verarbeiten kann. Es wird häufig zum Aufbau von Echtzeit-Datenpipelines, Nachrichtenwarteschlangen und ereignisgesteuerten Anwendungen verwendet. Die Visualisierungstools von Kafka können Benutzern dabei helfen, Kafka-Cluster zu überwachen und zu verwalten und Kafka-Datenflüsse besser zu verstehen. Im Folgenden finden Sie eine Einführung in fünf beliebte Kafka-Visualisierungstools: ConfluentControlCenterConfluent

In diesem Artikel wird ein detailliertes Beispiel geschrieben, um über die tatsächliche Entwicklung von Dubbo + Nacos + Spring Boot zu sprechen. In diesem Artikel wird nicht zu viel theoretisches Wissen behandelt, sondern das einfachste Beispiel wird geschrieben, um zu veranschaulichen, wie Dubbo in Nacos integriert werden kann, um schnell eine Entwicklungsumgebung aufzubauen.

Um ApacheKafka auf RockyLinux zu installieren, können Sie die folgenden Schritte ausführen: Aktualisieren Sie das System: Stellen Sie zunächst sicher, dass Ihr RockyLinux-System auf dem neuesten Stand ist. Führen Sie den folgenden Befehl aus, um die Systempakete zu aktualisieren: sudoyumupdate Java installieren: ApacheKafka hängt von Java ab, also von Ihnen Sie müssen zuerst JavaDevelopmentKit (JDK) installieren. OpenJDK kann mit dem folgenden Befehl installiert werden: sudoyuminstalljava-1.8.0-openjdk-devel Herunterladen und dekomprimieren: Besuchen Sie die offizielle Website von ApacheKafka (), um das neueste Binärpaket herunterzuladen. Wählen Sie eine stabile Version

In den letzten Jahren haben mit dem Aufkommen von Big Data und aktiven Open-Source-Communities immer mehr Unternehmen begonnen, nach leistungsstarken interaktiven Datenverarbeitungssystemen zu suchen, um den wachsenden Datenanforderungen gerecht zu werden. In dieser Welle von Technologie-Upgrades werden Go-Zero und Kafka+Avro von immer mehr Unternehmen beachtet und übernommen. go-zero ist ein auf der Golang-Sprache entwickeltes Microservice-Framework. Es zeichnet sich durch hohe Leistung, Benutzerfreundlichkeit, einfache Erweiterung und einfache Wartung aus und soll Unternehmen dabei helfen, schnell effiziente Microservice-Anwendungssysteme aufzubauen. sein schnelles Wachstum

Häufig verwendete Verzeichnisse für SpringBoot-Projekte werden auf der Grundlage der Verzeichnisstruktur und Benennungsspezifikationen während der SpringBoot-Entwicklung eingeführt. Durch die Einführung können wir Ihnen bei der tatsächlichen Planung der Verzeichnisstruktur helfen Projekte? Wie können Verzeichnisse standardisierter benannt werden? Was bedeuten die einzelnen Verzeichnisse? Warten Sie drei Fragen. Verzeichnisbeschreibung servicex//Projektname|-admin-ui//Front-End-Code für den Verwaltungsdienst (normalerweise werden UI und SERVICE zur einfachen Verwaltung in einem Projekt zusammengefasst)|-servicex-auth//Modul 1|-servicex-common// Modul 2|-servicex-gateway//Modul 3|
