まず、必要な依存関係を pom.xml ファイルに追加します。
<!-- RocketMQ Spring Boot dependency for Spring Boot 3 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.1</version> <exclusions> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </exclusion> </exclusions> </dependency> <!-- Dependency compatible with MQ cluster version 5.3.0 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.3.0</version> </dependency>
bootstrap.yaml ファイルで RocketMQ 設定を構成します。
rocketmq: name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses consumer: group: consume-group-test access-key: access # Configure if ACL is used secret-key: secret consume-message-batch-max-size: 50 # Max messages per batch pull-batch-size: 100 # Max messages pulled from Broker topics: project: "group-topic-1" groups: project: "consume-group-1" # Use different groups for different business processes
構成クラス MqConfigProperties を作成します:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import lombok.Data; import java.io.Serializable; /** * RocketMQ Configuration Class */ @Data @Component @ConfigurationProperties(prefix = "rocketmq") public class MqConfigProperties implements Serializable { private static final long serialVersionUID = 1L; @Autowired private RocketMQProperties rocketMQProperties; private TopicProperties topics; private GroupProperties groups; /** * Topic Configuration Class */ @Data public static class TopicProperties implements Serializable { private static final long serialVersionUID = 1L; private String project; } /** * Consumer Group Configuration Class */ @Data public static class GroupProperties implements Serializable { private static final long serialVersionUID = 1L; private String project; } }
コンシューマ クラス UserConsumer を作成します:
import com.alibaba.fastjson2.JSONObject; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.context.ApplicationContext; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; import java.util.List; /** * Batch Consumer Implementation */ @Component @Slf4j public class UserConsumer implements SmartLifecycle { @Resource private MqConfigProperties mqConfigProperties; @Resource private ApplicationContext applicationContext; private volatile boolean running; private DefaultMQPushConsumer consumer; @Override public void start() { if (isRunning()) { throw new IllegalStateException("Consumer is already running"); } initConsumer(); setRunning(true); log.info("UserConsumer started successfully."); } @Override public void stop() { if (isRunning() && consumer != null) { consumer.shutdown(); setRunning(false); log.info("UserConsumer stopped."); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } private void initConsumer() { String topic = mqConfigProperties.getTopics().getProject(); String group = mqConfigProperties.getGroups().getProject(); String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer(); String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey(); String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey(); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey); consumer = rpcHook != null ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely()) : new DefaultMQPushConsumer(group); consumer.setNamesrvAddr(nameServer); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(100); // Set the batch size for consumption consumer.subscribe(topic, "*"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("Received {} messages", msgs.size()); for (MessageExt message : msgs) { String body = new String(message.getBody()); log.info("Processing message: {}", body); User user = JSONObject.parseObject(body, User.class); processUser(user); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group); } private void processUser(User user) { log.info("Processing user with ID: {}", user.getId()); // Handle user-related business logic } }
バッチ メッセージを生成するには、次の UserProducer クラスを使用できます:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; public class UserProducer { private DefaultMQProducer producer; public void sendBatchMessages(List<User> users, String topic) { List<Message> messages = new ArrayList<>(); for (User user : users) { messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes())); } try { producer.send(messages); } catch (Exception e) { log.error("Error sending batch messages", e); } } }
パフォーマンスの最適化: コンシューマ スレッド プールのサイズを調整できます。デフォルトでは、consumptionThreadMin=20 および ConsumerThreadMax=20 に設定されています。同時実行性の高いシナリオでは、スレッド プールのサイズを増やすとパフォーマンスが向上します。
エラー処理: 消費が失敗した場合は、無限再試行ループを避けるために RECONSUME_LATER に注意してください。ビジネス要件に基づいて最大再試行回数を設定します。
テナント分離: 間違ったグループからのデータの消費を避けるために、ビジネス モジュールごとに異なるグループを使用します。これは実稼働環境では特に重要です。
以上がSpring Boot で RocketMQ を使用してバッチ メッセージ消費を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。