springboot项目配置多个kafka的示例代码
1.spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.配置文件相关信息
kafka.bootstrap-servers=localhost:9092 kafka.consumer.group.id=20230321 #可以并发消费的线程数 (通常与partition数量一致) kafka.consumer.concurrency=10 kafka.consumer.enable.auto.commit=false kafka.bootstrap-servers.pic=localhost:29092 kafka.consumer.group.id.pic=20230322_pic kafka.consumer.concurrency.pic=10 kafka.consumer.enable.auto.commit.pic=false
3.kafka配置类
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.enable.auto.commit}") private String autoCommit; @Value("${kafka.bootstrap-servers}") private String bootstrapServer; @Value("${kafka.consumer.group.id.pic}") private String groupIdPic; @Value("${kafka.consumer.concurrency.pic}") private int concurrencyPic; @Value("${kafka.consumer.enable.auto.commit.pic}") private String autoCommitPic; @Value("${kafka.bootstrap-servers.pic}") private String bootstrapServerPic; @Bean public ConsumerFactory<String, String> consumerFactory() { String bootstrapServers = bootstrapServer; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } @Bean public ConsumerFactory<String, String> consumerFactoryPic() { String bootstrapServers = bootstrapServerPic; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryPic()); factory.setConcurrency(concurrencyPic); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } }
4.消费主题消息
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { String jsonString = message.value(); if (StringUtils.isNoneBlank(jsonString)) { log.info("消费:{}",jsonString); //TODO .... } } catch (Exception e) { log.error(" receive topic error ", e); } finally { ack.acknowledge(); } } @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { if (StringUtils.isNoneBlank(message.value())) { //TODO .... } } catch (Exception e) { logger.error(" receive topic error ", e); } finally { ack.acknowledge(); } }
以上是springboot项目配置多个kafka的示例代码的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

随着互联网和科技的发展,数字化投资已成为人们越来越关注的话题。很多投资者不断探索和研究投资策略,希望能够获得更高的投资回报率。股票交易中,实时的股票分析对决策非常重要,其中使用Kafka实时消息队列和PHP技术实现更是一种高效且实用的手段。一、Kafka介绍Kafka是由LinkedIn公司开发的一个高吞吐量的分布式发布、订阅消息系统。Kafka的主要特点是

SpringBoot和SpringMVC都是Java开发中常用的框架,但它们之间有一些明显的差异。本文将探究这两个框架的特点和用途,并对它们的差异进行比较。首先,我们来了解一下SpringBoot。SpringBoot是由Pivotal团队开发的,它旨在简化基于Spring框架的应用程序的创建和部署。它提供了一种快速、轻量级的方式来构建独立的、可执行

如何利用React和ApacheKafka构建实时数据处理应用引言:随着大数据与实时数据处理的兴起,构建实时数据处理应用成为了很多开发者的追求。React作为一个流行的前端框架,与ApacheKafka作为一个高性能的分布式消息传递系统的结合,可以帮助我们搭建实时数据处理应用。本文将介绍如何利用React和ApacheKafka构建实时数据处理应用,并

本文来写个详细的例子来说下dubbo+nacos+Spring Boot开发实战。本文不会讲述太多的理论的知识,会写一个最简单的例子来说明dubbo如何与nacos整合,快速搭建开发环境。

Kafka可视化工具的五种选择ApacheKafka是一个分布式流处理平台,能够处理大量实时数据。它广泛用于构建实时数据管道、消息队列和事件驱动的应用程序。Kafka的可视化工具可以帮助用户监控和管理Kafka集群,并更好地理解Kafka数据流。以下是对五种流行的Kafka可视化工具的介绍:ConfluentControlCenterConfluent

如何选择合适的Kafka可视化工具?五款工具对比分析引言:Kafka是一种高性能、高吞吐量的分布式消息队列系统,被广泛应用于大数据领域。随着Kafka的流行,越来越多的企业和开发者需要一个可视化工具来方便地监控和管理Kafka集群。本文将介绍五款常用的Kafka可视化工具,并对比它们的特点和功能,帮助读者选择适合自己需求的工具。一、KafkaManager

在RockyLinux上安装ApacheKafka可以按照以下步骤进行操作:更新系统:首先,确保你的RockyLinux系统是最新的,执行以下命令更新系统软件包:sudoyumupdate安装Java:ApacheKafka依赖于Java,因此需要先安装JavaDevelopmentKit(JDK)。可以通过以下命令安装OpenJDK:sudoyuminstalljava-1.8.0-openjdk-devel下载和解压:访问ApacheKafka官方网站()下载最新的二进制包。选择一个稳定版本

近年来,随着大数据的兴起和活跃的开源社区,越来越多的企业开始寻找高性能的交互式数据处理系统来满足日益增长的数据需求。在这场技术升级的浪潮中,go-zero和Kafka+Avro被越来越多的企业所关注和采用。go-zero是一款基于Golang语言开发的微服务框架,具有高性能、易用、易扩展、易维护等特点,旨在帮助企业快速构建高效的微服务应用系统。它的快速成长得
