Hyperf に基づいて RabbitMQ+WebSocket メッセージ プッシュを実装する方法
はじめに
Hyperf WebSocket RabbitMQ に基づくシンプルな大画面メッセージ プッシュ。
アイデア
WebSocket プロトコルを使用して、クライアントとサーバー間のステートフルな長い接続を維持し、リンクのクライアント ID を保存します。 。パブリッシャーによってパブリッシュされたメッセージをサブスクライブすると、保存されたクライアント ID に対してメッセージがブロードキャストされます。
WebSocket サービスcomposer require hyperf/websocket-server
<?php
return [
'mode' => SWOOLE_PROCESS,
'servers' => [
[
'name' => 'http',
'type' => Server::SERVER_HTTP,
'host' => '0.0.0.0',
'port' => 11111,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
],
],
[
'name' => 'ws',
'type' => Server::SERVER_WEBSOCKET,
'host' => '0.0.0.0',
'port' => 12222,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
],
],
],
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
/**
* 发送消息
* @param WebSocketServer $server
* @param Frame $frame
*/
public function onMessage(WebSocketServer $server, Frame $frame): void
{
//心跳刷新缓存
$redis = $this->container->get(\Redis::class);
//获取所有的客户端id
$fdList = $redis->sMembers('websocket_sjd_1');
//如果当前客户端在客户端集合中,就刷新
if (in_array($frame->fd, $fdList)) {
$redis->sAdd('websocket_sjd_1', $frame->fd);
$redis->expire('websocket_sjd_1', 7200);
}
$server->push($frame->fd, 'Recv: ' . $frame->data);
}
/**
* 客户端失去链接
* @param Server $server
* @param int $fd
* @param int $reactorId
*/
public function onClose(Server $server, int $fd, int $reactorId): void
{
//删掉客户端id
$redis = $this->container->get(\Redis::class);
//移除集合中指定的value
$redis->sRem('websocket_sjd_1', $fd);
var_dump('closed');
}
/**
* 客户端链接
* @param WebSocketServer $server
* @param Request $request
*/
public function onOpen(WebSocketServer $server, Request $request): void
{
//保存客户端id
$redis = $this->container->get(\Redis::class);
$res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
var_dump($res1);
$res = $redis->expire('websocket_sjd_1', 7200);
var_dump($res);
$server->push($request->fd, 'Opened');
}
}
function WebSocketTest() {
if ("WebSocket" in window) {
console.log("您的浏览器支持 WebSocket!");
var num = 0
// 打开一个 web socket
var ws = new WebSocket("ws://127.0.0.1:12222");
ws.onopen = function () {
// Web Socket 已连接上,使用 send() 方法发送数据
//alert("数据发送中...");
//ws.send("发送数据");
};
window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
var ping = {"type": "ping"};
ws.send(JSON.stringify(ping));
}, 5000);
ws.onmessage = function (evt) {
var d = JSON.parse(evt.data);
console.log(d);
if (d.code == 300) {
$(".address").text(d.address)
}
if (d.code == 200) {
var v = d.data
console.log(v);
num++
var str = `<div class="item">
<p>${v.recordOutTime}</p>
<p>${v.userOutName}</p>
<p>${v.userOutNum}</p>
<p>${v.doorOutName}</p>
</div>`
$(".tableHead").after(str)
if (num > 7) {
num--
$(".table .item:nth-last-child(1)").remove()
}
}
};
ws.error = function (e) {
console.log(e)
alert(e)
}
ws.onclose = function () {
// 关闭 websocket
alert("连接已关闭...");
};
} else {
alert("您的浏览器不支持 WebSocket!");
}
}
composer require hyperf/amqp
<?php
return [
'default' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
],
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
],
],
];
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
*/
class DemoConsumer extends ConsumerMessage
{
/**
* rabbmitMQ消费端代码
* @param $data
* @return string
*/
public function consume($data): string
{
print_r($data);
//获取集合中所有的value
$redis = $this->container->get(\Redis::class);
$fdList=$redis->sMembers('websocket_sjd_1');
$server=$this->container->get(ServerFactory::class)->getServer()->getServer();
foreach($fdList as $key=>$v){
if(!empty($v)){
$server->push((int)$v, $data);
}
}
return Result::ACK;
}
}
/**
* test
* @return array
*/
public function test()
{
$data = array(
'code' => 200,
'data' => [
'userOutName' => 'ccflow',
'userOutNum' => '9999',
'recordOutTime' => date("Y-m-d H:i:s", time()),
'doorOutName' => '教师公寓',
]
);
$data = \GuzzleHttp\json_encode($data);
$message = new DemoProducer($data);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
var_dump($result);
$user = $this->request->input('user', 'Hyperf');
$method = $this->request->getMethod();
return [
'method' => $method,
'message' => "{$user}.",
];
}
推奨: 「
以上がHyperf に基づいて RabbitMQ+WebSocket メッセージ プッシュを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Hyperf は優れた PHP フレームワークであり、その主な特徴は高速、柔軟、スケーラブルであり、現在業界で広く使用されています。 Hyperf フレームワークを使用した開発のプロセスでは、構成管理が必要な状況によく遭遇します。この記事では、構成管理に Hyperf フレームワークを使用する方法を紹介し、具体的なコード例を示します。 1. 構成ファイルの場所 Hyperf フレームワークを使用して開発する場合、構成ファイルは通常 config ディレクトリに配置されるか、.env ファイルに入力されます。

Hyperf フレームワークを使用してファイルをダウンロードする方法 はじめに: ファイルのダウンロードは、Hyperf フレームワークを使用して Web アプリケーションを開発する場合の一般的な要件です。この記事では、Hyperf フレームワークを使用してファイルをダウンロードする方法を、具体的なコード例を含めて紹介します。 1. 準備 開始する前に、Hyperf フレームワークがインストールされ、Hyperf アプリケーションが正常に作成されていることを確認してください。 2. ファイル ダウンロード コントローラーを作成する まず、ファイル ダウンロード リクエストを処理するコントローラーを作成する必要があります。ターミナルを開いて入力します

2004 年の誕生以来、PHP は世界で最も人気のある開発言語の 1 つです。インターネットの急速な発展と技術の継続的な革新に伴い、PHP の開発も日々変化しています。その中で、マイクロサービス アーキテクチャは、今日のソフトウェア開発において徐々に人気のトレンドになってきています。この記事では、入門から習熟まで、PHPHyperf マイクロサービス開発の世界を紹介します。 1. マイクロサービス アーキテクチャとは何ですか?マイクロサービス アーキテクチャは、独立してデプロイされた一連の小規模なサービス コンポーネントに基づいて構築されたシステム アーキテクチャです。従来のモノリシック アプリケーション アーキテクチャと比較して、マイクロサービス アーキテクチャ

リクエスト電流制限に Hyperf フレームワークを使用する方法 はじめに: 最新のインターネット アプリケーションでは、高い同時実行下でシステムの安定性を確保する方法が非常に重要です。リクエストのスロットルは、一般的な対処戦略の 1 つです。この記事では、Hyperf フレームワークを使用してリクエスト フローを制限する方法と、具体的なコード例を紹介します。 1. リクエスト電流制限とは何ですか? リクエスト電流制限とは、リクエストが多すぎることによるシステムのクラッシュを防ぐために、一定期間内にシステムへのリクエストのアクセス数を制限することを指します。合理的な電流制限戦略を通じて、より優れたサービス品質と安定性を提供できます。 H

データ ページングに Hyperf フレームワークを使用する方法 はじめに: データ ページングは実際の Web 開発で非常に一般的であり、ページングを使用すると、ユーザーは大量のデータを簡単に参照できます。 Hyperf は、強力な機能とコンポーネントのセットを提供する高性能 PHP フレームワークです。この記事では、データ ページングに Hyperf フレームワークを使用する方法を紹介し、詳細なコード例を示します。 1. 準備: 開始する前に、Hyperf フレームワークが正しくインストールされ、構成されていることを確認する必要があります。 Composer経由で実行できます

画像処理に Hyperf フレームワークを使用する方法 はじめに: モバイル インターネットの急速な発展に伴い、最新の Web 開発において画像処理の重要性がますます高まっています。 Hyperf は、画像処理を含む豊富なコンポーネントと機能を提供する Swoole ベースの高性能フレームワークです。この記事では、画像処理に Hyperf フレームワークを使用する方法を紹介し、具体的なコード例を示します。 1. Hyperf フレームワークをインストールします。開始する前に、まず Hyperf フレームワークがインストールされていることを確認します。コンポ

キャッシュ管理に Hyperf フレームワークを使用する方法 キャッシュはアプリケーションのパフォーマンスを向上させる重要な手段の 1 つであり、最新のフレームワークはより便利なキャッシュ管理ツールを提供します。この記事では、キャッシュ管理に Hyperf フレームワークを使用する方法を紹介し、具体的なコード例を示します。 Hyperf フレームワークは、Swoole をベースに開発された高性能フレームワークで、強力なキャッシュ管理機能を含む豊富な組み込みコンポーネントとツールのセットを備えています。 Hyperf フレームワークは、Redis や Memcach などの複数のキャッシュ ドライバーをサポートします。

近年、マイクロサービス アーキテクチャは、最新のアプリケーションを構築するための主流の方法となっています。大規模なアプリケーションを小さな自律的なサービスに分割することで、そのスケーラビリティ、保守性、展開性が向上します。マイクロサービス アーキテクチャでは、各サービスは独立して開発、デプロイ、実行され、軽量の通信メカニズムを通じて相互作用します。マイクロサービス アプリケーションを構築する場合、適切な開発フレームワークを選択することが非常に重要です。 PHPHyperf は、Swoole の高性能コルーチン ネットワーク フレームワークに基づくマイクロサービス フレームワークです。
