


How to implement RabbitMQ+WebSocket message push based on Hyperf
Introduction
A simple large-screen message push based on Hyperf WebSocket RabbitMQ.
Idea
Use the WebSocket protocol to maintain a stateful long connection between the client and the server, and
save the client ID of the link. Subscribing to messages published by a publisher broadcasts messages against a saved client id.
WebSocket service
composer require hyperf/websocket-server
Configuration file [config/autoload/server.php]
<?php return [ 'mode' => SWOOLE_PROCESS, 'servers' => [ [ 'name' => 'http', 'type' => Server::SERVER_HTTP, 'host' => '0.0.0.0', 'port' => 11111, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'], ], ], [ 'name' => 'ws', 'type' => Server::SERVER_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 12222, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'], ], ], ],
WebSocket server End code example
<?php declare(strict_types=1); /** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://doc.hyperf.io * @contact group@hyperf.io * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */ namespace App\Controller; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; use Swoole\Http\Request; use Swoole\Server; use Swoole\Websocket\Frame; use Swoole\WebSocket\Server as WebSocketServer; class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface { /** * 发送消息 * @param WebSocketServer $server * @param Frame $frame */ public function onMessage(WebSocketServer $server, Frame $frame): void { //心跳刷新缓存 $redis = $this->container->get(\Redis::class); //获取所有的客户端id $fdList = $redis->sMembers('websocket_sjd_1'); //如果当前客户端在客户端集合中,就刷新 if (in_array($frame->fd, $fdList)) { $redis->sAdd('websocket_sjd_1', $frame->fd); $redis->expire('websocket_sjd_1', 7200); } $server->push($frame->fd, 'Recv: ' . $frame->data); } /** * 客户端失去链接 * @param Server $server * @param int $fd * @param int $reactorId */ public function onClose(Server $server, int $fd, int $reactorId): void { //删掉客户端id $redis = $this->container->get(\Redis::class); //移除集合中指定的value $redis->sRem('websocket_sjd_1', $fd); var_dump('closed'); } /** * 客户端链接 * @param WebSocketServer $server * @param Request $request */ public function onOpen(WebSocketServer $server, Request $request): void { //保存客户端id $redis = $this->container->get(\Redis::class); $res1 = $redis->sAdd('websocket_sjd_1', $request->fd); var_dump($res1); $res = $redis->expire('websocket_sjd_1', 7200); var_dump($res); $server->push($request->fd, 'Opened'); } }
WebSocket front-end code
function WebSocketTest() { if ("WebSocket" in window) { console.log("您的浏览器支持 WebSocket!"); var num = 0 // 打开一个 web socket var ws = new WebSocket("ws://127.0.0.1:12222"); ws.onopen = function () { // Web Socket 已连接上,使用 send() 方法发送数据 //alert("数据发送中..."); //ws.send("发送数据"); }; window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开 var ping = {"type": "ping"}; ws.send(JSON.stringify(ping)); }, 5000); ws.onmessage = function (evt) { var d = JSON.parse(evt.data); console.log(d); if (d.code == 300) { $(".address").text(d.address) } if (d.code == 200) { var v = d.data console.log(v); num++ var str = `<div class="item"> <p>${v.recordOutTime}</p> <p>${v.userOutName}</p> <p>${v.userOutNum}</p> <p>${v.doorOutName}</p> </div>` $(".tableHead").after(str) if (num > 7) { num-- $(".table .item:nth-last-child(1)").remove() } } }; ws.error = function (e) { console.log(e) alert(e) } ws.onclose = function () { // 关闭 websocket alert("连接已关闭..."); }; } else { alert("您的浏览器不支持 WebSocket!"); } }
AMQP component
composer require hyperf/amqp
Configuration file[ config/autoload/amqp.php]
<?php return [ 'default' => [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'pool' => [ 'min_connections' => 1, 'max_connections' => 10, 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, ], 'params' => [ 'insist' => false, 'login_method' => 'AMQPLAIN', 'login_response' => null, 'locale' => 'en_US', 'connection_timeout' => 3.0, 'read_write_timeout' => 6.0, 'context' => null, 'keepalive' => false, 'heartbeat' => 3, ], ], ];
MQ consumer code
<?php declare(strict_types=1); namespace App\Amqp\Consumer; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; use Hyperf\Server\Server; use Hyperf\Server\ServerFactory; /** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */ class DemoConsumer extends ConsumerMessage { /** * rabbmitMQ消费端代码 * @param $data * @return string */ public function consume($data): string { print_r($data); //获取集合中所有的value $redis = $this->container->get(\Redis::class); $fdList=$redis->sMembers('websocket_sjd_1'); $server=$this->container->get(ServerFactory::class)->getServer()->getServer(); foreach($fdList as $key=>$v){ if(!empty($v)){ $server->push((int)$v, $data); } } return Result::ACK; } }
Controller code
/** * test * @return array */ public function test() { $data = array( 'code' => 200, 'data' => [ 'userOutName' => 'ccflow', 'userOutNum' => '9999', 'recordOutTime' => date("Y-m-d H:i:s", time()), 'doorOutName' => '教师公寓', ] ); $data = \GuzzleHttp\json_encode($data); $message = new DemoProducer($data); $producer = ApplicationContext::getContainer()->get(Producer::class); $result = $producer->produce($message); var_dump($result); $user = $this->request->input('user', 'Hyperf'); $method = $this->request->getMethod(); return [ 'method' => $method, 'message' => "{$user}.", ]; }
Final effect
Recommended: "PHP Tutorial"
The above is the detailed content of How to implement RabbitMQ+WebSocket message push based on Hyperf. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics



Hyperf is an excellent PHP framework. Its main features are fast, flexible and scalable. It is currently widely used in the industry. In the process of developing using the Hyperf framework, we often encounter situations that require configuration management. This article will introduce how to use the Hyperf framework for configuration management and provide specific code examples. 1. The location of the configuration file. When developing using the Hyperf framework, the configuration file is usually placed in the config directory, or it can be entered in the .env file.

How to use the Hyperf framework for file downloading Introduction: File downloading is a common requirement when developing web applications using the Hyperf framework. This article will introduce how to use the Hyperf framework for file downloading, including specific code examples. 1. Preparation Before starting, make sure you have installed the Hyperf framework and successfully created a Hyperf application. 2. Create a file download controller First, we need to create a controller to handle file download requests. Open the terminal and enter

Since its birth in 2004, PHP has been one of the most popular development languages in the world. With the rapid development of the Internet and the continuous innovation of technology, the development of PHP is also changing with each passing day. Among them, microservice architecture has gradually become a popular trend in software development today. This article will take you into the world of PHPHyperf microservice development, from entry to proficiency. 1. What is microservice architecture? Microservices architecture is a system architecture built on a set of small, independently deployed service components. Compared with traditional monolithic application architecture, microservice architecture

How to use the Hyperf framework for request current limiting Introduction: In modern Internet applications, how to ensure the stability of the system under high concurrency is very important. Request throttling is one of the common coping strategies. This article will introduce how to use the Hyperf framework to limit request flow and give specific code examples. 1. What is request current limiting? Request current limiting refers to limiting the number of request visits to the system within a period of time to prevent the system from crashing due to too many requests. Through reasonable current limiting strategies, better service quality and stability can be provided. H

How to use the Hyperf framework for image processing Introduction: With the rapid development of the mobile Internet, image processing has become more and more important in modern Web development. Hyperf is a high-performance framework based on Swoole, which provides a wealth of components and functions, including image processing. This article will introduce how to use the Hyperf framework for image processing and provide specific code examples. 1. Install the Hyperf framework: Before starting, we first make sure that the Hyperf framework has been installed. Compo

How to use the Hyperf framework for data paging Introduction: Data paging is very common in actual Web development. Paging can make it easier for users to browse large amounts of data. Hyperf is a high-performance PHP framework that provides a powerful set of features and components. This article will introduce how to use the Hyperf framework for data paging and give detailed code examples. 1. Preparation: Before starting, you need to ensure that the Hyperf framework has been correctly installed and configured. Can be done via Composer

How to use the Hyperf framework for cache management Cache is one of the important means to improve application performance, and modern frameworks provide us with more convenient cache management tools. This article will introduce how to use the Hyperf framework for cache management and provide specific code examples. The Hyperf framework is a high-performance framework developed based on Swoole. It has a rich set of built-in components and tools, including powerful cache management functions. The Hyperf framework supports multiple cache drivers, such as Redis and Memcach.

In recent years, microservices architecture has become a mainstream way to build modern applications. It improves the scalability, maintainability, and deployability of a large application by splitting it into small, autonomous services. In a microservice architecture, each service is developed, deployed, and run independently, and they interact through lightweight communication mechanisms. When building microservice applications, choosing a suitable development framework is very critical. PHPHyperf is a microservice framework based on Swoole's high-performance coroutine network framework
