目录
springboot配置双kafka
引入Maven kafka jar、准备两个kafka;
配置yml配置文件
配置KafkaConfig类
发送工具类MyKafkaProducer
测试类
接收类
测试结果
首页 Java java教程 springboot怎么配置双kafka

springboot怎么配置双kafka

May 10, 2023 pm 06:43 PM
springboot kafka

springboot配置双kafka

使用spring boot 2.0.8.RELEASE 版本

引入Maven kafka jar、准备两个kafka;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
登录后复制

配置yml配置文件

spring:
  kafka:
    bootstrap-servers: 180.167.180.242:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka #群组ID
  outkafka:
    bootstrap-servers: localhost:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka_1 #群组ID
登录后复制

配置KafkaConfig类

import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String innerServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String innerGroupid;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String innerEnableAutoCommit;
 
    @Bean
    @Primary//理解为默认优先选择当前容器下的消费者工厂
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    @Bean//第一个消费者工厂的bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean //生产者工厂配置
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }
    
    @Bean //kafka发送消息模板
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
    
    /**
     * 生产者配置方法
     *
     * 生产者有三个必选属性
     * <p>
     * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,
     * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。
     * </p>
     * <p>
     * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。
     * </p>
     * <p>
     * 3.value.serializer 值得序列化方式
     * </p>
     *
     *
     * @return
     */
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        /**
         * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限
         * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。
         * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改
         */
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        /**
         * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:
         * <ul>
         * <li>
         * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且
         * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。
         * <li> <code> acks = 1 </code>
         * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,
         * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。
         * <li><code> acks = all </code>
         * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。
         * 这相当于acks = -1设置
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        /**
         * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。
         */
        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    
    @Value("${spring.outkafka.bootstrap-servers}")
    private String outServers;
    @Value("${spring.outkafka.consumer.group-id}")
    private String outGroupid;
    @Value("${spring.outkafka.consumer.enable-auto-commit}")
    private String outEnableAutoCommit;
    
 
    static {
        
    }
    
    /**
     * 连接第二个kafka集群的配置
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryOutSchedule());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());
    }
 
    /**
     * 连接第二个集群的消费者配置
     */
    @Bean
    public Map<String, Object> consumerConfigsOutSchedule() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean //生产者工厂配置
    public ProducerFactory<String, String> producerOutFactory() {
        return new DefaultKafkaProducerFactory<>(senderOutProps());
    }
    
    @Bean //kafka发送消息模板
    public KafkaTemplate<String, String> kafkaOutTemplate() {
        return new KafkaTemplate<String, String>(producerOutFactory());
    }
    
    /**
     * 生产者配置方法
     *
     * 生产者有三个必选属性
     * <p>
     * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,
     * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。
     * </p>
     * <p>
     * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。
     * </p>
     * <p>
     * 3.value.serializer 值得序列化方式
     * </p>
     *
     *
     * @return
     */
    private Map<String, Object> senderOutProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        /**
         * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限
         * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。
         * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改
         */
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        /**
         * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:
         * <ul>
         * <li>
         * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且
         * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。
         * <li> <code> acks = 1 </code>
         * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,
         * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。
         * <li><code> acks = all </code>
         * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。
         * 这相当于acks = -1设置
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        /**
         * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。
         */
        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
登录后复制

发送工具类MyKafkaProducer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * <p>
 * <b>KafkaProducer Description:</b> kafka生产者
 * </p>
 *
 * @author douzaixing<b>DATE</b> 2019年7月8日 下午4:09:29
 */
@Component // 这个必须加入容器不然,不会执行
@EnableScheduling // 这里是为了测试加入定时调度
@Slf4j
public class MyKafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Autowired
    private KafkaTemplate<String, String> kafkaOutTemplate;
 
    public ListenableFuture<SendResult<String, String>> send(String topic, String key, String json) {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json);
        log.info("inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");
        return result;
    }
 
    public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) {
        ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json);
        log.info("out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");
        return result;
    }
 
}
登录后复制

测试类

@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes={OesBcServiceApplication.class})
public class MoreKafkaTest {
    
    @Autowired
    private MyKafkaProducer kafkaProducer;
    
    @Test
    public void sendInner() {
        for (int i = 0; i < 1; i++) {
            kafkaProducer.send("inner_test", "douzi" + i, "liyuehua" + i);
            kafkaProducer.sendOut("out_test", "douziout" + i, "fanbingbing" + i);
        }
    }
}
登录后复制

接收类

@Component
@Slf4j
public class KafkaConsumer {  
    @KafkaListener(topics={"inner_test"}, containerFactory="kafkaListenerContainerFactory")
    public void innerlistener(ConsumerRecord<String, String> record) {
        log.info("inner kafka receive #key=" + record.key() + "#value=" + record.value());
    }
    
    @KafkaListener(topics={"out_test"}, containerFactory="kafkaListenerContainerFactoryOutSchedule")
    public void outListener(ConsumerRecord<String, String> record) {
        log.info("out kafka receive #key=" + record.key() + "#value=" + record.value());
    }
}
登录后复制

