最新のアプリケーションが複雑になるにつれて、メッセージングは強力なツールになりました。この分野では、RabbitMQ は、異なるアプリケーション間でメッセージを配信するために使用できるメッセージ ブローカーとして非常に人気があります。
この記事では、Go 言語で RabbitMQ を使用する方法を検討します。このガイドでは次の内容について説明します:
RabbitMQ の概要
RabbitMQ は、オープン ソースのメッセージ ブローカー ミドルウェアです。これは、アプリケーション間のメッセージングを簡単に処理できる、信頼性が高く、可用性が高く、スケーラブルで使いやすいブローカーです。
RabbitMQ は、AMQP、MQTT、STOMP などの複数のメッセージング プロトコルをサポートしています。また、メッセージ配布、メッセージ永続化、メッセージ確認応答、メッセージ キューイングなどの多くの高度な機能も備えています。
RabbitMQ のインストール
RabbitMQ を使用する前に、次のコマンドを使用してインストールする必要があります:
$ sudo apt-get install rabbitmq-server
RabbitMQ を Docker コンテナとしてインストールすることもできます。詳細については、公式ウェブサイトをご覧ください。
RabbitMQ の基本概念
RabbitMQ の使用を開始する前に、いくつかの基本概念を理解しましょう。
Go 言語で RabbitMQ を始める
Go 言語を使用して、簡単な RabbitMQ プロデューサーとコンシューマーを作成してみましょう。
まず、Go 言語用の RabbitMQ クライアントをインストールする必要があります:
$ go get github.com/streadway/amqp
次に、RabbitMQ の単純なプロデューサーとして次のコードを使用します:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 建立通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个队列 q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外的参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 发送两条消息 for _, body := range []string{"Hello", "World"} { err = ch.Publish( "", // 交换机名称 q.Name, // 队列名称 false, // 是否强制遵循新的基于名称的路由规则 false, // 是否立即将消息传递给消费者 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) if err != nil { log.Fatalf("Failed to publish a message: %v", err) } log.Printf("Sent a message: %v", body) } }
たとえば、「hello」という名前のキューに接続し、for ループを使用して 2 つのメッセージをキューに送信します。各メッセージは、Publishing 構造でラップされた単純な文字列です。 ch.Publish()
このメソッドは、メッセージをキューにパブリッシュするために使用されます。
ここで、これらのメッセージを受信するコンシューマを作成しましょう:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 建立通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个队列 q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外的参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 将通道设置为接收模式 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者名称 true, // 自动确认消息 false, // 是否具有排他性 false, // 是否阻塞处理 false, // 额外参数 ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } // 使用通道消费消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %v", string(d.Body)) } }() log.Printf("Waiting for messages...") <-forever }
このコンシューマの例では、「hello」という名前のキューからメッセージを取得し、エコーを実行します。この例では、チャネルの ch.Consume()
メソッドを使用してキューからメッセージを消費します。
RabbitMQ と Go 言語の高度な使用法
このセクションでは、RabbitMQ と Go 言語の高度な使用法について説明します。
まず、RabbitMQ と Go 言語を使用してメッセージ確認を実装する方法を見てみましょう。メッセージ確認応答は通常、メッセージが正しく処理されたことを確認するために使用されます。
まず、コンシューマ側で確認モードをオンにする必要があります:
msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者名称 false, // 自动确认消息 false, // 是否具有排他性 false, // 是否阻塞处理 false, // 额外参数 )
手動確認モードでは、コンシューマ側で各メッセージを明示的に確認する必要があります:
for d := range msgs { log.Printf("Received a message: %v", string(d.Body)) // 确认消息 d.Ack(false) }
RabbitMQ の RPC モードを使用して分散 RPC 呼び出しを実装することもできます。 RPC モードでは、クライアント アプリケーションが RabbitMQ にリクエストを送信し、RabbitMQ がリクエストを適切なサーバーに転送し、サーバーがリクエストを処理して応答を返します。
まず、RPC リクエストを受信するスイッチを宣言する必要があります:
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个Direct类型的交换机 err = ch.ExchangeDeclare( "rpc_exchange", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外参数 ) if err != nil { log.Fatalf("Failed to declare a exchange: %v", err) } // 声明一个接收RPC请求的队列 q, err := ch.QueueDeclare("", false, false, true, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 将队列绑定到交换机 err = ch.QueueBind( q.Name, // 队列名称 "rpc_routing", // 路由键 "rpc_exchange", // 交换机名称 false, // 是否强制遵循新的基于名称的路由规则 nil, // 额外参数 ) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) }
次に、RPC リクエストを処理する関数を作成します:
func rpcHandler(body []byte) []byte { log.Printf("Received RPC request: %s", string(body)) // 处理请求 result := []byte("Hello, World!") return result }
次に、次のことを行う必要があります。 write RPC リクエストを受信するコンシューマ:
msg, ok, err := ch.Get(q.Name, true) if err != nil { log.Fatalf("Failed to handle RPC request: %v", err) } if !ok { log.Printf("No messages available") return } // 处理RPC请求 response := rpcHandler(msg.Body) // 发送RPC响应 err = ch.Publish( "", // 交换机名称 msg.ReplyTo, // 回调队列名称 false, // 是否强制遵循新的基于名称的路由规则 false, // 是否立即将消息传递给消费者 amqp.Publishing{ // 发布消息 ContentType: "text/plain", CorrelationId: msg.CorrelationId, Body: response, }, ) if err != nil { log.Fatalf("Failed to publish a message: %v", err) } log.Printf("Sent RPC response: %v", string(response))
この例では、ch.Get()
メソッドを使用してキューからメッセージを取得し、rpcHandler に送信します。 ()
処理メソッド。処理が完了したら、ch.Publish()
メソッドを使用して応答をクライアントに送り返します。
結論
RabbitMQ は、アプリケーション間のメッセージングを簡単に処理できる強力なツールです。このガイドでは、Go での RabbitMQ の基本と高度な使用法について説明します。この知識を独自のプロジェクトに適用して、RabbitMQ を使用した効率的なメッセージングを行うことができます。
以上がGo での RabbitMQ の使用: 完全ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。