springboot+kafka에서 @KafkaListener를 사용하여 여러 주제를 동적으로 지정하는 방법
Description
이 프로젝트는 springboot+kafak 통합 프로젝트이므로 springboot에서 kafak 소비 주석 @KafkaListener를 사용합니다
먼저 application.properties에서 여러 주제를 쉼표로 구분하여 구성합니다.
방법: Spring의 SpEl 표현식을 사용하여 주제를 다음과 같이 구성합니다. @KafkaListener(topics = “#{’${topics}’.split(’,’)}”)
Run 프로그램 및 콘솔 인쇄 효과는 다음과 같습니다.
단 하나의 소비자 스레드만 열렸으므로 모든 주제와 파티션이 이 스레드에 할당됩니다.
이러한 주제를 소비하기 위해 여러 소비자 스레드를 열려면 원하는 소비자 수에 @KafkaListener 주석의 concurrency 매개변수를 추가하세요(소비자 수는 소비자가 원하는) 모든 주제의 파티션 수의 합)
프로그램을 실행하면 콘솔 인쇄 효과는 다음과 같습니다.
자주 묻는 질문을 정리하자면
방법 프로그램이 실행되는 동안 주제를 변경하고 소비할 수 있나요?
ans: 시도한 후에는 @KafkaListener 주석을 사용하여 이 요구 사항을 달성할 수 없습니다. 프로그램이 소비자를 초기화합니다. @KafkaListener 주석 정보를 기반으로 지정된 주제를 사용합니다. 프로그램이 실행되는 동안 주제가 수정되면 소비자는 소비자 구성을 수정한 다음 주제를 다시 구독할 수 없습니다.
하지만 주제 일치를 위해 @KafkaListener의 topicPattern 매개변수를 사용하는 절충안이 있을 수 있습니다.
궁극적인 방법
Idea
Kafka 기본 클라이언트 종속성을 사용하고 @KafkaListener를 사용하는 대신 수동으로 소비자를 초기화하고 소비자 스레드를 시작합니다.
소비자 스레드에서 각 주기는 구성, 데이터베이스 또는 기타 구성 소스에서 최신 주제 정보를 얻고 이를 이전 주제와 비교하며 변경 사항이 발생하면 주제를 다시 구독하거나 소비자를 초기화합니다.
구현
kafka 클라이언트 종속성 추가(이 테스트 서버 kafka 버전: 2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
Code
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
72번째 코드 줄에 대해 이야기해 보겠습니다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
위 코드 줄의 의미: 100ms Kafka의 브로커가 데이터를 반환할 때까지 기다립니다. 슈퍼마켓 매개변수는 사용 가능한 데이터가 있는지 여부에 관계없이 폴링이 반환될 수 있는 시간을 지정합니다.
주제를 수정한 후에는 이 설문 조사에서 가져온 메시지가 처리될 때까지 기다려야 하며 주제를 다시 구독하기 전에 while(true) 루프 중에 주제의 변경 사항을 감지해야 합니다. 한 번에 poll() 메서드는 500입니다. 아래와 같이 kafka 클라이언트 소스 코드에 설정되어 있습니다.
이 구성을 사용자 정의하려면 소비자를 초기화할 때
실행 결과를 추가할 수 있습니다(테스트된 주제에는 데이터가 없습니다)
참고: KafkaConsumer는 스레드에 안전하지 않습니다. 여러 소비자를 열려면 하나의 KafkaConsumer 인스턴스를 사용하지 마세요. 여러 소비자를 열려면 여러 개의 새로운 KafkaConsumer 인스턴스를 만들어야 합니다.
위 내용은 springboot+kafka에서 @KafkaListener를 사용하여 여러 주제를 동적으로 지정하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











인터넷과 기술의 발달로 디지털 투자에 대한 관심이 높아지고 있습니다. 많은 투자자들은 더 높은 투자 수익을 얻기 위해 계속해서 투자 전략을 탐색하고 연구합니다. 주식거래에 있어서 실시간 주식분석은 의사결정에 매우 중요한데, Kafka 실시간 메시지 큐와 PHP 기술을 활용하는 것은 효율적이고 실용적인 수단이다. 1. Kafka 소개 Kafka는 LinkedIn에서 개발한 처리량이 높은 분산 게시 및 구독 메시징 시스템입니다. 카프카의 주요 기능은 다음과 같습니다.

SpringBoot와 SpringMVC는 모두 Java 개발에서 일반적으로 사용되는 프레임워크이지만 둘 사이에는 몇 가지 분명한 차이점이 있습니다. 이 기사에서는 이 두 프레임워크의 기능과 용도를 살펴보고 차이점을 비교할 것입니다. 먼저 SpringBoot에 대해 알아봅시다. SpringBoot는 Spring 프레임워크를 기반으로 하는 애플리케이션의 생성 및 배포를 단순화하기 위해 Pivotal 팀에서 개발되었습니다. 독립 실행형 실행 파일을 구축하는 빠르고 가벼운 방법을 제공합니다.

이 글에서는 dubbo+nacos+Spring Boot의 실제 개발에 대해 이야기하기 위해 자세한 예제를 작성하겠습니다. 이 기사에서는 이론적 지식을 너무 많이 다루지는 않지만 dubbo를 nacos와 통합하여 개발 환경을 신속하게 구축하는 방법을 설명하는 가장 간단한 예를 작성합니다.

Kafka 시각화 도구를 위한 다섯 가지 옵션 ApacheKafka는 대량의 실시간 데이터를 처리할 수 있는 분산 스트림 처리 플랫폼입니다. 실시간 데이터 파이프라인, 메시지 대기열 및 이벤트 기반 애플리케이션을 구축하는 데 널리 사용됩니다. Kafka의 시각화 도구는 사용자가 Kafka 클러스터를 모니터링 및 관리하고 Kafka 데이터 흐름을 더 잘 이해하는 데 도움이 될 수 있습니다. 다음은 널리 사용되는 5가지 Kafka 시각화 도구에 대한 소개입니다.

React 및 Apache Kafka를 사용하여 실시간 데이터 처리 애플리케이션을 구축하는 방법 소개: 빅 데이터 및 실시간 데이터 처리가 증가함에 따라 실시간 데이터 처리 애플리케이션 구축은 많은 개발자의 추구 사항이 되었습니다. 널리 사용되는 프런트엔드 프레임워크인 React와 고성능 분산 메시징 시스템인 Apache Kafka의 조합은 실시간 데이터 처리 애플리케이션을 구축하는 데 도움이 될 수 있습니다. 이 기사에서는 React와 Apache Kafka를 사용하여 실시간 데이터 처리 애플리케이션을 구축하는 방법을 소개합니다.

올바른 Kafka 시각화 도구를 선택하는 방법은 무엇입니까? 다섯 가지 도구 비교 분석 소개: Kafka는 빅데이터 분야에서 널리 사용되는 고성능, 높은 처리량의 분산 메시지 대기열 시스템입니다. Kafka의 인기로 인해 점점 더 많은 기업과 개발자가 Kafka 클러스터를 쉽게 모니터링하고 관리하기 위한 시각적 도구를 필요로 하고 있습니다. 이 기사에서는 일반적으로 사용되는 5가지 Kafka 시각화 도구를 소개하고 각 기능을 비교하여 독자가 자신의 필요에 맞는 도구를 선택할 수 있도록 돕습니다. 1. 카프카매니저

RockyLinux에 ApacheKafka를 설치하려면 다음 단계를 수행할 수 있습니다. 시스템 업데이트: 먼저 RockyLinux 시스템이 최신인지 확인하고 다음 명령을 실행하여 시스템 패키지를 업데이트합니다. sudoyumupdate Java 설치: ApacheKafka는 Java에 의존하므로 먼저 JDK(Java Development Kit)를 설치해야 합니다. OpenJDK는 다음 명령을 통해 설치할 수 있습니다. sudoyuminstalljava-1.8.0-openjdk-devel 다운로드 및 압축 해제: ApacheKafka 공식 웹사이트()를 방문하여 최신 바이너리 패키지를 다운로드합니다. 안정적인 버전을 선택하세요

최근 몇 년 동안 빅 데이터와 활발한 오픈 소스 커뮤니티가 증가하면서 점점 더 많은 기업이 증가하는 데이터 요구 사항을 충족하기 위해 고성능 대화형 데이터 처리 시스템을 찾기 시작했습니다. 이러한 기술 업그레이드의 물결 속에서 go-zero와 Kafka+Avro는 점점 더 많은 기업에서 주목을 받고 채택되고 있습니다. go-zero는 Golang 언어를 기반으로 개발된 마이크로서비스 프레임워크로, 기업이 효율적인 마이크로서비스 애플리케이션 시스템을 신속하게 구축할 수 있도록 설계되었으며, 고성능, 사용 용이성, 쉬운 확장성을 갖추고 있습니다. 급속한 성장
