Kami tahu bahawa baris gilir mesej ialah corak seni bina yang biasa digunakan untuk menyelesaikan masalah seperti pemprosesan tak segerak dan pengagihan tugas, dan RabbitMQ kini merupakan salah satu perisian tengah mesej yang paling banyak digunakan. Dalam aplikasi praktikal, kita mungkin perlu menggunakan Golang untuk melaksanakan pemantauan RabbitMQ Artikel ini akan memperkenalkan cara menggunakan Golang untuk melaksanakan pemantauan RabbitMQ.
Persediaan
Sebelum anda bermula, anda perlu memastikan RabbitMQ telah dipasang. Memandangkan RabbitMQ bergantung pada Erlang, Erlang juga perlu dipasang.
Selepas pemasangan selesai, kami perlu memasang pakej pihak ketiga Golang. Antaranya, pakej AMQP adalah penting, yang membolehkan kami menyambung dan mengendalikan RabbitMQ dengan mudah.
pergi dapatkan github.com/streadway/amqp
Pelaksanaan kod
Pertama, kita perlu menyambung ke RabbitMQ. Selepas sambungan berjaya, kita perlu mengisytiharkan pertukaran bernama "test" dan taip "fanout". Pertukaran adalah bahagian penting dalam penghalaan mesej dalam RabbitMQ Ia bertanggungjawab untuk menerima mesej dan mengedarkannya ke baris gilir. Dalam kes ini, kami akan mengisytiharkan pertukaran yang dipanggil "ujian" dan menetapkan jenisnya kepada "fanout", yang bermaksud bahawa ia akan menyiarkan mesej kepada semua baris gilir yang melanggannya.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
tunda sambungan .Close()
ch, err := conn.Channel()
failOnError(err, "Gagal membuka saluran")
tunda ch.Close()
err = ch.ExchangeDeclare(
"test", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
Seterusnya, kita perlu mencipta baharu, tidak berterusan, dengan Queue dengan nama yang dijana secara automatik. Di sini kami akan menggunakan nama baris gilir untuk mengikatnya pada pertukaran "ujian" yang baru kami isytiharkan.
q, err := ch.QueueDeclare(
"", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name "", // routing key "test", // exchange false, nil,
)
failOnError(err, "Gagal mengikat baris gilir")
Kini, RabbitMQ sudah bersedia dan kita boleh mula mendengar mesejnya. Kami boleh menggunakan fungsi Consume untuk melaksanakan mendengar mesej, yang membolehkan kami terus menerima mesej daripada baris gilir dan memprosesnya.
msg, err := ch.Consume(
q.Name, // queue name "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args
)
failOnError(err, "Gagal mendaftarkan pengguna")
untuk msg := julat msgs {
log.Printf("Received a message: %s", msg.Body)
}
Dalam kod di atas, kami menggunakan kaedah ch.Consume() untuk mendengar mesej dalam baris gilir yang ditentukan dan mengeluarkan kandungan mesej dengan mencetak log. Perlu diingatkan bahawa kami menggunakan gelung tak terhingga untuk menggunakan pendengaran mesej, yang bermaksud bahawa kami akan terus mendengar baris gilir sehingga program dihentikan atau ralat berlaku.
Kod lengkap adalah seperti berikut:
pakej utama
import (
"log" "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil { log.Fatalf("%s: %s", msg, err) }
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "test", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.QueueBind( q.Name, // queue name "", // routing key "test", // exchange false, nil, ) failOnError(err, "Failed to bind a queue") msgs, err := ch.Consume( q.Name, // queue name "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") for msg := range msgs { log.Printf("Received a message: %s", msg.Body) }
}
Ringkasan
Artikel ini memperkenalkan cara menggunakan Golang untuk melaksanakan rabbitmq Untuk mendengar, pertama kita perlu menyambung ke rabbitmq, mengisytiharkan pertukaran, membuat baris gilir dan mengikat baris gilir ke pertukaran, dan akhirnya menggunakan pengguna untuk mendengar mesej dalam baris gilir. Saya harap artikel ini dapat membantu pembangun yang menggunakan Golang untuk pembangunan rabbitmq.
Atas ialah kandungan terperinci Golang melaksanakan pemantauan rabbitmq. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!