Rumah pembangunan bahagian belakang tutorial php 如何基于Hyperf实现RabbitMQ+WebSocket消息推送

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

May 06, 2020 pm 01:49 PM
hyperf

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server
Salin selepas log masuk

配置文件 [config/autoload/server.php]

<?php
return [
    &#39;mode&#39; => SWOOLE_PROCESS,
    &#39;servers&#39; => [
        [
            &#39;name&#39; => &#39;http&#39;,
            &#39;type&#39; => Server::SERVER_HTTP,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 11111,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, &#39;onRequest&#39;],
            ],
        ],
        [
            &#39;name&#39; => &#39;ws&#39;,
            &#39;type&#39; => Server::SERVER_WEBSOCKET,
            &#39;host&#39; => &#39;0.0.0.0&#39;,
            &#39;port&#39; => 12222,
            &#39;sock_type&#39; => SWOOLE_SOCK_TCP,
            &#39;callbacks&#39; => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, &#39;onHandShake&#39;],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, &#39;onMessage&#39;],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, &#39;onClose&#39;],
            ],
        ],
    ],
Salin selepas log masuk

WebSocket 服务器端代码示例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers(&#39;websocket_sjd_1&#39;);
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd(&#39;websocket_sjd_1&#39;, $frame->fd);
            $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        }
        $server->push($frame->fd, &#39;Recv: &#39; . $frame->data);
    }
    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem(&#39;websocket_sjd_1&#39;, $fd);
        var_dump(&#39;closed&#39;);
    }
    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);
        $res1 = $redis->sAdd(&#39;websocket_sjd_1&#39;, $request->fd);
        var_dump($res1);
        $res = $redis->expire(&#39;websocket_sjd_1&#39;, 7200);
        var_dump($res);
        $server->push($request->fd, &#39;Opened&#39;);
    }
}
Salin selepas log masuk

WebSocket 前端代码

    function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0
            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");
            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };
            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);
            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };
            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }
Salin selepas log masuk

AMQP 组件

composer require hyperf/amqp
Salin selepas log masuk

配置文件 [config/autoload/amqp.php]

<?php
return [
    &#39;default&#39; => [
        &#39;host&#39; => &#39;localhost&#39;,
        &#39;port&#39; => 5672,
        &#39;user&#39; => &#39;guest&#39;,
        &#39;password&#39; => &#39;guest&#39;,
        &#39;vhost&#39; => &#39;/&#39;,
        &#39;pool&#39; => [
            &#39;min_connections&#39; => 1,
            &#39;max_connections&#39; => 10,
            &#39;connect_timeout&#39; => 10.0,
            &#39;wait_timeout&#39; => 3.0,
            &#39;heartbeat&#39; => -1,
        ],
        &#39;params&#39; => [
            &#39;insist&#39; => false,
            &#39;login_method&#39; => &#39;AMQPLAIN&#39;,
            &#39;login_response&#39; => null,
            &#39;locale&#39; => &#39;en_US&#39;,
            &#39;connection_timeout&#39; => 3.0,
            &#39;read_write_timeout&#39; => 6.0,
            &#39;context&#39; => null,
            &#39;keepalive&#39; => false,
            &#39;heartbeat&#39; => 3,
        ],
    ],
];
Salin selepas log masuk

MQ 消费者代码

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);
        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers(&#39;websocket_sjd_1&#39;);
        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }
        return Result::ACK;
    }
}
Salin selepas log masuk

控制器代码

    /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            &#39;code&#39; => 200,
            &#39;data&#39; => [
                &#39;userOutName&#39; => &#39;ccflow&#39;,
                &#39;userOutNum&#39; => &#39;9999&#39;,
                &#39;recordOutTime&#39; => date("Y-m-d H:i:s", time()),
                &#39;doorOutName&#39; => &#39;教师公寓&#39;,
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
        $user = $this->request->input(&#39;user&#39;, &#39;Hyperf&#39;);
        $method = $this->request->getMethod();
        return [
            &#39;method&#39; => $method,
            &#39;message&#39; => "{$user}.",
        ];
    }
Salin selepas log masuk

最终效果