测试结果

07-11 12:41:27.811 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - inner kafka send #topic=inner_test#key=douzi0#json=liyuehua0#推送成功===========
 
07-11 12:41:27.995 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - inner kafka receive #key=douzi0#value=liyuehua0
07-11 12:41:28.005 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - out kafka send #topic=out_test#key=douziout0#json=fanbingbing0#推送成功===========
07-11 12:41:28.013 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - out kafka receive #key=douziout0#value=fanbingbing0

以上是springboot怎么配置双kafka的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

如何使用PHP和Kafka实现实时股票分析 如何使用PHP和Kafka实现实时股票分析 Jun 28, 2023 am 10:04 AM

随着互联网和科技的发展,数字化投资已成为人们越来越关注的话题。很多投资者不断探索和研究投资策略,希望能够获得更高的投资回报率。股票交易中,实时的股票分析对决策非常重要,其中使用Kafka实时消息队列和PHP技术实现更是一种高效且实用的手段。一、Kafka介绍Kafka是由LinkedIn公司开发的一个高吞吐量的分布式发布、订阅消息系统。Kafka的主要特点是

SpringBoot与SpringMVC的比较及差别分析 SpringBoot与SpringMVC的比较及差别分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java开发中常用的框架,但它们之间有一些明显的差异。本文将探究这两个框架的特点和用途,并对它们的差异进行比较。首先,我们来了解一下SpringBoot。SpringBoot是由Pivotal团队开发的,它旨在简化基于Spring框架的应用程序的创建和部署。它提供了一种快速、轻量级的方式来构建独立的、可执行

如何利用React和Apache Kafka构建实时数据处理应用 如何利用React和Apache Kafka构建实时数据处理应用 Sep 27, 2023 pm 02:25 PM

如何利用React和ApacheKafka构建实时数据处理应用引言:随着大数据与实时数据处理的兴起,构建实时数据处理应用成为了很多开发者的追求。React作为一个流行的前端框架,与ApacheKafka作为一个高性能的分布式消息传递系统的结合,可以帮助我们搭建实时数据处理应用。本文将介绍如何利用React和ApacheKafka构建实时数据处理应用,并

SpringBoot+Dubbo+Nacos 开发实战教程 SpringBoot+Dubbo+Nacos 开发实战教程 Aug 15, 2023 pm 04:49 PM

本文来写个详细的例子来说下dubbo+nacos+Spring Boot开发实战。本文不会讲述太多的理论的知识,会写一个最简单的例子来说明dubbo如何与nacos整合,快速搭建开发环境。

五种选择的可视化工具,用于探索Kafka 五种选择的可视化工具,用于探索Kafka Feb 01, 2024 am 08:03 AM

Kafka可视化工具的五种选择ApacheKafka是一个分布式流处理平台,能够处理大量实时数据。它广泛用于构建实时数据管道、消息队列和事件驱动的应用程序。Kafka的可视化工具可以帮助用户监控和管理Kafka集群,并更好地理解Kafka数据流。以下是对五种流行的Kafka可视化工具的介绍:ConfluentControlCenterConfluent

kafka可视化工具对比分析:如何选择最合适的工具? kafka可视化工具对比分析:如何选择最合适的工具? Jan 05, 2024 pm 12:15 PM

如何选择合适的Kafka可视化工具?五款工具对比分析引言:Kafka是一种高性能、高吞吐量的分布式消息队列系统,被广泛应用于大数据领域。随着Kafka的流行,越来越多的企业和开发者需要一个可视化工具来方便地监控和管理Kafka集群。本文将介绍五款常用的Kafka可视化工具,并对比它们的特点和功能,帮助读者选择适合自己需求的工具。一、KafkaManager

如何在 Rocky Linux 上安装 Apache Kafka? 如何在 Rocky Linux 上安装 Apache Kafka? Mar 01, 2024 pm 10:37 PM

在RockyLinux上安装ApacheKafka可以按照以下步骤进行操作:更新系统:首先,确保你的RockyLinux系统是最新的,执行以下命令更新系统软件包:sudoyumupdate安装Java:ApacheKafka依赖于Java,因此需要先安装JavaDevelopmentKit(JDK)。可以通过以下命令安装OpenJDK:sudoyuminstalljava-1.8.0-openjdk-devel下载和解压:访问ApacheKafka官方网站()下载最新的二进制包。选择一个稳定版本

go-zero与Kafka+Avro的实践:构建高性能的交互式数据处理系统 go-zero与Kafka+Avro的实践:构建高性能的交互式数据处理系统 Jun 23, 2023 am 09:04 AM

近年来,随着大数据的兴起和活跃的开源社区,越来越多的企业开始寻找高性能的交互式数据处理系统来满足日益增长的数据需求。在这场技术升级的浪潮中,go-zero和Kafka+Avro被越来越多的企业所关注和采用。go-zero是一款基于Golang语言开发的微服务框架,具有高性能、易用、易扩展、易维护等特点,旨在帮助企业快速构建高效的微服务应用系统。它的快速成长得

See all articles