> 백엔드 개발 > PHP 튜토리얼 > Hyperf + RabbitMQ + WebSocket을 기반으로 메시지 푸시 구현

Hyperf + RabbitMQ + WebSocket을 기반으로 메시지 푸시 구현

Guanhui
풀어 주다: 2023-04-08 18:24:02
앞으로
2632명이 탐색했습니다.

Hyperf + RabbitMQ + WebSocket을 기반으로 메시지 푸시 구현

Hyperf+ WebSocket +RabbitMQ를 기반으로 한 간단한 대형 화면 메시지 푸시입니다.

Idea

WebSocket 프로토콜을 사용하여 클라이언트와 서버 간의 상태 저장 긴 링크를 유지하고,

링크의 클라이언트 ID를 저장하세요. 게시자가 게시한 메시지를 구독하면 저장된 클라이언트 ID에 대한 메시지가 브로드캐스트됩니다.

WebSocket 서비스

composer에는 hyperf/websocket-server가 필요합니다

구성 파일 [config/autoload/server.php]

<?php
return [
    &#39;mode&#39; => SWOOLE_PROCESS,
    &#39;servers&#39; => [
        [
            &#39;name&#39; => &#39;http&#39;,
            &#39;type&#39; => Server::SERVER_HTTP,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 11111,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, &#39;onRequest&#39;],
            ],
        ],
        [
            &#39;name&#39; => &#39;ws&#39;,
            &#39;type&#39; => Server::SERVER_WEBSOCKET,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 12222,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, &#39;onHandShake&#39;],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, &#39;onMessage&#39;],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, &#39;onClose&#39;],
            ],
        ],
    ],
로그인 후 복사

WebSocket 서버 측 코드 예제

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers(&#39;websocket_sjd_1&#39;);
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd(&#39;websocket_sjd_1&#39;, $frame->fd);
            $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        }
        $server->push($frame->fd, &#39;Recv: &#39; . $frame->data);
    }
    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem(&#39;websocket_sjd_1&#39;, $fd);
        var_dump(&#39;closed&#39;);
    }
    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);
        $res1 = $redis->sAdd(&#39;websocket_sjd_1&#39;, $request->fd);
        var_dump($res1);
        $res = $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        var_dump($res);
        $server->push($request->fd, &#39;Opened&#39;);
    }
}
로그인 후 복사

WebSocket 프런트 엔드 코드

function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0
            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");
            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };
            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);
            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };
            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }
로그인 후 복사

AMQP 구성 요소

composer require hyperf/amqp
로그인 후 복사

구성 파일 [config/autoload/amqp.php]

<?php
return [
    &#39;default&#39; => [
        &#39;host&#39; => &#39;localhost&#39;,
        &#39;port&#39; => 5672,
        &#39;user&#39; => &#39;guest&#39;,
        &#39;password&#39; => &#39;guest&#39;,
        &#39;vhost&#39; => &#39;/&#39;,
        &#39;pool&#39; => [
            &#39;min_connections&#39; => 1,
            &#39;max_connections&#39; => 10,
            &#39;connect_timeout&#39; => 10.0,
            &#39;wait_timeout&#39; => 3.0,
            &#39;heartbeat&#39; => -1,
        ],
        &#39;params&#39; => [
            &#39;insist&#39; => false,
            &#39;login_method&#39; => &#39;AMQPLAIN&#39;,
            &#39;login_response&#39; => null,
            &#39;locale&#39; => &#39;en_US&#39;,
            &#39;connection_timeout&#39; => 3.0,
            &#39;read_write_timeout&#39; => 6.0,
            &#39;context&#39; => null,
            &#39;keepalive&#39; => false,
            &#39;heartbeat&#39; => 3,
        ],
    ],
];
로그인 후 복사

MQ 소비자 코드

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);
        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers(&#39;websocket_sjd_1&#39;);
        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }
        return Result::ACK;
    }
}
로그인 후 복사

컨트롤러 코드

  /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            &#39;code&#39; => 200,
            &#39;data&#39; => [
                &#39;userOutName&#39; => &#39;ccflow&#39;,
                &#39;userOutNum&#39; => &#39;9999&#39;,
                &#39;recordOutTime&#39; => date("Y-m-d H:i:s", time()),
                &#39;doorOutName&#39; => &#39;教师公寓&#39;,
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
        $user = $this->request->input(&#39;user&#39;, &#39;Hyperf&#39;);
        $method = $this->request->getMethod();
        return [
            &#39;method&#39; => $method,
            &#39;message&#39; => "{$user}.",
        ];
    }
로그인 후 복사

최종 효과

Hyperf + RabbitMQ + WebSocket을 기반으로 메시지 푸시 구현

추천 튜토리얼: "PHP 튜토리얼"

위 내용은 Hyperf + RabbitMQ + WebSocket을 기반으로 메시지 푸시 구현의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:learnku.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