SpringBoot は RabbitMq をどのように統合しますか?

WBOY
リリース: 2023-05-25 10:00:38
転載
771 人が閲覧しました

SpringBoot は実際に RabbitMq を統合します

spring-boot-starter-amqp

Advanced Message Queuing Protocol (AMQP) は、メッセージ ミドルウェア用のプラットフォーム中立の有線プロトコルです。 Spring AMQP プロジェクトは、Spring のコア概念を AMQP ベースのメッセージング ソリューションの開発に適用します。 Spring Boot は、spring-boot-starter-amqp "Starter" など、RabbitMQ 経由で AMQP を操作するための便利な機能をいくつか提供します。

RabbitMQ と springboot を統合するのは非常に簡単です。ごくわずかな設定で使用するだけであれば、springboot は spring-boot-starter-amqp プロジェクト内のメッセージに対するさまざまなサポートを提供します。

依存関係の追加

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
ログイン後にコピー

RabbitMQ は、AMQP プロトコルに基づいた、軽量で信頼性が高く、スケーラブルでポータブルなメッセージ ブローカーです。 Spring は RabbitMQ を使用して AMQP プロトコル経由で通信します。

プロパティ構成

RabbitMQ 構成は、外部構成プロパティ spring.rabbitmq.* によって制御されます。たとえば、次の部分的な application.properties を次のように宣言できます:

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password
ログイン後にコピー

Quick Start

1. Queue Configuration

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}
ログイン後にコピー

2 Sender

rabbitTemplate Itこれは、springboot によって提供されるデフォルトの実装です。

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}
ログイン後にコピー

3 Receiver

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}
ログイン後にコピー

Test

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}
ログイン後にコピー

コンソール出力の表示

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739
ログイン後にコピー

1 対多の送信: 1複数の受信機を持つ送信者

# は、上記のコードに少し変更を加えました。受信側は 2 つの受信機、Receiver1 と Receiver2 を登録しました。送信側はパラメータ数を追加し、受信側は受信したパラメータを出力しました。以下はテスト コードでは、100 個のメッセージを送信して、2 つの受信側の実行効果を観察します。

  • hello2

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig



    @Bean
    public Queue queue(){
        return new Queue("hello");
    }

    @Bean
    public Queue queue2(){
        return new Queue("hello2");
    }



}
ログイン後にコピー
    ## というキューを追加します。
  • #メッセージを hello2 キューに送信し、カウント パラメーターを受け入れます

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        this.amqpTemplate.convertAndSend("hello",context);
    }

    //给hello2发送消息,并接受一个计数参数
    public void send2(int i){
        String context = i+"";
        System.out.println(context+"--send:");
        this.amqpTemplate.convertAndSend("hello2",context);
    }
}
ログイン後にコピー
  • 2 つの hello2 受信者

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver1:"+message);
    }


}
ログイン後にコピー
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}
ログイン後にコピー
テスト

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }
ログイン後にコピー

コンソール出力を確認します:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
ログイン後にコピー
メッセージが 63 に送信されると、受信側 Receiver がメッセージを受信したことがわかります。

結論:

1 人の送信者、N 人の受信者、テスト後、メッセージは N 人の受信者に均等に送信されます

多対多: 複数の送信者複数の受信者の場合

次のように 2 つのセンダーを挿入してループに入れることができます:

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }
ログイン後にコピー
単体テストを実行してコンソール出力を表示します:

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:
ログイン後にコピー
結論: 次のようになります。 1 対多の場合でも、受信側はメッセージを均等に受信します。

#送信オブジェクト

#最初にエンティティ クラス オブジェクト User を作成し、Serializable インターフェイスを実装する必要があることに注意してください。

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
ログイン後にコピー

次に、構成ファイル内に

object_queue

<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">@Bean     public Queue queue3(){         return new Queue(&quot;object_queue&quot;);     }</pre><div class="contentsignin">ログイン後にコピー</div></div> という名前の別のキューを作成します。次に、User オブジェクトの 2 つの送信側 ObjectSender と受信側 ObjectReceiver を示します。

@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
ログイン後にコピー
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}
ログイン後にコピー

単体テストを実行し、コンソール出力を表示します。

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
ログイン後にコピー

Topic Exchange

topic は RabbitMQ で最も柔軟な方法であり、routing_key のバインドに応じて自由に変更できます。キュー

最初にトピック ルールを設定します。ここでは 2 つのキューを使用してテストします。

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}
ログイン後にコピー

メッセージ送信者: どちらも topicExchange を使用し、異なる routing_key にバインドします。

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}
ログイン後にコピー

2 つのメッセージ受信者、それぞれ異なるキューを指定します

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}
ログイン後にコピー
package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.messages: "+ message);

    }

}
ログイン後にコピー

Test:

send1 の送信は topic.# と topic.message の両方に一致します 受信者はメッセージを受信でき、送信 send2 のみ topic.# はすべてのメッセージに一致しますReceiver2 のみがリッスンする

Fanout Exchange

Fanout は、よく知られたブロードキャスト モードまたはサブスクリプション モードであり、メッセージを Fanout スイッチに送信し、すべてのキューがバインドされます。このスイッチにこのメッセージを受信します。

ファンアウト関連の設定:

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.security.PublicKey;

/**
 * @author itguang
 * @create
@Configuration
public class FanOutRabbitMq


    //创建三个队列

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }


    //创建exchange,指定交换策略

    @Bean
    public FanoutExchange fanoutExchange() {

        return new FanoutExchange("fanoutExchange");
    }


    //分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

    @Bean
    public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(AMessage).to(fanoutExchange);

    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(BMessage).to(fanoutExchange);

    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return
ログイン後にコピー

メッセージ送信者:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}
ログイン後にコピー

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}
ログイン後にコピー

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg
ログイン後にコピー

结果说明,绑定到fanout交换机上面的队列都收到了消息.

以上がSpringBoot は RabbitMq をどのように統合しますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート