Home > Java > javaTutorial > body text

How to implement broadcast messages in RocketMQ in Springboot

PHPz
Release: 2023-05-11 20:13:16
forward
1152 people have browsed it

There are two main types of RocketMQ message modes: broadcast mode and cluster mode (load balancing mode)

Broadcast mode is that every consumer will consume messages;

Load balancing mode is that every consumer A consumption will only be consumed once by a certain consumer;

We generally use the load balancing mode in our business. Of course, some special scenarios require the use of broadcast mode, such as sending a message to an email, mobile phone, or in-site prompt. ;

We can set it through the messageModel attribute value of @RocketMQMessageListener, MessageModel.BROADCASTING is the broadcast mode, MessageModel.CLUSTERING is the default cluster load balancing mode

Let’s introduce springboot rockermq integration to implement broadcast messages

  • Create a Springboot project and add rockermq dependency

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
Copy after login
  • Configure rocketmq

# Port
server:
port: 8083

# Configure rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#producer
producer:
#Producer group name, which must be unique in an application
group : group1
#The default timeout for message sending is 3000ms
send-message-timeout: 3000
#When the message reaches 4096 bytes, the message will be compressed. Default 4096
compress-message-body-threshold: 4096
#Maximum message limit, default is 128K
max-message-size: 4194304
#Number of retries for failed synchronization message sending
retry-times-when-send-failed: 3
#Whether to retry other agents when internal sending fails, this parameter will only take effect when there are multiple brokers
retry-next-server: true
# Number of retries for failed asynchronous message sending
retry-times-when-send-async-failed: 3

  • Production end: Create a new controller to send messages

The production end can send the message according to the normal sending logic

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
Copy after login
  • Create two consumers to consume the message

We first cluster load balancing test, add messageModel=MessageModel.CLUSTERING

Consumer 1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}
Copy after login

Consumer 2: In the same consumerGroup and topic as consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}
Copy after login
  • Start the service and test cluster mode consumption

Cluster mode test: two consumers share the message equally

How to implement broadcast messages in RocketMQ in Springboot

  • Change the messageModel attribute values ​​of the above two consumers to broadcast mode

Consumer 1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息1 广播模式,消费消息:"+s);
    }
}
Copy after login

Consumer 2: In the same consumerGroup and topic as Consumer 1

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息2 广播模式,消费消息:"+s);
    }
}
Copy after login
  • Restart the service and test the broadcast mode consumption

How to implement broadcast messages in RocketMQ in Springboot

#

The above is the detailed content of How to implement broadcast messages in RocketMQ in Springboot. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template