Sample code for springboot project to configure multiple kafka
1.spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2. Configuration file related information
kafka.bootstrap-servers=localhost:9092 kafka.consumer.group.id=20230321 #可以并发消费的线程数 (通常与partition数量一致) kafka.consumer.concurrency=10 kafka.consumer.enable.auto.commit=false kafka.bootstrap-servers.pic=localhost:29092 kafka.consumer.group.id.pic=20230322_pic kafka.consumer.concurrency.pic=10 kafka.consumer.enable.auto.commit.pic=false
3.kafka configuration class
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.enable.auto.commit}") private String autoCommit; @Value("${kafka.bootstrap-servers}") private String bootstrapServer; @Value("${kafka.consumer.group.id.pic}") private String groupIdPic; @Value("${kafka.consumer.concurrency.pic}") private int concurrencyPic; @Value("${kafka.consumer.enable.auto.commit.pic}") private String autoCommitPic; @Value("${kafka.bootstrap-servers.pic}") private String bootstrapServerPic; @Bean public ConsumerFactory<String, String> consumerFactory() { String bootstrapServers = bootstrapServer; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } @Bean public ConsumerFactory<String, String> consumerFactoryPic() { String bootstrapServers = bootstrapServerPic; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryPic()); factory.setConcurrency(concurrencyPic); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } }
4.Consumption topic messages
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { String jsonString = message.value(); if (StringUtils.isNoneBlank(jsonString)) { log.info("消费:{}",jsonString); //TODO .... } } catch (Exception e) { log.error(" receive topic error ", e); } finally { ack.acknowledge(); } } @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { if (StringUtils.isNoneBlank(message.value())) { //TODO .... } } catch (Exception e) { logger.error(" receive topic error ", e); } finally { ack.acknowledge(); } }
The above is the detailed content of Sample code for springboot project to configure multiple kafka. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics



With the development of the Internet and technology, digital investment has become a topic of increasing concern. Many investors continue to explore and study investment strategies, hoping to obtain a higher return on investment. In stock trading, real-time stock analysis is very important for decision-making, and the use of Kafka real-time message queue and PHP technology is an efficient and practical means. 1. Introduction to Kafka Kafka is a high-throughput distributed publish and subscribe messaging system developed by LinkedIn. The main features of Kafka are

SpringBoot and SpringMVC are both commonly used frameworks in Java development, but there are some obvious differences between them. This article will explore the features and uses of these two frameworks and compare their differences. First, let's learn about SpringBoot. SpringBoot was developed by the Pivotal team to simplify the creation and deployment of applications based on the Spring framework. It provides a fast, lightweight way to build stand-alone, executable

How to use React and Apache Kafka to build real-time data processing applications Introduction: With the rise of big data and real-time data processing, building real-time data processing applications has become the pursuit of many developers. The combination of React, a popular front-end framework, and Apache Kafka, a high-performance distributed messaging system, can help us build real-time data processing applications. This article will introduce how to use React and Apache Kafka to build real-time data processing applications, and

This article will write a detailed example to talk about the actual development of dubbo+nacos+Spring Boot. This article will not cover too much theoretical knowledge, but will write the simplest example to illustrate how dubbo can be integrated with nacos to quickly build a development environment.

Five options for Kafka visualization tools ApacheKafka is a distributed stream processing platform capable of processing large amounts of real-time data. It is widely used to build real-time data pipelines, message queues, and event-driven applications. Kafka's visualization tools can help users monitor and manage Kafka clusters and better understand Kafka data flows. The following is an introduction to five popular Kafka visualization tools: ConfluentControlCenterConfluent

How to choose the right Kafka visualization tool? Comparative analysis of five tools Introduction: Kafka is a high-performance, high-throughput distributed message queue system that is widely used in the field of big data. With the popularity of Kafka, more and more enterprises and developers need a visual tool to easily monitor and manage Kafka clusters. This article will introduce five commonly used Kafka visualization tools and compare their features and functions to help readers choose the tool that suits their needs. 1. KafkaManager

To install ApacheKafka on RockyLinux, you can follow the following steps: Update system: First, make sure your RockyLinux system is up to date, execute the following command to update the system package: sudoyumupdate Install Java: ApacheKafka depends on Java, so you need to install JavaDevelopmentKit (JDK) first ). OpenJDK can be installed through the following command: sudoyuminstalljava-1.8.0-openjdk-devel Download and decompress: Visit the ApacheKafka official website () to download the latest binary package. Choose a stable version

In recent years, with the rise of big data and active open source communities, more and more enterprises have begun to look for high-performance interactive data processing systems to meet the growing data needs. In this wave of technology upgrades, go-zero and Kafka+Avro are being paid attention to and adopted by more and more enterprises. go-zero is a microservice framework developed based on the Golang language. It has the characteristics of high performance, ease of use, easy expansion, and easy maintenance. It is designed to help enterprises quickly build efficient microservice application systems. its rapid growth