ab7e49780093484c182c1baf0dbedce.png

推荐:《PHP教程

Atas ialah kandungan terperinci 如何基于Hyperf实现RabbitMQ+WebSocket消息推送. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

AI Hentai Generator

AI Hentai Generator

Menjana ai hentai secara percuma.

Artikel Panas

R.E.P.O. Kristal tenaga dijelaskan dan apa yang mereka lakukan (kristal kuning)
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Tetapan grafik terbaik
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Cara Memperbaiki Audio Jika anda tidak dapat mendengar sesiapa
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Cara Membuka Segala -galanya Di Myrise
1 bulan yang lalu By 尊渡假赌尊渡假赌尊渡假赌

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1

Hantar Studio 13.0.1

Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6

Dreamweaver CS6

Alat pembangunan web visual

SublimeText3 versi Mac

SublimeText3 versi Mac

Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Cara menggunakan rangka kerja Hyperf untuk pengurusan konfigurasi Cara menggunakan rangka kerja Hyperf untuk pengurusan konfigurasi Oct 28, 2023 am 10:07 AM

Hyperf ialah rangka kerja PHP yang sangat baik. Ciri utamanya adalah pantas, fleksibel dan berskala Ia kini digunakan secara meluas dalam industri. Dalam proses membangunkan menggunakan rangka kerja Hyperf, kami sering menghadapi situasi yang memerlukan pengurusan konfigurasi. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk pengurusan konfigurasi dan memberikan contoh kod khusus. 1. Lokasi fail konfigurasi Apabila membangun menggunakan rangka kerja Hyperf, fail konfigurasi biasanya diletakkan dalam direktori konfigurasi, atau ia boleh dimasukkan ke dalam fail .env.

Cara menggunakan rangka kerja Hyperf untuk memuat turun fail Cara menggunakan rangka kerja Hyperf untuk memuat turun fail Oct 21, 2023 am 08:23 AM

Cara menggunakan rangka kerja Hyperf untuk memuat turun fail Pengenalan: Muat turun fail adalah keperluan biasa apabila membangunkan aplikasi web menggunakan rangka kerja Hyperf. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk memuat turun fail, termasuk contoh kod khusus. 1. Persediaan Sebelum memulakan, pastikan anda telah memasang rangka kerja Hyperf dan berjaya mencipta aplikasi Hyperf. 2. Buat pengawal muat turun fail Mula-mula, kita perlu mencipta pengawal untuk mengendalikan permintaan muat turun fail. Buka terminal dan masuk

Panduan Pembangunan Perkhidmatan Mikro PHP Hyperf: Dari Permulaan kepada Penguasaan Panduan Pembangunan Perkhidmatan Mikro PHP Hyperf: Dari Permulaan kepada Penguasaan Sep 12, 2023 am 10:31 AM

Sejak dilahirkan pada tahun 2004, PHP telah menjadi salah satu bahasa pembangunan yang paling popular di dunia. Dengan perkembangan pesat Internet dan inovasi teknologi yang berterusan, pembangunan PHP juga berubah setiap hari. Antaranya, seni bina perkhidmatan mikro secara beransur-ansur menjadi trend popular dalam pembangunan perisian hari ini. Artikel ini akan membawa anda ke dalam dunia pembangunan perkhidmatan mikro PHPHyperf, daripada kemasukan kepada kemahiran. 1. Apakah seni bina perkhidmatan mikro? Seni bina Microservices ialah seni bina sistem yang dibina di atas satu set komponen perkhidmatan yang kecil dan digunakan secara bebas. Berbanding dengan seni bina aplikasi monolitik tradisional, seni bina perkhidmatan mikro

Cara menggunakan rangka kerja Hyperf untuk mengehadkan aliran permintaan Cara menggunakan rangka kerja Hyperf untuk mengehadkan aliran permintaan Oct 20, 2023 pm 01:58 PM

