Apabila aplikasi moden meningkat dalam kerumitan, pemesejan telah menjadi alat yang berkuasa. Dalam bidang ini, RabbitMQ telah menjadi broker mesej yang sangat popular yang boleh digunakan untuk menghantar mesej antara aplikasi yang berbeza.
Dalam artikel ini, kami akan meneroka cara menggunakan RabbitMQ dalam bahasa Go. Panduan ini akan merangkumi perkara berikut:
Pengenalan kepada RabbitMQ
RabbitMQ ialah perisian tengah broker mesej sumber terbuka. Ia adalah broker yang boleh dipercayai, sangat tersedia, berskala dan mudah digunakan yang membantu anda mengendalikan pemesejan antara aplikasi dengan mudah.
RabbitMQ menyokong berbilang protokol pemesejan, seperti AMQP, MQTT dan STOMP. Ia juga mempunyai banyak ciri lanjutan seperti pengedaran mesej, ketekunan mesej, pengakuan mesej dan baris gilir mesej.
Pemasangan RabbitMQ
Sebelum menggunakan RabbitMQ, anda perlu memasangnya menggunakan arahan berikut:
$ sudo apt-get install rabbitmq-server
Anda juga boleh memasang RabbitMQ sebagai bekas Docker. Untuk maklumat lanjut, sila layari laman web rasmi.
Konsep Asas RabbitMQ
Sebelum mula menggunakan RabbitMQ, mari kita fahami beberapa konsep asas.
Bermula dengan RabbitMQ dalam bahasa Go
Mari kami menulis pengeluar dan pengguna RabbitMQ yang ringkas menggunakan bahasa Go.
Pertama, anda perlu memasang klien RabbitMQ untuk bahasa Go:
$ go get github.com/streadway/amqp
Seterusnya, kami akan menggunakan kod berikut sebagai pengeluar mudah untuk RabbitMQ:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 建立通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个队列 q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外的参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 发送两条消息 for _, body := range []string{"Hello", "World"} { err = ch.Publish( "", // 交换机名称 q.Name, // 队列名称 false, // 是否强制遵循新的基于名称的路由规则 false, // 是否立即将消息传递给消费者 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) if err != nil { log.Fatalf("Failed to publish a message: %v", err) } log.Printf("Sent a message: %v", body) } }
Dalam ini contoh , kami menyambung ke baris gilir bernama "hello" dan menggunakan gelung for untuk menghantar dua mesej ke baris gilir. Setiap mesej ialah rentetan ringkas yang dibalut dalam struktur Penerbitan. Kaedah ch.Publish()
digunakan untuk menerbitkan mesej ke baris gilir.
Sekarang, mari buat pengguna untuk menerima mesej ini:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 建立通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个队列 q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外的参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 将通道设置为接收模式 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者名称 true, // 自动确认消息 false, // 是否具有排他性 false, // 是否阻塞处理 false, // 额外参数 ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } // 使用通道消费消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %v", string(d.Body)) } }() log.Printf("Waiting for messages...") <-forever }
Dalam contoh pengguna ini, kami menarik mesej daripada baris gilir bernama "hello" dan kemudian Lakukan gema. Dalam contoh ini, kami menggunakan mesej daripada baris gilir menggunakan kaedah ch.Consume()
saluran.
Penggunaan lanjutan RabbitMQ dan bahasa Go
Dalam bahagian ini, kami akan meneroka penggunaan lanjutan RabbitMQ dan bahasa Go.
Mula-mula, mari kita terokai cara melaksanakan pengesahan mesej menggunakan bahasa RabbitMQ dan Go. Pengakuan mesej biasanya digunakan untuk memastikan bahawa mesej telah diproses dengan betul.
Pertama, kita perlu menghidupkan mod pengesahan di bahagian pengguna:
msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者名称 false, // 自动确认消息 false, // 是否具有排他性 false, // 是否阻塞处理 false, // 额外参数 )
Dalam mod pengesahan manual, kami perlu mengesahkan secara eksplisit setiap mesej di sisi pengguna:
for d := range msgs { log.Printf("Received a message: %v", string(d.Body)) // 确认消息 d.Ack(false) }
Kami juga boleh menggunakan mod RPC RabbitMQ untuk melaksanakan panggilan RPC teragih. Dalam mod RPC, aplikasi klien menghantar permintaan kepada RabbitMQ, RabbitMQ memajukan permintaan ke pelayan yang sesuai, dan pelayan memproses permintaan dan mengembalikan respons.
Pertama, kita perlu mengisytiharkan suis untuk menerima permintaan RPC:
ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个Direct类型的交换机 err = ch.ExchangeDeclare( "rpc_exchange", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性 false, // 是否阻塞处理 nil, // 额外参数 ) if err != nil { log.Fatalf("Failed to declare a exchange: %v", err) } // 声明一个接收RPC请求的队列 q, err := ch.QueueDeclare("", false, false, true, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 将队列绑定到交换机 err = ch.QueueBind( q.Name, // 队列名称 "rpc_routing", // 路由键 "rpc_exchange", // 交换机名称 false, // 是否强制遵循新的基于名称的路由规则 nil, // 额外参数 ) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) }
Seterusnya, kita akan mencipta fungsi untuk mengendalikan permintaan RPC:
func rpcHandler(body []byte) []byte { log.Printf("Received RPC request: %s", string(body)) // 处理请求 result := []byte("Hello, World!") return result }
Kemudian, kita perlu tulis Pengguna untuk menerima permintaan RPC:
msg, ok, err := ch.Get(q.Name, true) if err != nil { log.Fatalf("Failed to handle RPC request: %v", err) } if !ok { log.Printf("No messages available") return } // 处理RPC请求 response := rpcHandler(msg.Body) // 发送RPC响应 err = ch.Publish( "", // 交换机名称 msg.ReplyTo, // 回调队列名称 false, // 是否强制遵循新的基于名称的路由规则 false, // 是否立即将消息传递给消费者 amqp.Publishing{ // 发布消息 ContentType: "text/plain", CorrelationId: msg.CorrelationId, Body: response, }, ) if err != nil { log.Fatalf("Failed to publish a message: %v", err) } log.Printf("Sent RPC response: %v", string(response))
Dalam contoh ini, kami menggunakan kaedah ch.Get()
untuk menarik mesej daripada baris gilir dan menghantarnya ke kaedah rpcHandler()
untuk diproses. Setelah pemprosesan selesai, kami menghantar semula respons kepada pelanggan menggunakan kaedah ch.Publish()
.
Kesimpulan
RabbitMQ ialah alat berkuasa yang membantu anda mengendalikan pemesejan antara aplikasi dengan mudah. Dalam panduan ini, kami merangkumi asas dan penggunaan lanjutan RabbitMQ dalam Go. Kini anda boleh menggunakan pengetahuan ini pada projek anda sendiri untuk pemesejan yang cekap dengan RabbitMQ!
Atas ialah kandungan terperinci Menggunakan RabbitMQ dalam Go: Panduan Lengkap. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!