Comment configurer Dual Kafka dans Springboot
Springboot configure Dual Kafka
Utilisez Spring Boot 2.0.8.RELEASE version
Introduisez le pot Maven Kafka et préparez deux Kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Configurez le fichier de configuration yml
spring: kafka: bootstrap-servers: 180.167.180.242:9092 #kafka的访问地址,多个用","隔开 consumer: enable-auto-commit: true group-id: kafka #群组ID outkafka: bootstrap-servers: localhost:9092 #kafka的访问地址,多个用","隔开 consumer: enable-auto-commit: true group-id: kafka_1 #群组ID
Configurez la classe KafkaConfig
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String innerServers; @Value("${spring.kafka.consumer.group-id}") private String innerGroupid; @Value("${spring.kafka.consumer.enable-auto-commit}") private String innerEnableAutoCommit; @Bean @Primary//理解为默认优先选择当前容器下的消费者工厂 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean//第一个消费者工厂的bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit); // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean //生产者工厂配置 public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(senderProps()); } @Bean //kafka发送消息模板 public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } /** * 生产者配置方法 * * 生产者有三个必选属性 * <p> * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址, * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。 * </p> * <p> * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。 * </p> * <p> * 3.value.serializer 值得序列化方式 * </p> * * * @return */ private Map<String, Object> senderProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers); /** * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限 * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。 * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改 */ props.put(ProducerConfig.RETRIES_CONFIG, 0); /** * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置: * <ul> * <li> * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且 * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。 * <li> <code> acks = 1 </code> * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下, * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。 * <li><code> acks = all </code> * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。 * 这相当于acks = -1设置 */ props.put(ProducerConfig.ACKS_CONFIG, "1"); /** * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。 */ // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息 // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去 // props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Value("${spring.outkafka.bootstrap-servers}") private String outServers; @Value("${spring.outkafka.consumer.group-id}") private String outGroupid; @Value("${spring.outkafka.consumer.enable-auto-commit}") private String outEnableAutoCommit; static { } /** * 连接第二个kafka集群的配置 */ @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryOutSchedule()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() { return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule()); } /** * 连接第二个集群的消费者配置 */ @Bean public Map<String, Object> consumerConfigsOutSchedule() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean //生产者工厂配置 public ProducerFactory<String, String> producerOutFactory() { return new DefaultKafkaProducerFactory<>(senderOutProps()); } @Bean //kafka发送消息模板 public KafkaTemplate<String, String> kafkaOutTemplate() { return new KafkaTemplate<String, String>(producerOutFactory()); } /** * 生产者配置方法 * * 生产者有三个必选属性 * <p> * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址, * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。 * </p> * <p> * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。 * </p> * <p> * 3.value.serializer 值得序列化方式 * </p> * * * @return */ private Map<String, Object> senderOutProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers); /** * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限 * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。 * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改 */ props.put(ProducerConfig.RETRIES_CONFIG, 0); /** * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置: * <ul> * <li> * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且 * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。 * <li> <code> acks = 1 </code> * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下, * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。 * <li><code> acks = all </code> * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。 * 这相当于acks = -1设置 */ props.put(ProducerConfig.ACKS_CONFIG, "1"); /** * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。 */ // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息 // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去 // props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
Envoyez la classe d'outil MyKafkaProducer ;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; /** * <p> * <b>KafkaProducer Description:</b> kafka生产者 * </p> * * @author douzaixing<b>DATE</b> 2019年7月8日 下午4:09:29 */ @Component // 这个必须加入容器不然,不会执行 @EnableScheduling // 这里是为了测试加入定时调度 @Slf4j public class MyKafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private KafkaTemplate<String, String> kafkaOutTemplate; public ListenableFuture<SendResult<String, String>> send(String topic, String key, String json) { ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json); log.info("inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功==========="); return result; } public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) { ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json); log.info("out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功==========="); return result; } }
Classe de test
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes={OesBcServiceApplication.class}) public class MoreKafkaTest { @Autowired private MyKafkaProducer kafkaProducer; @Test public void sendInner() { for (int i = 0; i < 1; i++) { kafkaProducer.send("inner_test", "douzi" + i, "liyuehua" + i); kafkaProducer.sendOut("out_test", "douziout" + i, "fanbingbing" + i); } } }
Classe de réception
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics={"inner_test"}, containerFactory="kafkaListenerContainerFactory") public void innerlistener(ConsumerRecord<String, String> record) { log.info("inner kafka receive #key=" + record.key() + "#value=" + record.value()); } @KafkaListener(topics={"out_test"}, containerFactory="kafkaListenerContainerFactoryOutSchedule") public void outListener(ConsumerRecord<String, String> record) { log.info("out kafka receive #key=" + record.key() + "#value=" + record.value()); } }
Résultats des tests
07-11 12:41:27.811 INFO [com.wondertek.oes.bc.service.send.MyKafkaProducer] - envoi de kafka interne #topic=inner_test#key =douzi0#json=liyuehua0#Push réussi===========
07-11 12:41:27.995 INFO [com.wondertek.oes.bc.service.send.KafkaConsumer] - réception kafka intérieure #key=douzi0#value=liyuehua0
07-11 12:41:28.005 INFO [com.wondertek.oes.bc.service.send.MyKafkaProducer] - envoi de kafka #topic=out_test#key=douziout0#json=fanbingbing0# Push réussi ===========
07-11 12:41:28.013 INFO [com.wondertek.oes.bc.service.send.KafkaConsumer] - sortie kafka reçoit #key=douziout0#value=fanbingbing0
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Avec le développement d’Internet et de la technologie, l’investissement numérique est devenu un sujet de préoccupation croissant. De nombreux investisseurs continuent d’explorer et d’étudier des stratégies d’investissement, dans l’espoir d’obtenir un retour sur investissement plus élevé. Dans le domaine du trading d'actions, l'analyse boursière en temps réel est très importante pour la prise de décision, et l'utilisation de la file d'attente de messages en temps réel Kafka et de la technologie PHP constitue un moyen efficace et pratique. 1. Introduction à Kafka Kafka est un système de messagerie distribué de publication et d'abonnement à haut débit développé par LinkedIn. Les principales fonctionnalités de Kafka sont

