


How to dynamically specify multiple topics with @KafkaListener in springboot+kafka
Description
This project is a springboot kafak integration project, so it uses the kafak consumption annotation @KafkaListener in springboot
First, configure application.properties separated by commas Multiple topics.
Method: Use Spring’s SpEl expression to configure topics as: @KafkaListener(topics = “#{’${topics}’.split(’ ,’)}”)
Run the program, and the console print effect is as follows:
Because it is only open A consumer thread, so all topics and partitions are assigned to this thread.
If you want to open multiple consumer threads to consume these topics, add the parameter concurrency of the @KafkaListener annotation. The value can be the number of consumers you want (note that consumption The number of partitions must be less than or equal to the total number of partitions of all topics you have opened)
Run the program, and the console printout will be as follows:
Summarize the most frequently asked question
How to change the topic while the program is running, so that consumers can consume the modified topic?
ans: After trying, this requirement cannot be achieved using the @KafkaListener annotation. When the program starts, the program will initialize the consumer based on the @KafkaListener annotation information to consume the specified topic. If the topic is modified while the program is running, the consumer will not be allowed to modify the consumer configuration and then re-subscribe to the topic.
However, we can have a compromise, which is to use the topicPattern parameter of @KafkaListener for topic matching.
Ultimate method
Idea
Use Kafka native client dependency, manually initialize the consumer and start the consumer thread instead of using @KafkaListener.
In the consumer thread, each cycle obtains the latest topic information from the configuration, database or other configuration sources, compares it with the previous topic, and if changes occur, resubscribe to the topic or initialize the consumer.
Implementation
Add kafka client dependency (this test server kafka version: 2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
Code
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
Let’s talk about it Line 72 of code:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
The above line of code means: wait for Kafka's broker to return data within 100ms. The supermarket parameter specifies how long after poll can return, regardless of whether there is available data or not.
After modifying the topic, you must wait until the messages pulled by this poll are processed, and detect changes in the topic during the while (true) loop before you can re-subscribe to the topic.
poll() method The default number of messages obtained in one pull is: 500, as shown in the figure below, set in the kafka client source code.
If you want to customize this configuration, you can add
running results (test topic) when initializing the consumer There is no data in all)
Note: KafkaConsumer is thread-unsafe. Do not use one KafkaConsumer instance to open multiple consumers. To open multiple consumers, you need new KafkaConsumer instance.
The above is the detailed content of How to dynamically specify multiple topics with @KafkaListener in springboot+kafka. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

With the development of the Internet and technology, digital investment has become a topic of increasing concern. Many investors continue to explore and study investment strategies, hoping to obtain a higher return on investment. In stock trading, real-time stock analysis is very important for decision-making, and the use of Kafka real-time message queue and PHP technology is an efficient and practical means. 1. Introduction to Kafka Kafka is a high-throughput distributed publish and subscribe messaging system developed by LinkedIn. The main features of Kafka are

SpringBoot and SpringMVC are both commonly used frameworks in Java development, but there are some obvious differences between them. This article will explore the features and uses of these two frameworks and compare their differences. First, let's learn about SpringBoot. SpringBoot was developed by the Pivotal team to simplify the creation and deployment of applications based on the Spring framework. It provides a fast, lightweight way to build stand-alone, executable

Five options for Kafka visualization tools ApacheKafka is a distributed stream processing platform capable of processing large amounts of real-time data. It is widely used to build real-time data pipelines, message queues, and event-driven applications. Kafka's visualization tools can help users monitor and manage Kafka clusters and better understand Kafka data flows. The following is an introduction to five popular Kafka visualization tools: ConfluentControlCenterConfluent

This article will write a detailed example to talk about the actual development of dubbo+nacos+Spring Boot. This article will not cover too much theoretical knowledge, but will write the simplest example to illustrate how dubbo can be integrated with nacos to quickly build a development environment.

How to choose the right Kafka visualization tool? Comparative analysis of five tools Introduction: Kafka is a high-performance, high-throughput distributed message queue system that is widely used in the field of big data. With the popularity of Kafka, more and more enterprises and developers need a visual tool to easily monitor and manage Kafka clusters. This article will introduce five commonly used Kafka visualization tools and compare their features and functions to help readers choose the tool that suits their needs. 1. KafkaManager

To install ApacheKafka on RockyLinux, you can follow the following steps: Update system: First, make sure your RockyLinux system is up to date, execute the following command to update the system package: sudoyumupdate Install Java: ApacheKafka depends on Java, so you need to install JavaDevelopmentKit (JDK) first ). OpenJDK can be installed through the following command: sudoyuminstalljava-1.8.0-openjdk-devel Download and decompress: Visit the ApacheKafka official website () to download the latest binary package. Choose a stable version

Commonly used directories for springBoot projects. The directory structure and naming specifications of springBoot projects are introduced based on the directory structure and naming specifications during SpringBoot development. Through the introduction, we can help you solve the problem. How to plan the directory structure in actual projects? How to name directories more standardizedly? What do each directory mean? Wait three questions. Directory description servicex//Project name|-admin-ui//Management service front-end code (usually UI and SERVICE are put into one project for easy management)|-servicex-auth//Module 1|-servicex-common//Module 2|-servicex-gateway//Module 3|

In recent years, with the rise of big data and active open source communities, more and more enterprises have begun to look for high-performance interactive data processing systems to meet the growing data needs. In this wave of technology upgrades, go-zero and Kafka+Avro are being paid attention to and adopted by more and more enterprises. go-zero is a microservice framework developed based on the Golang language. It has the characteristics of high performance, ease of use, easy expansion, and easy maintenance. It is designed to help enterprises quickly build efficient microservice application systems. its rapid growth
