Advanced Message Queuing Protocol (AMQP) est un protocole câblé neutre en termes de plate-forme pour les middlewares de messages. Le projet Spring AMQP applique les concepts fondamentaux de Spring au développement de solutions de messagerie basées sur AMQP. Spring Boot offre certaines commodités pour travailler avec AMQP via RabbitMQ, notamment le "Starter" spring-boot-starter-amqp.
Il est très simple d'intégrer RabbitMQ à springboot Si vous l'utilisez avec très peu de configuration, springboot fournit divers supports pour les messages du projet spring-boot-starter-amqp.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQ est un courtier de messages léger, fiable, évolutif et portable basé sur le protocole AMQP. Spring utilise RabbitMQ pour communiquer via le protocole AMQP.
La configuration de RabbitMQ est contrôlée par les propriétés de configuration externes spring.rabbitmq.*. Par exemple, vous pouvez déclarer l'application.properties suivante dans la section suivante :
spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } }
rabbitTemplate est l'implémentation par défaut fournie par springboot
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); //往名称为 hello 的queue中发送消息 this.amqpTemplate.convertAndSend("hello",context); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息接受者 * * @author itguang * @create @Component @RabbitListener(queues = "hello") //监听 名称为 hello 的queue public class HelloReceiver //消息处理器 @RabbitHandler public void process(String message){ System.out.println("Receiver:"+message); } }
package com.example.rabbitmqdemo; import com.example.rabbitmqdemo.rabbitmq.HelloSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqdemoApplicationTests @Autowired HelloSender helloSender; @Test public void contextLoads() { helloSender.send(); } }
Afficher les résultats de sortie de la console
send:hello----2018-04-21T11:29:47.739 Receiver:hello----2018-04-21T11:29:47.739
J'ai apporté une petite modification au code ci-dessus. L'extrémité de réception a enregistré deux récepteurs, Receiver1 et Receiver2, et le récepteur. L'extrémité d'envoi a rejoint le nombre de paramètres, l'extrémité de réception imprime les paramètres reçus, ce qui suit est le code de test, envoyez une centaine de messages pour observer l'effet d'exécution des deux extrémités de réception
Ajoutez une file d'attente appelée hello2
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } @Bean public Queue queue2(){ return new Queue("hello2"); } }
Send à la file d'attente Hello2 Message, accepte un paramètre de nombre
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); this.amqpTemplate.convertAndSend("hello",context); } //给hello2发送消息,并接受一个计数参数 public void send2(int i){ String context = i+""; System.out.println(context+"--send:"); this.amqpTemplate.convertAndSend("hello2",context); } }
two hello2 destinataires
@Component @RabbitListener(queues = "hello2") public class HelloReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver1:"+message); } }
View la sortie de la console:
@Component @RabbitListener(queues = "hello2") public class HelloReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver2:"+message); } }
you peut voir: lorsque le message est envoyé à 63, le destinataire Destinataire a reçu le message,
Conclusion :
Un expéditeur, N destinataires, après test, le message sera envoyé à N destinataires de manière uniforme
Nous pouvons injecter deux expéditeurs et les mettre dans la boucle, comme suit :
@Test public void manyReceiver(){ for (int i=0;i<100;i++){ helloSender.send2(i); } }
Exécutez le test unitaire et visualisez la sortie de la console :
0--send: 1--send: 2--send: 3--send: 4--send: ...(省略) 58--send: 59--send: 60--send: 61--send: 62--send: 63--send: Receiver2:1 Receiver1:0 64--send: 65--send: Receiver1:2 Receiver2:3 66--send: Receiver1:4 Receiver2:5 ...(省略)
Conclusion : tout comme un à plusieurs, le destinataire recevra toujours le message de manière uniforme.
Nous créons d'abord un objet de classe d'entité Utilisateur, notez qu'il doit implémenter l'interface Serialisable.
@Test public void many2many(){ for (int i=0;i<100;i++){ helloSender.send2(i); helloSender2.send2(i); } }
Créez ensuite une file d'attente dans le fichier de configuration, appeléeobject_queue
0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: ...(省略) 22--send: 22--send: 23--send: 23--send: 24--send: 24--send: Receiver2:0 25--send: 25--send: Receiver2:1 26--send: Receiver2:2 26--send: Receiver2:3 27--send: Receiver1:0 27--send: Receiver2:4 Receiver1:1 28--send: Receiver2:5 Receiver1:2 28--send: Receiver2:6 Receiver1:3 29--send: Receiver2:7 Receiver1:4 29--send: Receiver2:8 Receiver1:5 30--send: Receiver2:9 Receiver1:6 30--send: 31--send: 31--send: 32--send: 32--send:
Voici les deux éléments du Objet utilisateur Un expéditeur ObjectSender et un récepteur ObjectReceiver :
package com.example.rabbitmqdemo.pojo; import java.io.Serializable; /** * @author itguang * @create public class User implements Serializable private String username; private String password; public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User{" + "username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
@Bean public Queue queue3(){ return new Queue("object_queue"); }
Exécutez le test unitaire et affichez les résultats de sortie de la console :
@Component public class ObjectSender @Autowired AmqpTemplate amqpTemplate; public void sendUser(User user){ System.out.println("Send object:"+user.toString()); this.amqpTemplate.convertAndSend("object_queue",user); } }
topic est la méthode la plus flexible de RabbitMQ et peut être librement liée à différents objets en fonction sur la file d'attente de Routing_key
Configurez d'abord les règles du sujet. Deux files d'attente sont utilisées ici pour tester
@Component @RabbitListener(queues = "object_queue") public class ObjectReceiver @RabbitHandler public void objectReceiver(User user){ System.out.println("Receiver object:"+user.toString()); } }
Les expéditeurs de messages : tous deux utilisent topicExchange et sont liés à différentes clés de routage
Send object:User{username='李增光', password='666666'} Receiver object:User{username='李增光', password='666666'}
Deux destinataires de messages spécifient respectivement des files d'attente différentes
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author itguang * @create @Configuration public class TopicRabbitConfig final static String message = "topic.message"; final static String messages = "topic.messages"; //创建两个 Queue @Bean public Queue queueMessage(){ return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages(){ return new Queue(TopicRabbitConfig.messages); } //配置 TopicExchange,指定名称为 topicExchange @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //给队列绑定 exchange 和 routing_key @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class TopicSender @Autowired AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
Test :
L'envoi de send1 correspondra au topic.# et au topic.message. Les deux récepteurs peuvent recevoir des messages, l'envoi de send2 uniquement au topic.# peut correspondre à tous les messages que seul le Receiver2 écoute
Fanout est le mode de diffusion ou mode d'abonnement que nous connaissons. Il envoie un message au commutateur Fanout, et toutes les files d'attente liées à ce commutateur reçoivent ce message.
Configuration liée à Fanout :
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.message :"+ message); } }
Expéditeur du message :
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class FanoutSender @Autowired AmqpTemplate amqpTemplate; public void send(){ String context = "hi, fanout msg "; System.out.println("Sender : " + context); //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: amqpTemplate.convertAndSend("fanoutExchange","", context); } }
三个消息接受者:
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.A: "+message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.B: "+message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.C: "+message); } }
运行单元测试,查看结果:
Sender : hi, fanout msg Receiver form fanout.C: hi, fanout msg Receiver form fanout.A: hi, fanout msg Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!