Cara menggunakan rangka kerja Hyperf untuk mengehadkan permintaan semasa Pengenalan: Dalam aplikasi Internet moden, cara memastikan kestabilan sistem di bawah konkurensi tinggi adalah sangat penting. Permintaan pendikit adalah salah satu strategi mengatasi biasa. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk mengehadkan aliran permintaan dan memberikan contoh kod khusus. 1. Apakah pengehadan semasa permintaan? Pengehadan semasa permintaan merujuk kepada mengehadkan bilangan lawatan permintaan ke sistem dalam tempoh masa untuk mengelakkan sistem daripada ranap kerana terlalu banyak permintaan. Melalui strategi pengehadan semasa yang munasabah, kualiti dan kestabilan perkhidmatan yang lebih baik dapat disediakan. H

Cara menggunakan rangka kerja Hyperf untuk halaman data Cara menggunakan rangka kerja Hyperf untuk halaman data Oct 20, 2023 am 11:25 AM

Cara menggunakan rangka kerja Hyperf untuk paging data Pengenalan: Paging data adalah sangat biasa dalam pembangunan Web sebenar Paging boleh memudahkan pengguna menyemak imbas data yang banyak. Hyperf ialah rangka kerja PHP berprestasi tinggi yang menyediakan set ciri dan komponen yang berkuasa. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk halaman data dan memberikan contoh kod terperinci. 1. Penyediaan: Sebelum memulakan, anda perlu memastikan bahawa rangka kerja Hyperf telah dipasang dan dikonfigurasikan dengan betul. Boleh dilakukan melalui Komposer

Cara menggunakan rangka kerja Hyperf untuk pemprosesan imej Cara menggunakan rangka kerja Hyperf untuk pemprosesan imej Oct 24, 2023 pm 12:04 PM

Cara menggunakan rangka kerja Hyperf untuk pemprosesan imej Pengenalan: Dengan perkembangan pesat Internet mudah alih, pemprosesan imej telah menjadi semakin penting dalam pembangunan Web moden. Hyperf ialah rangka kerja berprestasi tinggi berdasarkan Swoole, yang menyediakan pelbagai komponen dan fungsi, termasuk pemprosesan imej. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk pemprosesan imej dan memberikan contoh kod khusus. 1. Pasang rangka kerja Hyperf: Sebelum memulakan, kami pastikan dahulu rangka kerja Hyperf telah dipasang. kompo

Cara menggunakan rangka kerja Hyperf untuk pengurusan cache Cara menggunakan rangka kerja Hyperf untuk pengurusan cache Oct 21, 2023 am 08:36 AM

Cara menggunakan rangka kerja Hyperf untuk pengurusan cache Cache ialah salah satu cara penting untuk meningkatkan prestasi aplikasi, dan rangka kerja moden memberikan kami alat pengurusan cache yang lebih mudah. Artikel ini akan memperkenalkan cara menggunakan rangka kerja Hyperf untuk pengurusan cache dan memberikan contoh kod khusus. Rangka kerja Hyperf ialah rangka kerja berprestasi tinggi yang dibangunkan berdasarkan Swoole Ia mempunyai set komponen dan alatan terbina dalam yang kaya, termasuk fungsi pengurusan cache yang berkuasa. Rangka kerja Hyperf menyokong berbilang pemacu cache, seperti Redis dan Memcach.

Membina aplikasi perkhidmatan mikro berskala: Terokai ciri teknikal PHP Hyperf Membina aplikasi perkhidmatan mikro berskala: Terokai ciri teknikal PHP Hyperf Sep 11, 2023 pm 07:01 PM

Dalam beberapa tahun kebelakangan ini, seni bina perkhidmatan mikro telah menjadi cara arus perdana untuk membina aplikasi moden. Ia meningkatkan kebolehskalaan, kebolehselenggaraan dan kebolehlaksanaan aplikasi besar dengan membahagikannya kepada perkhidmatan autonomi yang kecil. Dalam seni bina perkhidmatan mikro, setiap perkhidmatan dibangunkan, digunakan dan dijalankan secara bebas, dan ia berinteraksi melalui mekanisme komunikasi yang ringan. Apabila membina aplikasi perkhidmatan mikro, memilih rangka kerja pembangunan yang sesuai adalah sangat kritikal. PHPHyperf ialah rangka kerja mikro perkhidmatan berdasarkan rangka kerja rangkaian coroutine berprestasi tinggi Swoole

See all articles