springboot+kafka中@KafkaListener動態指定多個topic怎麼實現
說明
本項目為springboot kafak的整合項目,故其用了springboot中對kafak的消費註解@KafkaListener
首先,application.properties中配置用逗號隔開的多個topic。
方法:利用Spring的SpEl表達式,將topics 設定為:@KafkaListener(topics = “#{’${topics}’.split(’ ,’)}”)
運行程序,console打印的效果如下:
因為只開了一條消費者線程,所以所有的topic和分區都分配給這條線程。
如果你想開多條消費者線程去消費這些topic,添加@KafkaListener註解的參數concurrency的值為自己想要的消費者個數即可(注意,消費者數要小於等於你開的所有topic的分區數總和)
#運行程序,console打印的效果如下:
總結大家問的最多的一個問題
如何在程式運作的過程中,改變topic,消費者能夠消費修改後的topic?
ans: 經過嘗試,使用@KafkaListener註解實作不了這個需求,在程式啟動的時候,程式就會根據@KafkaListener的註解資訊初始化好消費者去消費指定好的topic。如果在程式運作的過程中,修改topic,不會讓此消費者修改消費者的配置再重新訂閱topic的。
不過我們可以有個摺疊中的辦法,就是利用@KafkaListener的topicPattern參數來進行topic比對。
終極方法
想法
使用 Kafka 原生客戶依賴,手動初始化消費者並啟動消費者線程,而不是使用 @KafkaListener。
在消費者線程中,每次循環都從配置、資料庫或其他配置來源獲取最新的topic信息,與先前的topic比較,如果發生變化,重新訂閱topic或初始化消費者。
實作
加入kafka客戶端依賴(本次測試服務端kafka版本:2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
程式碼
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
說一下第72行程式碼:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上面這行程式碼表示:在100ms內等待Kafka的broker回傳資料.超市參數指定poll在多久之後可以回,不管有沒有可用的資料都要回傳。
在修改topic後,必須等到此次poll拉取的訊息處理完,while(true)循環的時候偵測topic發生變化,才能重新訂閱topic.
poll()方法一次拉取得訊息數預設為:500,如下圖,kafka客戶端原始碼中設定的。
如果想自訂此配置,可在初始化消費者時加入
執行結果(測試的topic中都無資料)
注意:KafkaConsumer是線程不安全的,不要用一個KafkaConsumer實例開啟多個消費者,要開啟多個消費者,需要new 多個KafkaConsumer實例。
以上是springboot+kafka中@KafkaListener動態指定多個topic怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

隨著網路和科技的發展,數位化投資已成為人們越來越關注的話題。許多投資人不斷探索研究投資策略,希望能獲得更高的投資報酬率。在股票交易中,即時的股票分析對決策非常重要,其中使用Kafka即時訊息隊列和PHP技術實現更是一種高效且實用的手段。一、Kafka介紹Kafka是由LinkedIn公司開發的一個高吞吐量的分散式發布、訂閱訊息系統。 Kafka的主要特點是

如何利用React和ApacheKafka來建立即時資料處理應用介紹:隨著大數據與即時資料處理的興起,建構即時資料處理應用成為了許多開發者的追求。 React作為一個流行的前端框架,與ApacheKafka作為一個高效能的分散式訊息系統的結合,可以幫助我們建立即時資料處理應用。本文將介紹如何利用React和ApacheKafka建構即時資料處理應用,並

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

如何選擇合適的Kafka視覺化工具?五款工具比較分析引言:Kafka是一種高效能、高吞吐量的分散式訊息佇列系統,被廣泛應用於大數據領域。隨著Kafka的流行,越來越多的企業和開發者需要一個視覺化工具來方便地監控和管理Kafka叢集。本文將介紹五款常用的Kafka視覺化工具,並比較它們的特色和功能,幫助讀者選擇適合自己需求的工具。一、KafkaManager

Kafka視覺化工具的五種選擇ApacheKafka是一個分散式串流處理平台,能夠處理大量即時資料。它廣泛用於建立即時資料管道、訊息佇列和事件驅動的應用程式。 Kafka的視覺化工具可以幫助使用者監控和管理Kafka集群,並且更好地理解Kafka資料流。以下是對五種流行的Kafka視覺化工具的介紹:ConfluentControlCenterConfluent

在RockyLinux上安裝ApacheKafka可以按照以下步驟進行操作:更新系統:首先,確保你的RockyLinux系統是最新的,執行以下命令更新系統軟體包:sudoyumupdate安裝Java:ApacheKafka依賴Java,因此需要先安裝JavaDevelopmentKit(JDKK )。可以透過以下指令安裝OpenJDK:sudoyuminstalljava-1.8.0-openjdk-devel下載和解壓縮:造訪ApacheKafka官方網站()下載最新的二進位套件。選擇一個穩定版本

近年來,隨著大數據的興起和活躍的開源社區,越來越多的企業開始尋找高效能的互動式資料處理系統來滿足日益增長的資料需求。在這場技術升級的浪潮中,go-zero和Kafka+Avro被越來越多的企業所關注和採用。 go-zero是一款基於Golang語言開發的微服務框架,具有高效能、易用、易於擴展、易於維護等特點,旨在幫助企業快速建立高效的微服務應用系統。它的快速成長得
