Rumah > Java > javaTutorial > Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

WBOY
Lepaskan: 2023-05-20 20:58:19
ke hadapan
3709 orang telah melayarinya

Penerangan

Projek ini ialah projek integrasi springboot+kafak, jadi ia menggunakan anotasi penggunaan kafak @KafkaListener dalam springboot

Pertama, konfigurasi dalam application.properties dipisahkan dengan koma pelbagai topik.

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

Kaedah: Gunakan ungkapan SpEl Spring untuk mengkonfigurasi topik sebagai: @KafkaListener(topik = “#{’${topik}’.split(’ ,’ )}”)

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

Jalankan atur cara dan kesan cetakan konsol adalah seperti berikut: Urutan pengguna, jadi semua topik dan sekatan diperuntukkan kepada urutan ini.

Jika anda ingin membuka berbilang urutan pengguna untuk menggunakan topik ini, tambahkan parameter anotasi @KafkaListener Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafkakonkurensi

dan nilainya boleh menjadi bilangan pengguna yang anda inginkan (perhatikan bahawa penggunaan Bilangan ia mestilah kurang daripada atau sama dengan jumlah bilangan partition bagi semua topik yang telah anda buka)

Jalankan atur cara dan kesan cetakan konsol adalah seperti berikut:

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

Untuk meringkaskan soalan yang paling kerap ditanya

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafkaBagaimana untuk menukar topik semasa program sedang berjalan supaya pengguna boleh menggunakan topik yang diubah suai?

jawapan: Selepas mencuba, keperluan ini tidak boleh dicapai menggunakan anotasi @KafkaListener Apabila program bermula, program akan memulakan pengguna berdasarkan maklumat anotasi @KafkaListener untuk menggunakan. topik yang ditentukan. Jika topik diubah suai semasa program berjalan, pengguna tidak akan dibenarkan mengubah suai konfigurasi pengguna dan kemudian melanggan semula topik tersebut.

Walau bagaimanapun, kita boleh berkompromi, iaitu menggunakan parameter topicPattern @KafkaListener untuk pemadanan topik.

Kaedah muktamad

Idea

Gunakan pergantungan pelanggan asli Kafka, mulakan pengguna secara manual dan mulakan urutan pengguna dan bukannya menggunakan @KafkaListener.

Dalam urutan pengguna, setiap kitaran memperoleh maklumat topik terkini daripada konfigurasi, pangkalan data atau sumber konfigurasi lain, membandingkannya dengan topik sebelumnya dan jika perubahan berlaku, langgan semula topik atau mulakan pengguna.

Pelaksanaan

Tambahkan pergantungan pelanggan kafka (pelayan ujian ini versi kafka: 2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>
Salin selepas log masuk

Kod

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}
Salin selepas log masuk

Mari kita bincangkannya Line 72 kod:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Salin selepas log masuk

Barisan kod di atas bermaksud: tunggu broker Kafka memulangkan data dalam masa 100ms Parameter pasar raya menentukan berapa lama selepas tinjauan pendapat boleh kembali, tidak kira sama ada terdapat data yang tersedia atau tidak.

Selepas mengubah suai topik, anda mesti menunggu sehingga mesej yang ditarik oleh tinjauan pendapat ini diproses dan mengesan perubahan dalam topik semasa gelung sementara (benar) sebelum anda boleh melanggan semula topik tersebut.

kaedah poll() Bilangan lalai mesej yang diperoleh dalam satu tarikan ialah: 500, seperti yang ditunjukkan dalam rajah di bawah, ditetapkan dalam kod sumber pelanggan kafka.

Jika anda ingin menyesuaikan konfigurasi ini, anda boleh menambah

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

hasil yang sedang dijalankan (topik ujian) apabila memulakan pengguna Tiada data dalam semua)

Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka

Nota: KafkaConsumer adalah thread-unsafe. Jangan gunakan satu contoh KafkaConsumer untuk membuka berbilang pengguna, anda memerlukan yang baharu Contoh KafkaConsumer.

Atas ialah kandungan terperinci Cara untuk menentukan berbilang topik secara dinamik dengan @KafkaListener dalam springboot+kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan