Maison > Java > javaDidacticiel > le corps du texte

Comment SpringBoot intègre-t-il RabbitMq ?

WBOY
Libérer: 2023-05-25 10:00:38
avant
771 Les gens l'ont consulté

SpringBoot intègre RabbitMq dans la pratique

spring-boot-starter-amqp

Advanced Message Queuing Protocol (AMQP) est un protocole câblé neutre en termes de plate-forme pour les middlewares de messages. Le projet Spring AMQP applique les concepts fondamentaux de Spring au développement de solutions de messagerie basées sur AMQP. Spring Boot offre certaines commodités pour travailler avec AMQP via RabbitMQ, notamment le "Starter" spring-boot-starter-amqp.

Il est très simple d'intégrer RabbitMQ à springboot Si vous l'utilisez avec très peu de configuration, springboot fournit divers supports pour les messages du projet spring-boot-starter-amqp.

Ajouter des dépendances

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
Copier après la connexion

RabbitMQ est un courtier de messages léger, fiable, évolutif et portable basé sur le protocole AMQP. Spring utilise RabbitMQ pour communiquer via le protocole AMQP.

Configuration des propriétés

La configuration de RabbitMQ est contrôlée par les propriétés de configuration externes spring.rabbitmq.*. Par exemple, vous pouvez déclarer l'application.properties suivante dans la section suivante :

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password
Copier après la connexion

Démarrage rapide

1. Configuration de la file d'attente

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}
Copier après la connexion

2 Sender

rabbitTemplate est l'implémentation par défaut fournie par springboot

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}
Copier après la connexion

3 Receiver

.
package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}
Copier après la connexion

Test

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}
Copier après la connexion

Afficher les résultats de sortie de la console

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739
Copier après la connexion

Envoi un à plusieurs : un expéditeur et plusieurs récepteurs

J'ai apporté une petite modification au code ci-dessus. L'extrémité de réception a enregistré deux récepteurs, Receiver1 et Receiver2, et le récepteur. L'extrémité d'envoi a rejoint le nombre de paramètres, l'extrémité de réception imprime les paramètres reçus, ce qui suit est le code de test, envoyez une centaine de messages pour observer l'effet d'exécution des deux extrémités de réception

  • Ajoutez une file d'attente appelée hello2

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig



    @Bean
    public Queue queue(){
        return new Queue("hello");
    }

    @Bean
    public Queue queue2(){
        return new Queue("hello2");
    }



}
Copier après la connexion
  • Send à la file d'attente Hello2 Message, accepte un paramètre de nombre

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        this.amqpTemplate.convertAndSend("hello",context);
    }

    //给hello2发送消息,并接受一个计数参数
    public void send2(int i){
        String context = i+"";
        System.out.println(context+"--send:");
        this.amqpTemplate.convertAndSend("hello2",context);
    }
}
Copier après la connexion
  • two hello2 destinataires

rrreeRreee

test

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver1:"+message);
    }


}
Copier après la connexion

View la sortie de la console:

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}
Copier après la connexion

you peut voir: lorsque le message est envoyé à 63, le destinataire Destinataire a reçu le message,
Conclusion :

Un expéditeur, N destinataires, après test, le message sera envoyé à N destinataires de manière uniforme

Many-to-many : Plusieurs expéditeurs à plusieurs destinataires

Nous pouvons injecter deux expéditeurs et les mettre dans la boucle, comme suit :

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }
Copier après la connexion

Exécutez le test unitaire et visualisez la sortie de la console :

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
Copier après la connexion

Conclusion : tout comme un à plusieurs, le destinataire recevra toujours le message de manière uniforme.

Envoyer l'objet

Nous créons d'abord un objet de classe d'entité Utilisateur, notez qu'il doit implémenter l'interface Serialisable.

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }
Copier après la connexion

Créez ensuite une file d'attente dans le fichier de configuration, appeléeobject_queue

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:
Copier après la connexion

Voici les deux éléments du Objet utilisateur Un expéditeur ObjectSender et un récepteur ObjectReceiver :

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
Copier après la connexion
@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }
Copier après la connexion

Exécutez le test unitaire et affichez les résultats de sortie de la console :

@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
Copier après la connexion

Topic Exchange

topic est la méthode la plus flexible de RabbitMQ et peut être librement liée à différents objets en fonction sur la file d'attente de Routing_key

Configurez d'abord les règles du sujet. Deux files d'attente sont utilisées ici pour tester

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}
Copier après la connexion

Les expéditeurs de messages : tous deux utilisent topicExchange et sont liés à différentes clés de routage

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
Copier après la connexion

Deux destinataires de messages spécifient respectivement des files d'attente différentes

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}
Copier après la connexion
package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}
Copier après la connexion

Test :

L'envoi de send1 correspondra au topic.# et au topic.message. Les deux récepteurs peuvent recevoir des messages, l'envoi de send2 uniquement au topic.# peut correspondre à tous les messages que seul le Receiver2 écoute

Fanout Exchange

Fanout est le mode de diffusion ou mode d'abonnement que nous connaissons. Il envoie un message au commutateur Fanout, et toutes les files d'attente liées à ce commutateur reçoivent ce message.

Configuration liée à Fanout :

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}
Copier après la connexion

Expéditeur du message :

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}
Copier après la connexion

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}
Copier après la connexion

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg
Copier après la connexion

结果说明,绑定到fanout交换机上面的队列都收到了消息.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal