這篇文章主要介紹了關於NodeJS如何操作訊息佇列RabbitMQ的分析,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下
訊息(Message)是指在應用程式間傳送的資料。訊息可以非常簡單,例如只包含文字字串,也可以更複雜,可能包含嵌入物件。
訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息發佈者只管把訊息發佈到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 取訊息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。
甚至現在部分NoSQL也可做訊息佇列,如Redis。
上規模的公司都會有自己的日誌分析系統,日誌系統是怎麼實現的呢?
圖解:使用者在存取應用程式的時候,我們要記錄使用者的操作記錄和系統的例外日誌,常規的做法是將系統產生的日誌保存到伺服器磁碟,在伺服器中開啟定時任務,定時將磁碟的日誌資訊傳入mq中(生產者),也定時將mq中的消息取出並存到對應的資料庫,如ElasticSearch或Hive中。
上面的案例介紹了MQ的一個使用場景,我這裡是用RabbitMQ舉例,現實專案中可能用到的是Kafka。
先安裝brew(mac為例)
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
安裝RabbitMQ
brew install rabbitmq
執行RabbitMQ
進入到/usr/local/Cellar/rabbitmq/3.7 .7,執行
sbin/rabbitmq-server
啟動外掛
進入到/usr/local/Cellar/rabbitmq/3.7.7/sbin
./rabbitmq-plugins enable rabbitmq_management
登陸管理介面
開啟瀏覽器輸入:http://localhost:15672,RabbitMQ預設15672連接埠六. Nodejs操作RabbitMQ
#
網路上可以找到好幾個對應的Node SDK,這裡推薦amqplib
1.生產者
/** * 对RabbitMQ的封装 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.hosts = []; this.index = 0; this.length = this.hosts.length; this.open = amqp.connect(this.hosts[this.index]); } sendQueueMsg(queueName, msg, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName).then(function (ok) { return channel.sendToQueue(queueName, new Buffer(msg), { persistent: true }); }) .then(function (data) { if (data) { errCallBack && errCallBack("success"); channel.close(); } }) .catch(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index == 0; } }); } }
2. 消費者
/** * 对RabbitMQ的封装 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.open = amqp.connect(this.hosts[this.index]); } receiveQueueMsg(queueName, receiveCallBack, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName) .then(function (ok) { return channel.consume(queueName, function (msg) { if (msg !== null) { let data = msg.content.toString(); channel.ack(msg); receiveCallBack && receiveCallBack(data); } }) .finally(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index = 0; self.open = amqp.connect(self.hosts[0]); } }); }
3. 透過生產者向MQ發送一個訊息,並建立佇列
let mq = new RabbitMQ(); mq.sendQueueMsg('testQueue', 'my first message', (error) => { console.log(error) })
執行之後,我們開啟管理平台,發現RabbbitMQ已經接受到了一個訊息:
並且RabbbitMQ新增了一個佇列testQueue
4. 取得指定佇列的訊息
let mq = new RabbitMQ(); mq.receiveQueueMsg('testQueue',(msg) => { console.log(msg) })// 输出结果:my first message复制代码
此時開啟RabbitMQ管理平台,訊息數量已經變成0
綜上:我們簡單講述了訊息佇列及RabbitMQ相關的一些知識,以及我們如何透過nodejs來生產與消費訊息。
以上就是本文的全部內容,希望對大家的學習有所幫助,更多相關內容請關注PHP中文網!
相關推薦:
#透過node.js來調取baidu-aip -SDK實作身分證辨識的功能
以上是對於NodeJS如何操作訊息佇列RabbitMQ的分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!