Rumah > Java > javaTutorial > teks badan

Bagaimanakah SpringBoot mengintegrasikan RabbitMq?

WBOY
Lepaskan: 2023-05-25 10:00:38
ke hadapan
770 orang telah melayarinya

SpringBoot menyepadukan RabbitMq dalam amalan

spring-boot-starter-amqp

Advanced Message Queuing Protocol (AMQP) ialah protokol berwayar neutral platform untuk perisian tengah mesej. Projek Spring AMQP menggunakan konsep Spring teras untuk pembangunan penyelesaian pemesejan berasaskan AMQP. Spring Boot menyediakan beberapa kemudahan untuk bekerja dengan AMQP melalui RabbitMQ, termasuk spring-boot-starter-amqp "Starter".

Sangat mudah untuk menyepadukan RabbitMQ dengan springboot Jika anda hanya menggunakannya dengan konfigurasi yang sangat sedikit, springboot menyediakan pelbagai sokongan untuk mesej dalam projek spring-boot-starter-amqp.

Tambah pergantungan

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
Salin selepas log masuk

RabbitMQ ialah broker mesej yang ringan, boleh dipercayai, berskala dan mudah alih berdasarkan protokol AMQP. Spring menggunakan RabbitMQ untuk berkomunikasi melalui protokol AMQP.

Konfigurasi Harta

Konfigurasi RabbitMQ dikawal oleh sifat konfigurasi luaran spring.rabbitmq.*. Sebagai contoh, anda boleh mengisytiharkan application.properties berikut dalam bahagian berikut:

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password
Salin selepas log masuk

Bermula Dengan Cepat

1 Konfigurasi Baris Gilir

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}
Salin selepas log masuk

2 Pengirim

rabbitTemplate Ia adalah pelaksanaan lalai yang disediakan oleh springboot.

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}
Salin selepas log masuk
3 Receiver

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}
Salin selepas log masuk

Ujian

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}
Salin selepas log masuk

Lihat output konsol

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739
Salin selepas log masuk
Satu-ke-banyak penghantaran: satu Pengirim dengan berbilang penerima

Membuat pengubahsuaian kecil pada kod di atas penerima mendaftarkan dua Penerima, Penerima1 dan Penerima2 menambahkan kiraan parameter, dan hujung penerima mencetak parameter yang diterima berikut ialah Kod ujian, hantar seratus mesej untuk melihat kesan pelaksanaan kedua-dua penerima

  • Tambah baris gilir yang dipanggil hello2

  • rreee
  • Hantar mesej untuk beratur hello2 dan terima parameter kiraan

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig



    @Bean
    public Queue queue(){
        return new Queue("hello");
    }

    @Bean
    public Queue queue2(){
        return new Queue("hello2");
    }



}
Salin selepas log masuk
  • Dua penerima hello2

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        this.amqpTemplate.convertAndSend("hello",context);
    }

    //给hello2发送消息,并接受一个计数参数
    public void send2(int i){
        String context = i+"";
        System.out.println(context+"--send:");
        this.amqpTemplate.convertAndSend("hello2",context);
    }
}
Salin selepas log masuk
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver1:"+message);
    }


}
Salin selepas log masuk
Uji

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}
Salin selepas log masuk

dan lihat output konsol:

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }
Salin selepas log masuk
Anda boleh lihat: apabila mesej dihantar ke 63, penerima Penerima telah menerima mesej,

Kesimpulan:

Satu pengirim, N penerima Selepas ujian, mesej akan dihantar ke N penerima secara sekata

Many-to-many: berbilang penghantar Untuk berbilang penerima.

kita boleh menyuntik dua penghantar dan meletakkannya dalam gelung, seperti berikut:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
Salin selepas log masuk
Jalankan ujian unit dan lihat output konsol:

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }
Salin selepas log masuk
Kesimpulan: Sama seperti satu-ke-banyak, penghujung penerima masih akan menerima mesej secara sama rata

Menghantar objek

Mula-mula kami mencipta pengguna objek kelas entiti dan ambil perhatian bahawa antara muka Serializable mestilah dilaksanakan.

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:
Salin selepas log masuk
Kemudian buat baris gilir lain dalam fail konfigurasi, dipanggil

object_queue

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
Salin selepas log masuk
Berikut ialah dua penghantar objek Pengguna, ObjectSender dan penerima ObjectReceiver:

@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }
Salin selepas log masuk
@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
Salin selepas log masuk
Jalankan ujian unit dan lihat output konsol:

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}
Salin selepas log masuk
Pertukaran Topik

topik ialah kaedah paling fleksibel dalam RabbitMQ, dan boleh terikat secara bebas kepada topik berbeza berdasarkan routing_key Baris gilir

Mula-mula konfigurasikan peraturan topik Di sini dua baris gilir digunakan untuk menguji

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
Salin selepas log masuk
Penghantar mesej: kedua-duanya menggunakan topicExchange dan terikat kepada routing_key yang berbeza

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}
Salin selepas log masuk
Dua. penerima mesej, nyatakan baris gilir yang berbeza

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}
Salin selepas log masuk
package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}
Salin selepas log masuk
Ujian:

Menghantar hantar1 akan sepadan dengan topik.# dan topik.mesej Kedua-dua Penerima boleh menerimanya kepada mesej, hantar topik sahaja .# boleh memadankan semua mesej yang hanya Receiver2 dengar

Fanout Exchange

Fanout ialah mod siaran atau mod langganan yang biasa kita gunakan, memberikan Fanout Suis menghantar mesej, dan semua baris gilir yang terikat pada suis menerima mesej.

Konfigurasi berkaitan Fanout:

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.messages: "+ message);

    }

}
Salin selepas log masuk
Penghantar mesej:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}
Salin selepas log masuk

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}
Salin selepas log masuk

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg
Salin selepas log masuk

结果说明,绑定到fanout交换机上面的队列都收到了消息.

Atas ialah kandungan terperinci Bagaimanakah SpringBoot mengintegrasikan RabbitMq?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan