How to Implement Batch Message Consumption with RocketMQ in Spring Boot
1. Adding Dependencies
First, add the necessary dependencies to your pom.xml file:
<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>
2. Configuration File bootstrap.yaml
Configure your RocketMQ settings in the bootstrap.yaml file:
rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50 # Max messages per batch
    pull-batch-size: 100 # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1" # Use different groups for different business processes
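The two batch settings are easy to confuse. pull-batch-size bounds how many messages one pull brings back from the Broker, and the pulled list is then cut into chunks of at most consume-message-batch-max-size before each chunk is handed to the listener. The following is a minimal sketch of that chunking in plain Java, with no RocketMQ dependency; the class and method names (BatchSplitSketch, split) are hypothetical and only illustrate the relationship between the two numbers.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mimics cutting one pulled list into listener-sized batches. */
class BatchSplitSketch {

    /** Splits the pulled messages into consecutive chunks of at most maxBatchSize elements. */
    static <T> List<List<T>> split(List<T> pulled, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < pulled.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, pulled.size());
            batches.add(new ArrayList<>(pulled.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            pulled.add(i);
        }
        // A pull of 100 messages with a batch limit of 50 results in two listener calls.
        System.out.println(split(pulled, 50).size());
    }
}
```

A consequence worth keeping in mind: a batch can never be larger than what a single pull returned, so raising only the batch limit has no effect once it exceeds the pull size.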
3. Configuration Class MqConfigProperties
Create the configuration class MqConfigProperties:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;

    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}
4. Implementing the Consumer Code
Create the consumer class UserConsumer:
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource; // Spring Boot 3 uses the jakarta.* namespace instead of javax.*
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;

    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        try {
            initConsumer();
        } catch (MQClientException e) {
            // subscribe() and start() declare this checked exception
            throw new IllegalStateException("Failed to start UserConsumer", e);
        }
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() throws MQClientException {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        // Use an ACL hook only when access/secret keys are configured
        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Values mirror bootstrap.yaml. A batch can never exceed what one pull returns,
        // so the pull size must be at least as large as the batch size.
        consumer.setPullBatchSize(100);
        consumer.setConsumeMessageBatchMaxSize(50); // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                try {
                    for (MessageExt message : msgs) {
                        // Decode with an explicit charset instead of the platform default
                        String body = new String(message.getBody(), StandardCharsets.UTF_8);
                        log.info("Processing message: {}", body);
                        User user = JSONObject.parseObject(body, User.class);
                        processUser(user);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // Asks the broker to redeliver the whole batch later
                    log.error("Failed to process batch of {} messages", msgs.size(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}
5. Producer Example Code
To produce batch messages, you can use the following UserProducer class:
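import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class UserProducer {

    // Must be configured (NameServer address, producer group) and started by the caller
    private final DefaultMQProducer producer;

    public UserProducer(DefaultMQProducer producer) {
        this.producer = producer;
    }

    public void sendBatchMessages(List<User> users, String topic) {
        // All messages in one batch must target the same topic
        List<Message> messages = new ArrayList<>();
        for (User user : users) {
            byte[] body = JSONObject.toJSONString(user).getBytes(StandardCharsets.UTF_8);
            messages.add(new Message(topic, body));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}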
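A batch sent in one call is subject to the maximum message size the producer and Broker accept, so a large user list may need to be sent as several smaller batches. The sketch below shows one way to split serialized bodies by a byte budget, in plain Java with no RocketMQ dependency. The names (SizeBoundedSplitSketch, split, maxBytesPerBatch) are hypothetical, the budget value is an assumption to check against your own cluster's limit, and only body bytes are counted here, whereas a real message also carries topic and property overhead.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: groups message bodies so that each group stays within a byte budget. */
class SizeBoundedSplitSketch {

    /** Splits bodies, in order, into batches whose summed body length is at most maxBytesPerBatch. */
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBytesPerBatch) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] body : bodies) {
            // Close the current batch when adding this body would exceed the budget.
            if (!current.isEmpty() && currentBytes + body.length > maxBytesPerBatch) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A single body larger than the budget still ends up alone in its own batch.
            current.add(body);
            currentBytes += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        bodies.add(new byte[4]);
        bodies.add(new byte[4]);
        bodies.add(new byte[4]);
        // With a budget of 8 bytes, the three 4-byte bodies form two batches.
        System.out.println(split(bodies, 8).size());
    }
}
```

Each resulting sub-list could then be converted to messages and passed to sendBatchMessages-style code one batch at a time.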
6. Additional Optimization Suggestions
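Performance Optimization: You can adjust the size of the consumer thread pool through consumeThreadMin and consumeThreadMax, which both default to 20. When message handling is I/O-bound and the backlog keeps growing, raising both values together (for example with consumer.setConsumeThreadMin(...) and consumer.setConsumeThreadMax(...)) can increase throughput. Measure before and after the change, since more consumer threads also put more load on downstream systems such as the database.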
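One reason to raise the minimum and maximum together is a general property of java.util.concurrent.ThreadPoolExecutor: when the work queue is unbounded, the pool never grows beyond its core size, so a larger maximum alone changes nothing. The sketch below demonstrates this with the plain JDK. Whether your RocketMQ client version backs its consume pool with an unbounded queue is an assumption to verify; PoolGrowthSketch and poolSizeUnderLoad are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative only: shows that an unbounded queue keeps a pool at its core size. */
class PoolGrowthSketch {

    /** Submits blocking tasks and reports how many worker threads the pool created. */
    static int poolSizeUnderLoad(int core, int max, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra tasks were queued instead of triggering new threads.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints 2: the pool stays at the core size even though max is 8.
        System.out.println(poolSizeUnderLoad(2, 8, 20));
    }
}
```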
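Error Handling: When consumption fails, be cautious with RECONSUME_LATER. In batch mode it causes the whole batch to be redelivered, so one bad message makes the others in the batch run again; keep processing idempotent. With concurrent consumption, RocketMQ retries up to 16 times by default and then moves the message to a dead-letter queue. Set a lower maximum retry count based on your business requirements, for example with consumer.setMaxReconsumeTimes(...) or by checking message.getReconsumeTimes() in the listener.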
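The retry cap described above can be sketched as a small decision function. This is plain Java with no RocketMQ dependency: the Status enum stands in for ConsumeConcurrentlyStatus, the reconsumeTimes parameter stands in for the value of message.getReconsumeTimes(), and MAX_RETRIES = 3 is an arbitrary assumption. RetryCapSketch and consume are hypothetical names.

```java
import java.util.function.Consumer;

/** Illustrative only: decides between retrying and giving up based on a retry cap. */
class RetryCapSketch {

    /** Stand-in for RocketMQ's ConsumeConcurrentlyStatus. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Assumed business limit; choose a value that fits your requirements. */
    static final int MAX_RETRIES = 3;

    /** Runs the handler and maps the outcome to a consume status. */
    static <T> Status consume(T payload, int reconsumeTimes, Consumer<T> handler) {
        try {
            handler.accept(payload);
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            if (reconsumeTimes >= MAX_RETRIES) {
                // Cap reached: record the failure elsewhere and acknowledge to stop redelivery.
                System.err.println("Giving up after " + reconsumeTimes + " retries: " + e.getMessage());
                return Status.CONSUME_SUCCESS;
            }
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = s -> { throw new IllegalStateException("boom"); };
        System.out.println(consume("user-1", 0, failing)); // first failure: retry later
        System.out.println(consume("user-1", 3, failing)); // cap reached: acknowledge
    }
}
```

Acknowledging after the cap means the message is dropped from the retry path, so the failure should be persisted or alerted on before returning.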
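Tenant Isolation: Use a separate consumer group for each business module. Consumers in the same group are expected to share the same subscription, so reusing a group across modules can cause messages to reach the wrong consumer or be skipped. This is especially crucial in production environments.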