首頁 後端開發 php教程 如何基於Hyperf實現RabbitMQ+WebSocket訊息推播

如何基於Hyperf實現RabbitMQ+WebSocket訊息推播

May 06, 2020 pm 01:49 PM
hyperf

介紹

基於 Hyperf WebSocket RabbitMQ 實作的一個簡單大螢幕的訊息推送。

思路

利用 WebSocket 協定讓客戶端和伺服器端保持有狀態的長鏈接,

儲存連結上來的客戶端 id。訂閱發布者發布的訊息針對已儲存的客戶端 id 進行廣播訊息。

WebSocket 服務

composer require hyperf/websocket-server
登入後複製

設定檔[config/autoload/server.php]

<?php
return [
    &#39;mode&#39; => SWOOLE_PROCESS,
    &#39;servers&#39; => [
        [
            &#39;name&#39; => &#39;http&#39;,
            &#39;type&#39; => Server::SERVER_HTTP,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 11111,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, &#39;onRequest&#39;],
            ],
        ],
        [
            &#39;name&#39; => &#39;ws&#39;,
            &#39;type&#39; => Server::SERVER_WEBSOCKET,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 12222,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, &#39;onHandShake&#39;],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, &#39;onMessage&#39;],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, &#39;onClose&#39;],
            ],
        ],
    ],
登入後複製

#WebSocket 伺服器端程式碼範例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers(&#39;websocket_sjd_1&#39;);
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd(&#39;websocket_sjd_1&#39;, $frame->fd);
            $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        }
        $server->push($frame->fd, &#39;Recv: &#39; . $frame->data);
    }
    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem(&#39;websocket_sjd_1&#39;, $fd);
        var_dump(&#39;closed&#39;);
    }
    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);
        $res1 = $redis->sAdd(&#39;websocket_sjd_1&#39;, $request->fd);
        var_dump($res1);
        $res = $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        var_dump($res);
        $server->push($request->fd, &#39;Opened&#39;);
    }
}
登入後複製

WebSocket 前端程式碼

    function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0
            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");
            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };
            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);
            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };
            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }
登入後複製

AMQP 元件

composer require hyperf/amqp
登入後複製

#設定檔[ config/autoload/amqp.php]

<?php
return [
    &#39;default&#39; => [
        &#39;host&#39; => &#39;localhost&#39;,
        &#39;port&#39; => 5672,
        &#39;user&#39; => &#39;guest&#39;,
        &#39;password&#39; => &#39;guest&#39;,
        &#39;vhost&#39; => &#39;/&#39;,
        &#39;pool&#39; => [
            &#39;min_connections&#39; => 1,
            &#39;max_connections&#39; => 10,
            &#39;connect_timeout&#39; => 10.0,
            &#39;wait_timeout&#39; => 3.0,
            &#39;heartbeat&#39; => -1,
        ],
        &#39;params&#39; => [
            &#39;insist&#39; => false,
            &#39;login_method&#39; => &#39;AMQPLAIN&#39;,
            &#39;login_response&#39; => null,
            &#39;locale&#39; => &#39;en_US&#39;,
            &#39;connection_timeout&#39; => 3.0,
            &#39;read_write_timeout&#39; => 6.0,
            &#39;context&#39; => null,
            &#39;keepalive&#39; => false,
            &#39;heartbeat&#39; => 3,
        ],
    ],
];
登入後複製

MQ 消費者程式碼

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);
        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers(&#39;websocket_sjd_1&#39;);
        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }
        return Result::ACK;
    }
}
登入後複製

控制器程式碼

#
    /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            &#39;code&#39; => 200,
            &#39;data&#39; => [
                &#39;userOutName&#39; => &#39;ccflow&#39;,
                &#39;userOutNum&#39; => &#39;9999&#39;,
                &#39;recordOutTime&#39; => date("Y-m-d H:i:s", time()),
                &#39;doorOutName&#39; => &#39;教师公寓&#39;,
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
        $user = $this->request->input(&#39;user&#39;, &#39;Hyperf&#39;);
        $method = $this->request->getMethod();
        return [
            &#39;method&#39; => $method,
            &#39;message&#39; => "{$user}.",
        ];
    }
登入後複製

最終效果

如何基於Hyperf實現RabbitMQ+WebSocket訊息推播

#推薦:《PHP教學

以上是如何基於Hyperf實現RabbitMQ+WebSocket訊息推播的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.聊天命令以及如何使用它們
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何使用Hyperf框架進行設定管理 如何使用Hyperf框架進行設定管理 Oct 28, 2023 am 10:07 AM

Hyperf是一個優秀的PHP框架,其主要的特點是快速、靈活、可擴展,目前在業界有廣泛的應用。在使用Hyperf框架進行開發的過程中,我們常常會遇到需要組態管理的情況。本文將介紹如何使用Hyperf框架進行組態管理,並提供具體的程式碼範例。一、設定檔的位置在使用Hyperf框架進行開發的時候,設定檔通常會放在config目錄下,也可以在.env檔中進

如何使用Hyperf框架進行檔案下載 如何使用Hyperf框架進行檔案下載 Oct 21, 2023 am 08:23 AM

如何使用Hyperf框架進行檔案下載引言:在使用Hyperf框架開發Web應用程式時,檔案下載是一個常見的需求。本文將介紹如何使用Hyperf框架進行檔案下載,包括具體的程式碼範例。一、準備工作在開始之前,確保你已經安裝好了Hyperf框架並成功創建了一個Hyperf應用程式。二、建立文件下載控制器首先,我們需要建立一個控制器來處理文件下載的請求。打開終端,進

PHP Hyperf微服務開髮指南:從入門到精通 PHP Hyperf微服務開髮指南:從入門到精通 Sep 12, 2023 am 10:31 AM

從2004年誕生至今,PHP一直是全球最受歡迎的開發語言之一。隨著網路的快速發展和技術的不斷創新,PHP的發展也日新月異。其中,微服務架構逐漸成為當今軟體開發的熱門趨勢。本文將帶你進入PHPHyperf微服務開發的世界,從入門到精通。一、什麼是微服務架構?微服務架構是一種基於一組小型、獨立部署的服務元件所建構的系統架構。相較於傳統的單體應用架構,微服務架構通

如何使用Hyperf框架進行請求限流 如何使用Hyperf框架進行請求限流 Oct 20, 2023 pm 01:58 PM

如何使用Hyperf框架進行請求限流引言:在現代網路應用中,如何在高並發的情況下確保系統的穩定性是非常重要的。請求限流是常見的因應策略之一。本文將介紹如何使用Hyperf框架進行請求限流,並給出具體的程式碼範例。一、什麼是請求限流請求限流是指在一段時間內限制系統的請求存取量,避免系統因為過多的請求而崩潰。透過合理的限流策略,可以提供更好的服務品質和穩定性。 H

如何使用Hyperf框架進行圖片處理 如何使用Hyperf框架進行圖片處理 Oct 24, 2023 pm 12:04 PM

如何使用Hyperf框架進行圖片處理引言:隨著行動互聯網的快速發展,圖片處理在現代Web開發中變得愈發重要。 Hyperf是一款基於Swoole的高效能框架,它提供了豐富的元件和功能,包括圖片處理。本文將介紹如何使用Hyperf框架進行圖片處理,並提供具體的程式碼範例。一、安裝Hyperf框架:在開始之前,我們先確保已經安裝了Hyperf框架。可以透過Compo

如何使用Hyperf框架進行資料分頁 如何使用Hyperf框架進行資料分頁 Oct 20, 2023 am 11:25 AM

如何使用Hyperf框架進行資料分頁引言:資料分頁在實際的Web開發中非常常見,透過分頁可以讓使用者瀏覽大量資料時更加便捷。 Hyperf是一個高效能的PHP框架,提供了一系列強大的特性和元件。本文將介紹如何使用Hyperf框架進行資料分頁,並給出詳細的程式碼範例。一、準備工作:在開始之前,需要確保已經正確安裝和配置了Hyperf框架。可以透過Composer進行

如何使用Hyperf框架進行快取管理 如何使用Hyperf框架進行快取管理 Oct 21, 2023 am 08:36 AM

如何使用Hyperf框架進行快取管理快取是提高應用效能的重要手段之一,而現代框架為我們提供了更便利的快取管理工具。本文將介紹如何使用Hyperf框架進行快取管理,並提供具體的程式碼範例。 Hyperf框架是基於Swoole拓展開發的高效能框架,內建了豐富的元件和工具,其中包括強大的快取管理功能。 Hyperf框架支援多種快取驅動,如Redis、Memcach

建構可擴展的微服務應用:探索PHP Hyperf的技術特點 建構可擴展的微服務應用:探索PHP Hyperf的技術特點 Sep 11, 2023 pm 07:01 PM

近年來,微服務架構已成為建立現代應用程式的主流方式。它透過將大型應用程式拆分成小而自治的服務,從而提高了應用程式的可擴展性、可維護性和可部署性。在微服務架構中,每個服務都是獨立開發、部署和運行的,它們之間透過輕量級的通訊機制進行互動。在建構微服務應用時,選擇一個適合的開發框架非常關鍵。 PHPHyperf是一個基於Swoole高效能協程網路框架的微服務框架

See all articles