SpringBoot가 Kafka 도구 클래스를 통합하는 방법
카프카란 무엇인가요?
Kafka는 Apache Software Foundation에서 개발하고 Scala 및 Java로 작성된 오픈 소스 스트림 처리 플랫폼입니다. Kafka는 웹사이트에서 소비자의 모든 액션 스트리밍 데이터를 처리할 수 있는 처리량이 높은 분산형 게시-구독 메시징 시스템입니다. 이러한 작업(웹 탐색, 검색 및 기타 사용자 작업)은 현대 웹의 많은 소셜 기능에서 핵심 요소입니다. 이 데이터는 일반적으로 처리량 요구 사항으로 인해 로그 및 로그 집계를 처리하여 처리됩니다. 이는 Hadoop과 같은 로그 데이터 및 오프라인 분석 시스템에 실행 가능한 솔루션이지만 실시간 처리의 제약이 있습니다. Kafka의 목적은 Hadoop의 병렬 로딩 메커니즘을 통해 온라인과 오프라인 메시지 처리를 통합하고 클러스터를 통해 실시간 메시지를 제공하는 것입니다.
애플리케이션 시나리오
메시지 시스템: Kafka와 기존 메시징 시스템(메시지 미들웨어라고도 함)에는 모두 시스템 분리, 중복 저장, 트래픽 피크 감소, 버퍼링, 비동기 통신, 확장성, 복구 가능성 및 기타 기능이 있습니다. 동시에 Kafka는 대부분의 메시징 시스템에서 달성하기 어려운 메시지 순서 보장 및 소급 소비 기능도 제공합니다.
스토리지 시스템: Kafka는 메시지를 디스크에 유지하므로 다른 메모리 스토리지 기반 시스템에 비해 데이터 손실 위험을 효과적으로 줄입니다. Kafka를 장기 데이터 저장 시스템으로 사용할 수 있는 것은 바로 Kafka의 메시지 지속성 기능과 다중 복사 메커니즘 덕분입니다. 해당 데이터 보존 정책을 "영구"로 설정하거나 해당 주제의 로그 압축 기능을 활성화하기만 하면 됩니다. 그게 다야.
스트리밍 처리 플랫폼: Kafka는 인기 있는 각 스트리밍 프레임워크에 대해 안정적인 데이터 소스를 제공할 뿐만 아니라 창, 연결, 변환, 집계 등과 같은 완전한 스트리밍 처리 라이브러리를 제공합니다.
Kafka 툴 클래스를 통합한 SpringBoot의 상세 코드를 살펴보겠습니다.
pom.xml
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>fastjson</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency>
Tools
package com.bbl.demo.utils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import com.alibaba.fastjson.JSONObject; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; public class KafkaUtils { private static AdminClient admin; /** * 私有静态方法,创建Kafka生产者 * @author o * @return KafkaProducer */ private static KafkaProducer<String, String> createProducer() { Properties props = new Properties(); //声明kafka的地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092"); //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功 props.put("acks", "all"); //重试次数 props.put("retries", Integer.MAX_VALUE); //批处理的字节数 props.put("batch.size", 16384); //批处理的延迟时间,当批次数据未满之时等待的时间 props.put("linger.ms", 1); //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB props.put("buffer.memory", 33554432); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<String, String>(props); } /** * 私有静态方法,创建Kafka消费者 * @author o * @return KafkaConsumer */ private static KafkaConsumer<String, String> createConsumer() { Properties props = new Properties(); //声明kafka的地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092"); //每个消费者分配独立的消费者组编号 props.put("group.id", "111"); //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); //自动重置offset props.put("auto.offset.reset","earliest"); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return new KafkaConsumer<String, String>(props); } /** * 私有静态方法,创建Kafka集群管理员对象 * @author o */ public static void createAdmin(String servers){ Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers); admin = AdminClient.create(props); } /** * 私有静态方法,创建Kafka集群管理员对象 * @author o * @return AdminClient */ private static void createAdmin(){ createAdmin("node01:9092,node02:9092,node03:9092"); } /** * 传入kafka约定的topic,json格式字符串,发送给kafka集群 * @author o * @param topic * @param jsonMessage */ public static void sendMessage(String topic, String jsonMessage) { KafkaProducer<String, String> producer = createProducer(); producer.send(new ProducerRecord<String, String>(topic, jsonMessage)); producer.close(); } /** * 传入kafka约定的topic消费数据,用于测试,数据最终会输出到控制台上 * @author o * @param topic */ public static void consume(String topic) { KafkaConsumer<String, String> consumer = createConsumer(); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value()); System.out.println(); } } } /** * 传入kafka约定的topic数组,消费数据 * @author o * @param topics */ public static void consume(String ... topics) { KafkaConsumer<String, String> consumer = createConsumer(); consumer.subscribe(Arrays.asList(topics)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value()); System.out.println(); } } } /** * 传入kafka约定的topic,json格式字符串数组,发送给kafka集群 * 用于批量发送消息,性能较高。 * @author o * @param topic * @param jsonMessages * @throws InterruptedException */ public static void sendMessage(String topic, String... jsonMessages) throws InterruptedException { KafkaProducer<String, String> producer = createProducer(); for (String jsonMessage : jsonMessages) { producer.send(new ProducerRecord<String, String>(topic, jsonMessage)); } producer.close(); } /** * 传入kafka约定的topic,Map集合,内部转为json发送给kafka集群 <br> * 用于批量发送消息,性能较高。 * @author o * @param topic * @param mapMessageToJSONForArray */ public static void sendMessage(String topic, List<Map<Object, Object>> mapMessageToJSONForArray) { KafkaProducer<String, String> producer = createProducer(); for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) { String array = JSONObject.toJSON(mapMessageToJSON).toString(); producer.send(new ProducerRecord<String, String>(topic, array)); } producer.close(); } /** * 传入kafka约定的topic,Map,内部转为json发送给kafka集群 * @author o * @param topic * @param mapMessageToJSON */ public static void sendMessage(String topic, Map<Object, Object> mapMessageToJSON) { KafkaProducer<String, String> producer = createProducer(); String array = JSONObject.toJSON(mapMessageToJSON).toString(); producer.send(new ProducerRecord<String, String>(topic, array)); producer.close(); } /** * 创建主题 * @author o * @param name 主题的名称 * @param numPartitions 主题的分区数 * @param replicationFactor 主题的每个分区的副本因子 */ public static void createTopic(String name,int numPartitions,int replicationFactor){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); CreateTopicsResult result = admin.createTopics(Arrays.asList(new NewTopic(name, numPartitions, (short) replicationFactor).configs(configs))); //以下内容用于判断创建主题的结果 for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" created"); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof TopicExistsException) { System.out.println("topic "+entry.getKey()+" existed"); } } } } /** * 删除主题 * @author o * @param names 主题的名称 */ public static void deleteTopic(String name,String ... names){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); Collection<String> topics = Arrays.asList(names); topics.add(name); DeleteTopicsResult result = admin.deleteTopics(topics); //以下内容用于判断删除主题的结果 for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" deleted"); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { System.out.println("topic "+entry.getKey()+" not exist"); } } } } /** * 查看主题详情 * @author o * @param names 主题的名称 */ public static void describeTopic(String name,String ... names){ if(admin == null) { createAdmin(); } Map<String, String> configs = new HashMap<>(); Collection<String> topics = Arrays.asList(names); topics.add(name); DescribeTopicsResult result = admin.describeTopics(topics); //以下内容用于显示主题详情的结果 for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : result.values().entrySet()) { try { entry.getValue().get(); System.out.println("topic "+entry.getKey()+" describe"); System.out.println("\t name: "+entry.getValue().get().name()); System.out.println("\t partitions: "); entry.getValue().get().partitions().stream().forEach(p-> { System.out.println("\t\t index: "+p.partition()); System.out.println("\t\t\t leader: "+p.leader()); System.out.println("\t\t\t replicas: "+p.replicas()); System.out.println("\t\t\t isr: "+p.isr()); }); System.out.println("\t internal: "+entry.getValue().get().isInternal()); } catch (InterruptedException | ExecutionException e) { if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { System.out.println("topic "+entry.getKey()+" not exist"); } } } } /** * 查看主题列表 * @author o * @return Set<String> TopicList */ public static Set<String> listTopic(){ if(admin == null) { createAdmin(); } ListTopicsResult result = admin.listTopics(); try { result.names().get().stream().map(x->x+"\t").forEach(System.out::print); return result.names().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return null; } } public static void main(String[] args) { System.out.println(listTopic()); } }
위 내용은 SpringBoot가 Kafka 도구 클래스를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











인터넷과 기술의 발달로 디지털 투자에 대한 관심이 높아지고 있습니다. 많은 투자자들은 더 높은 투자 수익을 얻기 위해 계속해서 투자 전략을 탐색하고 연구합니다. 주식거래에 있어서 실시간 주식분석은 의사결정에 매우 중요한데, Kafka 실시간 메시지 큐와 PHP 기술을 활용하는 것은 효율적이고 실용적인 수단이다. 1. Kafka 소개 Kafka는 LinkedIn에서 개발한 처리량이 높은 분산 게시 및 구독 메시징 시스템입니다. 카프카의 주요 기능은 다음과 같습니다.

SpringBoot와 SpringMVC는 모두 Java 개발에서 일반적으로 사용되는 프레임워크이지만 둘 사이에는 몇 가지 분명한 차이점이 있습니다. 이 기사에서는 이 두 프레임워크의 기능과 용도를 살펴보고 차이점을 비교할 것입니다. 먼저 SpringBoot에 대해 알아봅시다. SpringBoot는 Spring 프레임워크를 기반으로 하는 애플리케이션의 생성 및 배포를 단순화하기 위해 Pivotal 팀에서 개발되었습니다. 독립 실행형 실행 파일을 구축하는 빠르고 가벼운 방법을 제공합니다.

Kafka 시각화 도구를 위한 다섯 가지 옵션 ApacheKafka는 대량의 실시간 데이터를 처리할 수 있는 분산 스트림 처리 플랫폼입니다. 실시간 데이터 파이프라인, 메시지 대기열 및 이벤트 기반 애플리케이션을 구축하는 데 널리 사용됩니다. Kafka의 시각화 도구는 사용자가 Kafka 클러스터를 모니터링 및 관리하고 Kafka 데이터 흐름을 더 잘 이해하는 데 도움이 될 수 있습니다. 다음은 널리 사용되는 5가지 Kafka 시각화 도구에 대한 소개입니다.

이 글에서는 dubbo+nacos+Spring Boot의 실제 개발에 대해 이야기하기 위해 자세한 예제를 작성하겠습니다. 이 기사에서는 이론적 지식을 너무 많이 다루지는 않지만 dubbo를 nacos와 통합하여 개발 환경을 신속하게 구축하는 방법을 설명하는 가장 간단한 예를 작성합니다.

올바른 Kafka 시각화 도구를 선택하는 방법은 무엇입니까? 다섯 가지 도구 비교 분석 소개: Kafka는 빅데이터 분야에서 널리 사용되는 고성능, 높은 처리량의 분산 메시지 대기열 시스템입니다. Kafka의 인기로 인해 점점 더 많은 기업과 개발자가 Kafka 클러스터를 쉽게 모니터링하고 관리하기 위한 시각적 도구를 필요로 하고 있습니다. 이 기사에서는 일반적으로 사용되는 5가지 Kafka 시각화 도구를 소개하고 각 기능을 비교하여 독자가 자신의 필요에 맞는 도구를 선택할 수 있도록 돕습니다. 1. 카프카매니저

React 및 Apache Kafka를 사용하여 실시간 데이터 처리 애플리케이션을 구축하는 방법 소개: 빅 데이터 및 실시간 데이터 처리가 증가함에 따라 실시간 데이터 처리 애플리케이션 구축은 많은 개발자의 추구 사항이 되었습니다. 널리 사용되는 프런트엔드 프레임워크인 React와 고성능 분산 메시징 시스템인 Apache Kafka의 조합은 실시간 데이터 처리 애플리케이션을 구축하는 데 도움이 될 수 있습니다. 이 기사에서는 React와 Apache Kafka를 사용하여 실시간 데이터 처리 애플리케이션을 구축하는 방법을 소개합니다.

RockyLinux에 ApacheKafka를 설치하려면 다음 단계를 수행할 수 있습니다. 시스템 업데이트: 먼저 RockyLinux 시스템이 최신인지 확인하고 다음 명령을 실행하여 시스템 패키지를 업데이트합니다. sudoyumupdate Java 설치: ApacheKafka는 Java에 의존하므로 먼저 JDK(Java Development Kit)를 설치해야 합니다. OpenJDK는 다음 명령을 통해 설치할 수 있습니다. sudoyuminstalljava-1.8.0-openjdk-devel 다운로드 및 압축 해제: ApacheKafka 공식 웹사이트()를 방문하여 최신 바이너리 패키지를 다운로드합니다. 안정적인 버전을 선택하세요

최근 몇 년 동안 빅 데이터와 활발한 오픈 소스 커뮤니티가 증가하면서 점점 더 많은 기업이 증가하는 데이터 요구 사항을 충족하기 위해 고성능 대화형 데이터 처리 시스템을 찾기 시작했습니다. 이러한 기술 업그레이드의 물결 속에서 go-zero와 Kafka+Avro는 점점 더 많은 기업에서 주목을 받고 채택되고 있습니다. go-zero는 Golang 언어를 기반으로 개발된 마이크로서비스 프레임워크로, 기업이 효율적인 마이크로서비스 애플리케이션 시스템을 신속하게 구축할 수 있도록 설계되었으며, 고성능, 사용 용이성, 쉬운 확장성을 갖추고 있습니다. 급속한 성장
