Rumah > Java > javaTutorial > Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

PHPz
Lepaskan: 2023-05-11 20:13:16
ke hadapan
1197 orang telah melayarinya

Terdapat dua mod mesej RocketMQ utama: mod siaran dan mod kluster (mod pengimbangan beban)

Mod siaran ialah setiap pengguna akan menggunakan mesej

Mod pengimbangan beban ialah setiap pengguna Penggunaan hanya akan digunakan sekali oleh pengguna tertentu

Kami biasanya menggunakan mod pengimbangan beban dalam perniagaan kami Sudah tentu, beberapa senario khas memerlukan penggunaan mod siaran, seperti menghantar mesej ke e-mel , telefon mudah alih atau gesaan dalam tapak;

Kami boleh menetapkannya melalui nilai atribut @RocketMQMessageListener messageModel ialah mod siaran dan MessageModel.BROADCASTING ialah beban kelompok lalai mod pengimbangan. MessageModel.CLUSTERING

Mari perkenalkan springboot+ rockermq integrates untuk melaksanakan mesej siaran

  • Buat projek Springboot dan tambahkan pergantungan rockermq

  • <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    Salin selepas log masuk
  • Konfigurasikan roketmq

# Port

pelayan:
port: 8083

# Konfigurasi roketmq

roket :
name-server: 127.0.0.1:9876
#producer
producer:
#Producer group name, yang mesti unik dalam aplikasi
group: group1
#The lalai tamat masa untuk penghantaran mesej ialah 3000ms
hantar-mesej- tamat masa: 3000
#Apabila mesej mencapai 4096 bait, mesej akan dimampatkan. Lalai 4096
compress-message-body-threshold: 4096
#Had mesej maksimum, lalai ialah 128K
max-message-size: 4194304
#Bilangan percubaan semula untuk penghantaran mesej penyegerakan yang gagal retry-times-when-send-failed: 3
#Sama ada hendak mencuba semula ejen lain apabila penghantaran dalaman gagal, parameter ini hanya berkuat kuasa apabila terdapat berbilang broker
cuba semula-next-server: true
# Nombor daripada cubaan semula apabila penghantaran mesej tak segerak gagal
masa cuba semula-apabila-hantar-async-gagal: 3

    Tamat pengeluaran: Cipta pengawal baharu untuk menghantar mesej
  • Tamat pengeluaran boleh menghantar mesej mengikut logik penghantaran biasa
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
Salin selepas log masuk

    Buat dua pengguna untuk menggunakan mesej
  • < .

Mulakan perkhidmatan dan uji penggunaan mod kluster

Ujian mod kluster: dua pengguna berkongsi mesej secara sama rata

  • Tukar nilai atribut messageModel kedua-dua pengguna di atas kepada mod siaran

Pengguna 1: Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}
Salin selepas log masuk

Pengguna 2: Dalam Kumpulan dan topik pengguna yang sama seperti pengguna 1
    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("集群模式 消费者2,消费消息:"+s);
        }
    }
    Salin selepas log masuk
  • Mulakan semula perkhidmatan dan uji penggunaan mod siaran

Atas ialah kandungan terperinci Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan