Projek ini ialah projek integrasi springboot+kafak, jadi ia menggunakan anotasi penggunaan kafak @KafkaListener dalam springboot
Pertama, konfigurasi dalam application.properties dipisahkan dengan koma pelbagai topik.
Kaedah: Gunakan ungkapan SpEl Spring untuk mengkonfigurasi topik sebagai: @KafkaListener(topik = “#{’${topik}’.split(’ ,’ )}”)
Jalankan atur cara dan kesan cetakan konsol adalah seperti berikut: Urutan pengguna, jadi semua topik dan sekatan diperuntukkan kepada urutan ini.
Jika anda ingin membuka berbilang urutan pengguna untuk menggunakan topik ini, tambahkan parameter anotasi @KafkaListener konkurensi
dan nilainya boleh menjadi bilangan pengguna yang anda inginkan (perhatikan bahawa penggunaan Bilangan ia mestilah kurang daripada atau sama dengan jumlah bilangan partition bagi semua topik yang telah anda buka)Jalankan atur cara dan kesan cetakan konsol adalah seperti berikut:
Untuk meringkaskan soalan yang paling kerap ditanya
Bagaimana untuk menukar topik semasa program sedang berjalan supaya pengguna boleh menggunakan topik yang diubah suai?
jawapan: Selepas mencuba, keperluan ini tidak boleh dicapai menggunakan anotasi @KafkaListener Apabila program bermula, program akan memulakan pengguna berdasarkan maklumat anotasi @KafkaListener untuk menggunakan. topik yang ditentukan. Jika topik diubah suai semasa program berjalan, pengguna tidak akan dibenarkan mengubah suai konfigurasi pengguna dan kemudian melanggan semula topik tersebut.
Walau bagaimanapun, kita boleh berkompromi, iaitu menggunakan parameter topicPattern @KafkaListener untuk pemadanan topik.Kaedah muktamad
IdeaGunakan pergantungan pelanggan asli Kafka, mulakan pengguna secara manual dan mulakan urutan pengguna dan bukannya menggunakan @KafkaListener. Dalam urutan pengguna, setiap kitaran memperoleh maklumat topik terkini daripada konfigurasi, pangkalan data atau sumber konfigurasi lain, membandingkannya dengan topik sebelumnya dan jika perubahan berlaku, langgan semula topik atau mulakan pengguna. PelaksanaanTambahkan pergantungan pelanggan kafka (pelayan ujian ini versi kafka: 2.12-2.4.0)<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
kaedah poll() Bilangan lalai mesej yang diperoleh dalam satu tarikan ialah: 500, seperti yang ditunjukkan dalam rajah di bawah, ditetapkan dalam kod sumber pelanggan kafka.
Jika anda ingin menyesuaikan konfigurasi ini, anda boleh menambah
hasil yang sedang dijalankan (topik ujian) apabila memulakan pengguna Tiada data dalam semua)
Nota: KafkaConsumer adalah thread-unsafe. Jangan gunakan satu contoh KafkaConsumer untuk membuka berbilang pengguna, anda memerlukan yang baharu Contoh KafkaConsumer.
Atas ialah kandungan terperinci Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!