Dieses Projekt ist ein Springboot + Kafak-Integrationsprojekt und verwendet daher die Kafak-Verbrauchsanmerkung @KafkaListener in Springboot.
Konfigurieren Sie zunächst mehrere durch Kommas getrennte Themen in application.properties.
Methode: Verwenden Sie den SpEl-Ausdruck von Spring, um Themen wie folgt zu konfigurieren: @KafkaListener(topics = „#{’${topics}’.split(’,’)}“)
Run Das Programm und der Konsolendruckeffekt sind wie folgt:
Da nur ein Verbraucherthread geöffnet ist, werden alle Themen und Partitionen diesem Thread zugewiesen.
Wenn Sie mehrere Verbraucherthreads öffnen möchten, um diese Themen zu konsumieren, fügen Sie den Parameter concurrency der @KafkaListener-Annotation zur Anzahl der gewünschten Verbraucher hinzu (beachten Sie, dass die Anzahl der Verbraucher kleiner oder gleich der Anzahl sein muss). Verbraucher, die Sie möchten) Die Summe der Anzahl der Partitionen aller Themen Ändern Sie das Thema und konsumieren Sie es, während das Programm ausgeführt wird?
ans:Nach dem Versuch kann diese Anforderung nicht mit der Annotation @KafkaListener erfüllt werden Consumer basierend auf den @KafkaListener-Annotationsinformationen, um das angegebene Thema zu konsumieren. Wenn das Thema während der Ausführung des Programms geändert wird, ist es dem Verbraucher nicht gestattet, die Verbraucherkonfiguration zu ändern und das Thema anschließend erneut zu abonnieren.
Aber wir können einen Kompromiss finden, der darin besteht, den topicPattern-Parameter von @KafkaListener für den Themenabgleich zu verwenden.
Ultimative MethodeIdeeVerwenden Sie die native Client-Abhängigkeit von Kafka, initialisieren Sie Verbraucher manuell und starten Sie Verbraucher-Threads, anstatt @KafkaListener zu verwenden.
Im Verbraucherthread ruft jeder Zyklus die neuesten Themeninformationen aus der Konfiguration, Datenbank oder anderen Konfigurationsquellen ab, vergleicht sie mit dem vorherigen Thema und abonniert das Thema erneut oder initialisiert den Verbraucher, wenn Änderungen auftreten.Implementierung
Kafka-Client-Abhängigkeit hinzufügen (diese Testserver-Kafka-Version: 2.12-2.4.0)<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Wenn Sie diese Konfiguration anpassen möchten, können Sie die
Laufergebnisse beim Initialisieren des Verbrauchers hinzufügen (im getesteten Thema sind keine Daten vorhanden)Hinweis: KafkaConsumer ist threadunsicher Verwenden Sie nicht eine KafkaConsumer-Instanz, um mehrere Verbraucher zu öffnen. Um mehrere Verbraucher zu öffnen, müssen Sie mehrere neue KafkaConsumer-Instanzen erstellen.
Das obige ist der detaillierte Inhalt vonSo geben Sie mit @KafkaListener in Springboot + Kafka dynamisch mehrere Themen an. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!