Maison > Java > javaDidacticiel > Comment utiliser @KafkaListener pour recevoir simultanément des messages par lots dans Spring Boot

Comment utiliser @KafkaListener pour recevoir simultanément des messages par lots dans Spring Boot

WBOY
Libérer: 2023-05-13 14:01:06
avant
1696 Les gens l'ont consulté

###La première étape, la consommation simultanée###
Regardez d'abord le code. Le point clé est que nous utilisons ConcurrentKafkaListenerContainerFactory et définissons factory.setConcurrency(4); consommation, il sera simultané Réglé sur 4, c'est-à-dire qu'il y a 4 KafkaMessageListenerContainers)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
Copier après la connexion

Notez que vous pouvez également ajouter spring.kafka.listener.concurrency=3 directement à application.properties, puis utiliser @KafkaListener pour une consommation simultanée .

###La deuxième étape est la consommation par lots###
Ensuite, la consommation par lots. Les points clés sont factory.setBatchListener(true);
et propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
L'un consiste à activer la consommation par lots et l'autre à définir le nombre maximum d'enregistrements de messages que la consommation par lots peut consommer chacun. temps.

Il est important de noter que le ConsumerConfig.MAX_POLL_RECORDS_CONFIG que nous définissons est de 50, ce qui ne signifie pas que nous continuerons d'attendre si 50 messages ne sont pas atteints. L'explication officielle est "Le nombre maximum d'enregistrements renvoyés dans un seul appel à poll()." Autrement dit, 50 représente le nombre maximum d'enregistrements renvoyés dans un sondage.

Vous pouvez voir dans le journal de démarrage qu'il y a max.poll.interval.ms = 300000, ce qui signifie que nous appelons poll une fois par intervalle max.poll.interval.ms. Chaque sondage renvoie jusqu'à 50 enregistrements.

L'explication officielle de max.poll.interval.ms est "Le délai maximum entre les invocations de poll() lors de l'utilisation de la gestion des groupes de consommateurs. Cela impose une limite supérieure à la durée pendant laquelle le consommateur peut être inactif avant de récupérer plus d'enregistrements. Si poll() n'est pas appelé avant l'expiration de ce délai, alors le consommateur est considéré comme ayant échoué et le groupe va rééquilibrer afin de réaffecter les partitions à un autre membre ";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }
Copier après la connexion

Capture d'écran du journal de démarrage

Comment utiliser @KafkaListener pour recevoir simultanément des messages par lots dans Spring Boot

À propos de max.poll. Capture d'écran de l'explication officielle de .records et max.poll.interval.ms :

Comment utiliser @KafkaListener pour recevoir simultanément des messages par lots dans Spring Boot

###La troisième étape, la consommation de partition###
Pour un sujet avec une seule partition, la consommation de partition n'est pas nécessaire car elle n'a aucun sens. L'exemple suivant concerne le cas où il y a 2 partitions (il y a 4 méthodes ListenPartitionX dans mon code complet, et mon sujet a 4 partitions définies. Les lecteurs peuvent l'ajuster en fonction de leur propre situation).

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal