How to dynamically specify multiple topics with @KafkaListener in springboot+kafka

May 20, 2023 pm 08:58 PM
springboot kafka @kafkalistener

Description

This project integrates Kafka with Spring Boot, so it uses Spring's Kafka consumer annotation, @KafkaListener.

First, configure multiple topics in application.properties, separated by commas.


Method: use a Spring SpEL expression to set the topics: @KafkaListener(topics = "#{'${topics}'.split(',')}")
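As a minimal sketch of the two pieces together, assuming a property named topics (the name used in the expression) and hypothetical topic names, group id, and class name:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Assumed application.properties entry (topic names are hypothetical):
//   topics=topic1,topic2,topic3

@Component
public class MultiTopicListener {

    // The ${topics} placeholder is resolved to the property value first;
    // SpEL then splits that string literal on commas into a String[].
    @KafkaListener(topics = "#{'${topics}'.split(',')}", groupId = "demo-group")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("topic:" + record.topic()
                + ",partition:" + record.partition()
                + ",value:" + record.value());
    }
}
```

Since split(',') does not trim its results, it is safest to write the property value without spaces around the commas.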


Run the program and check the console output. Because only one consumer thread is started, all topics and partitions are assigned to that thread.

To consume these topics with multiple consumer threads, add the concurrency attribute to the @KafkaListener annotation and set it to the number of consumers you want. Note that the number of consumers must be less than or equal to the total number of partitions across all subscribed topics.
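A minimal sketch of a listener with three consumer threads, assuming a spring-kafka version whose @KafkaListener supports the concurrency attribute; the class name and the value 3 are only illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConcurrentMultiTopicListener {

    // concurrency = "3" asks the listener container for three consumer threads;
    // the subscribed partitions are divided among them by the partition assignor.
    @KafkaListener(topics = "#{'${topics}'.split(',')}", concurrency = "3")
    public void onMessage(ConsumerRecord<String, String> record) {
        // Printing the thread name makes the distribution visible in the console.
        System.out.println(Thread.currentThread().getName()
                + " <- " + record.topic() + "-" + record.partition());
    }
}
```

How evenly the partitions are spread depends on the partition assignor in use. With a range-style assignor, partitions are assigned topic by topic, so some threads may stay idle even when the total partition count is larger than the concurrency value.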


Run the program again and check the console output for the new assignment.

The most frequently asked question

How to change the topic while the program is running, so that consumers can consume the modified topic?

Answer: in my testing, this cannot be achieved with the @KafkaListener annotation. At startup, the consumer is initialized from the attributes of the @KafkaListener annotation and subscribes to the specified topics. If the topic configuration is changed while the program is running, the consumer is not reconfigured and does not re-subscribe.

There is a compromise, however: use the topicPattern attribute of @KafkaListener to match topics by a regular expression.
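To illustrate how such a pattern selects topics, here is a small sketch using only java.util.regex. The pattern order-.* and the topic names are hypothetical; the assumption is that the consumer subscribes to every topic whose full name matches the regular expression, and that matching topics created later are picked up when the consumer refreshes its metadata.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {

    // Hypothetical pattern, as it might appear in
    // @KafkaListener(topicPattern = "order-.*")
    public static final Pattern TOPIC_PATTERN = Pattern.compile("order-.*");

    // Returns the topics whose full name matches the pattern.
    public static List<String> matching(List<String> topics) {
        return topics.stream()
                .filter(t -> TOPIC_PATTERN.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("order-created", "order-paid", "user-signup");
        System.out.println(matching(existing)); // prints [order-created, order-paid]
    }
}
```

This only helps when the topics to be added later follow a naming scheme known in advance, which is why it is a compromise rather than a full solution.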

Ultimate method

Idea

Use the native Kafka client dependency: initialize the consumer manually and start the consumer thread yourself instead of using @KafkaListener.

In the consumer thread, each loop iteration fetches the latest topic list from the configuration, a database, or another configuration source and compares it with the previous list. If it has changed, re-subscribe to the topics or re-initialize the consumer.
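The comparison step can be sketched on its own, without any Kafka dependency. The class and method names below are hypothetical; like List.equals, the comparison is order-sensitive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TopicChangeDetector {

    // The topic list seen most recently; empty until the first update.
    private List<String> current = new ArrayList<>();

    // Parses "a, b,c" into [a, b, c], trimming blanks and skipping empty entries.
    public static List<String> parse(String csv) {
        if (csv == null) {
            return new ArrayList<>();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Returns true and remembers the new list when it differs from the previous one.
    public boolean update(String latestCsv) {
        List<String> latest = parse(latestCsv);
        if (latest.equals(current)) {
            return false;
        }
        current = latest;
        return true;
    }

    public List<String> current() {
        return current;
    }

    public static void main(String[] args) {
        TopicChangeDetector detector = new TopicChangeDetector();
        System.out.println(detector.update("topic1,topic2"));  // true: first list seen
        System.out.println(detector.update("topic1, topic2")); // false: same topics
        System.out.println(detector.update("topic1,topic3"));  // true: re-subscribe needed
    }
}
```

Whenever update returns true, the consumer thread would call subscribe with current() or rebuild the consumer.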

Implementation

Add the Kafka client dependency (the Kafka server used in this test is version 2.12-2.4.0, that is, Kafka 2.4.0 built for Scala 2.12):

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

Code

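import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * The consumer
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * The topics currently subscribed to
     */
    private List<String> topicList;

    /**
     * Reads the first line of the topic file (a comma-separated topic list).
     * Returns null if the file cannot be read.
     */
    public static String getNewTopic() {
        try {
            return FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            log.error("Failed to read the topic file", e);
        }
        return null;
    }

    /**
     * Initializes the consumer (settings are hard-coded for quick testing;
     * use a configuration file in real code)
     *
     * @param topicList the topics to subscribe to
     * @return a consumer subscribed to the given topics
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        // Configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.9.185:9092");
        // A consumer group must be specified
        props.put("group.id", "haha");
        // Deserializer classes for record keys and values
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        // Create the consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topics
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * Starts the consumer thread.
     * Handle exceptions according to your own needs.
     */
    @Override
    public void afterPropertiesSet() {
        // Initialize the topic list
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // Start one consumer thread
            new Thread(() -> {
                while (true) {
                    // Simulate fetching the latest topics from a configuration source (comma-separated string)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // If the topics have changed
                    if (!topicList.equals(newTopic)) {
                        log.info("topics changed: newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // Method one: re-subscribe (subscribe() replaces the previous subscription)
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // Method two: close the old consumer and initialize a new one
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}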

A note on the poll call inside the consumer loop:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

This line waits up to 100 ms for the Kafka broker to return data. The timeout parameter specifies the maximum time poll may block before returning, whether or not any data is available.

After the topic list is modified, the consumer re-subscribes only once the messages pulled by the current poll have been processed and the next iteration of the while (true) loop detects the change.

By default, a single poll() call returns at most 500 messages; this default is set in the Kafka client source code.

To customize this limit, add the max.poll.records property when initializing the consumer.
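The consumer setting that caps the number of records returned by one poll is max.poll.records. A minimal sketch of building the consumer properties with a custom value follows; the class name and the value 100 are illustrative, and the broker address and group id are the test values from the code above.

```java
import java.util.Properties;

public class ConsumerProps {

    // Builds consumer properties with a custom per-poll record limit.
    public static Properties build(int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.9.185:9092");
        props.put("group.id", "haha");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Upper bound on the number of records returned by a single poll() (default 500).
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(100).getProperty("max.poll.records")); // prints 100
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of the hard-coded one.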


[Figure: running results; none of the test topics contained any data]

Note: KafkaConsumer is not thread-safe. Do not share one KafkaConsumer instance across multiple consumer threads; each consumer thread needs its own new KafkaConsumer instance.
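One way to follow this rule is to create the consumer inside the thread that uses it. The sketch below assumes the kafka-clients dependency from above; the class name, method name, and thread names are hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ConsumerPerThread {

    // Starts `threads` consumer threads. Each thread creates, uses, and closes
    // its own KafkaConsumer, so no instance is shared between threads.
    public static void start(Properties props, List<String> topics, int threads) {
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(topics);
                    while (!Thread.currentThread().isInterrupted()) {
                        for (ConsumerRecord<String, String> record
                                : consumer.poll(Duration.ofMillis(100))) {
                            System.out.println(Thread.currentThread().getName()
                                    + " key:" + record.key() + ",value:" + record.value());
                        }
                    }
                }
            }, "consumer-" + i).start();
        }
    }
}
```

Because all of these consumers share the same group.id, the topic partitions are divided among them.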