SpringBoot et SpringMVC sont tous deux des frameworks couramment utilisés dans le développement Java, mais il existe des différences évidentes entre eux. Cet article explorera les fonctionnalités et les utilisations de ces deux frameworks et comparera leurs différences. Tout d’abord, découvrons SpringBoot. SpringBoot a été développé par l'équipe Pivotal pour simplifier la création et le déploiement d'applications basées sur le framework Spring. Il fournit un moyen rapide et léger de créer des fichiers exécutables autonomes.

Comment utiliser React et Apache Kafka pour créer des applications de traitement de données en temps réel Introduction : Avec l'essor du Big Data et du traitement de données en temps réel, la création d'applications de traitement de données en temps réel est devenue la priorité de nombreux développeurs. La combinaison de React, un framework front-end populaire, et d'Apache Kafka, un système de messagerie distribué hautes performances, peut nous aider à créer des applications de traitement de données en temps réel. Cet article expliquera comment utiliser React et Apache Kafka pour créer des applications de traitement de données en temps réel, et

Cet article écrira un exemple détaillé pour parler du développement réel de dubbo+nacos+Spring Boot. Cet article ne couvrira pas trop de connaissances théoriques, mais écrira l'exemple le plus simple pour illustrer comment dubbo peut être intégré à nacos pour créer rapidement un environnement de développement.

Cinq options pour les outils de visualisation Kafka ApacheKafka est une plateforme de traitement de flux distribué capable de traiter de grandes quantités de données en temps réel. Il est largement utilisé pour créer des pipelines de données en temps réel, des files d'attente de messages et des applications basées sur des événements. Les outils de visualisation de Kafka peuvent aider les utilisateurs à surveiller et gérer les clusters Kafka et à mieux comprendre les flux de données Kafka. Ce qui suit est une introduction à cinq outils de visualisation Kafka populaires : ConfluentControlCenterConfluent

Comment choisir le bon outil de visualisation Kafka ? Analyse comparative de cinq outils Introduction : Kafka est un système de file d'attente de messages distribué à haute performance et à haut débit, largement utilisé dans le domaine du Big Data. Avec la popularité de Kafka, de plus en plus d'entreprises et de développeurs ont besoin d'un outil visuel pour surveiller et gérer facilement les clusters Kafka. Cet article présentera cinq outils de visualisation Kafka couramment utilisés et comparera leurs caractéristiques et fonctions pour aider les lecteurs à choisir l'outil qui répond à leurs besoins. 1. KafkaManager

Pour installer ApacheKafka sur RockyLinux, vous pouvez suivre les étapes suivantes : Mettre à jour le système : Tout d'abord, assurez-vous que votre système RockyLinux est à jour, exécutez la commande suivante pour mettre à jour les packages système : sudoyumupdate Installer Java : ApacheKafka dépend de Java, vous vous devez d'abord installer JavaDevelopmentKit (JDK). OpenJDK peut être installé via la commande suivante : sudoyuminstalljava-1.8.0-openjdk-devel Télécharger et décompresser : Visitez le site officiel d'ApacheKafka () pour télécharger le dernier package binaire. Choisissez une version stable

Ces dernières années, avec l'essor du Big Data et des communautés open source actives, de plus en plus d'entreprises ont commencé à rechercher des systèmes de traitement de données interactifs hautes performances pour répondre aux besoins croissants en matière de données. Dans cette vague de mises à niveau technologiques, le go-zero et Kafka+Avro suscitent l’attention et sont adoptés par de plus en plus d’entreprises. go-zero est un framework de microservices développé sur la base du langage Golang. Il présente les caractéristiques de hautes performances, de facilité d'utilisation, d'extension facile et de maintenance facile. Il est conçu pour aider les entreprises à créer rapidement des systèmes d'applications de microservices efficaces. sa croissance rapide
