如何基於Hyperf實現RabbitMQ+WebSocket訊息推播
介紹
基於 Hyperf WebSocket RabbitMQ 實作的一個簡單大螢幕的訊息推送。
思路
利用 WebSocket 協定讓客戶端和伺服器端保持有狀態的長鏈接,
儲存連結上來的客戶端 id。訂閱發布者發布的訊息針對已儲存的客戶端 id 進行廣播訊息。
WebSocket 服務
composer require hyperf/websocket-server
設定檔[config/autoload/server.php]
<?php return [ 'mode' => SWOOLE_PROCESS, 'servers' => [ [ 'name' => 'http', 'type' => Server::SERVER_HTTP, 'host' => '0.0.0.0', 'port' => 11111, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'], ], ], [ 'name' => 'ws', 'type' => Server::SERVER_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 12222, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'], ], ], ],
#WebSocket 伺服器端程式碼範例
<?php declare(strict_types=1); /** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://doc.hyperf.io * @contact group@hyperf.io * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */ namespace App\Controller; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; use Swoole\Http\Request; use Swoole\Server; use Swoole\Websocket\Frame; use Swoole\WebSocket\Server as WebSocketServer; class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface { /** * 发送消息 * @param WebSocketServer $server * @param Frame $frame */ public function onMessage(WebSocketServer $server, Frame $frame): void { //心跳刷新缓存 $redis = $this->container->get(\Redis::class); //获取所有的客户端id $fdList = $redis->sMembers('websocket_sjd_1'); //如果当前客户端在客户端集合中,就刷新 if (in_array($frame->fd, $fdList)) { $redis->sAdd('websocket_sjd_1', $frame->fd); $redis->expire('websocket_sjd_1', 7200); } $server->push($frame->fd, 'Recv: ' . $frame->data); } /** * 客户端失去链接 * @param Server $server * @param int $fd * @param int $reactorId */ public function onClose(Server $server, int $fd, int $reactorId): void { //删掉客户端id $redis = $this->container->get(\Redis::class); //移除集合中指定的value $redis->sRem('websocket_sjd_1', $fd); var_dump('closed'); } /** * 客户端链接 * @param WebSocketServer $server * @param Request $request */ public function onOpen(WebSocketServer $server, Request $request): void { //保存客户端id $redis = $this->container->get(\Redis::class); $res1 = $redis->sAdd('websocket_sjd_1', $request->fd); var_dump($res1); $res = $redis->expire('websocket_sjd_1', 7200); var_dump($res); $server->push($request->fd, 'Opened'); } }
WebSocket 前端程式碼
function WebSocketTest() { if ("WebSocket" in window) { console.log("您的浏览器支持 WebSocket!"); var num = 0 // 打开一个 web socket var ws = new WebSocket("ws://127.0.0.1:12222"); ws.onopen = function () { // Web Socket 已连接上,使用 send() 方法发送数据 //alert("数据发送中..."); //ws.send("发送数据"); }; window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开 var ping = {"type": "ping"}; ws.send(JSON.stringify(ping)); }, 5000); ws.onmessage = function (evt) { var d = JSON.parse(evt.data); console.log(d); if (d.code == 300) { $(".address").text(d.address) } if (d.code == 200) { var v = d.data console.log(v); num++ var str = `<div class="item"> <p>${v.recordOutTime}</p> <p>${v.userOutName}</p> <p>${v.userOutNum}</p> <p>${v.doorOutName}</p> </div>` $(".tableHead").after(str) if (num > 7) { num-- $(".table .item:nth-last-child(1)").remove() } } }; ws.error = function (e) { console.log(e) alert(e) } ws.onclose = function () { // 关闭 websocket alert("连接已关闭..."); }; } else { alert("您的浏览器不支持 WebSocket!"); } }
AMQP 元件
composer require hyperf/amqp
#設定檔[ config/autoload/amqp.php]
<?php return [ 'default' => [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'pool' => [ 'min_connections' => 1, 'max_connections' => 10, 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, ], 'params' => [ 'insist' => false, 'login_method' => 'AMQPLAIN', 'login_response' => null, 'locale' => 'en_US', 'connection_timeout' => 3.0, 'read_write_timeout' => 6.0, 'context' => null, 'keepalive' => false, 'heartbeat' => 3, ], ], ];
MQ 消費者程式碼
<?php declare(strict_types=1); namespace App\Amqp\Consumer; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; use Hyperf\Server\Server; use Hyperf\Server\ServerFactory; /** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */ class DemoConsumer extends ConsumerMessage { /** * rabbmitMQ消费端代码 * @param $data * @return string */ public function consume($data): string { print_r($data); //获取集合中所有的value $redis = $this->container->get(\Redis::class); $fdList=$redis->sMembers('websocket_sjd_1'); $server=$this->container->get(ServerFactory::class)->getServer()->getServer(); foreach($fdList as $key=>$v){ if(!empty($v)){ $server->push((int)$v, $data); } } return Result::ACK; } }
控制器程式碼
#/** * test * @return array */ public function test() { $data = array( 'code' => 200, 'data' => [ 'userOutName' => 'ccflow', 'userOutNum' => '9999', 'recordOutTime' => date("Y-m-d H:i:s", time()), 'doorOutName' => '教师公寓', ] ); $data = \GuzzleHttp\json_encode($data); $message = new DemoProducer($data); $producer = ApplicationContext::getContainer()->get(Producer::class); $result = $producer->produce($message); var_dump($result); $user = $this->request->input('user', 'Hyperf'); $method = $this->request->getMethod(); return [ 'method' => $method, 'message' => "{$user}.", ]; }
最終效果
#推薦:《PHP教學》
以上是如何基於Hyperf實現RabbitMQ+WebSocket訊息推播的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

Hyperf是一個優秀的PHP框架,其主要的特點是快速、靈活、可擴展,目前在業界有廣泛的應用。在使用Hyperf框架進行開發的過程中,我們常常會遇到需要組態管理的情況。本文將介紹如何使用Hyperf框架進行組態管理,並提供具體的程式碼範例。一、設定檔的位置在使用Hyperf框架進行開發的時候,設定檔通常會放在config目錄下,也可以在.env檔中進

如何使用Hyperf框架進行檔案下載引言:在使用Hyperf框架開發Web應用程式時,檔案下載是一個常見的需求。本文將介紹如何使用Hyperf框架進行檔案下載,包括具體的程式碼範例。一、準備工作在開始之前,確保你已經安裝好了Hyperf框架並成功創建了一個Hyperf應用程式。二、建立文件下載控制器首先,我們需要建立一個控制器來處理文件下載的請求。打開終端,進

從2004年誕生至今,PHP一直是全球最受歡迎的開發語言之一。隨著網路的快速發展和技術的不斷創新,PHP的發展也日新月異。其中,微服務架構逐漸成為當今軟體開發的熱門趨勢。本文將帶你進入PHPHyperf微服務開發的世界,從入門到精通。一、什麼是微服務架構?微服務架構是一種基於一組小型、獨立部署的服務元件所建構的系統架構。相較於傳統的單體應用架構,微服務架構通

如何使用Hyperf框架進行請求限流引言:在現代網路應用中,如何在高並發的情況下確保系統的穩定性是非常重要的。請求限流是常見的因應策略之一。本文將介紹如何使用Hyperf框架進行請求限流,並給出具體的程式碼範例。一、什麼是請求限流請求限流是指在一段時間內限制系統的請求存取量,避免系統因為過多的請求而崩潰。透過合理的限流策略,可以提供更好的服務品質和穩定性。 H

如何使用Hyperf框架進行圖片處理引言:隨著行動互聯網的快速發展,圖片處理在現代Web開發中變得愈發重要。 Hyperf是一款基於Swoole的高效能框架,它提供了豐富的元件和功能,包括圖片處理。本文將介紹如何使用Hyperf框架進行圖片處理,並提供具體的程式碼範例。一、安裝Hyperf框架:在開始之前,我們先確保已經安裝了Hyperf框架。可以透過Compo

如何使用Hyperf框架進行資料分頁引言:資料分頁在實際的Web開發中非常常見,透過分頁可以讓使用者瀏覽大量資料時更加便捷。 Hyperf是一個高效能的PHP框架,提供了一系列強大的特性和元件。本文將介紹如何使用Hyperf框架進行資料分頁,並給出詳細的程式碼範例。一、準備工作:在開始之前,需要確保已經正確安裝和配置了Hyperf框架。可以透過Composer進行

如何使用Hyperf框架進行快取管理快取是提高應用效能的重要手段之一,而現代框架為我們提供了更便利的快取管理工具。本文將介紹如何使用Hyperf框架進行快取管理,並提供具體的程式碼範例。 Hyperf框架是基於Swoole拓展開發的高效能框架,內建了豐富的元件和工具,其中包括強大的快取管理功能。 Hyperf框架支援多種快取驅動,如Redis、Memcach

近年來,微服務架構已成為建立現代應用程式的主流方式。它透過將大型應用程式拆分成小而自治的服務,從而提高了應用程式的可擴展性、可維護性和可部署性。在微服務架構中,每個服務都是獨立開發、部署和運行的,它們之間透過輕量級的通訊機制進行互動。在建構微服務應用時,選擇一個適合的開發框架非常關鍵。 PHPHyperf是一個基於Swoole高效能協程網路框架的微服務框架
