首页 php框架 Swoole swoole如何实现实时推送

swoole如何实现实时推送

Dec 09, 2019 am 09:14 AM
swoole

swoole如何实现实时推送

 swoole+Redis实现实时数据推送           (推荐学习: swoole视频教程

<?php
/**
 * ***************************************
 *            单进程保护                 *
 * ***************************************
 */
$phpSelf 			= realpath($_SERVER[&#39;PHP_SELF&#39;]);
$lockFile			= $phpSelf.&#39;.lock&#39;;
$lockFileHandle 	= fopen($lockFile, "w");
if ($lockFileHandle == false) {
	exit("Can not create lock file $lockFile\n");
}
if (!flock($lockFileHandle, LOCK_EX + LOCK_NB)) {
	exit(date("Y-m-d H:i:s")."Process already exist.\n");
}
 
/**
 * ***************************************
 *     进入程序,定义相关配置            *
 * ***************************************
 */
set_time_limit(0);
//socket会话的超时时间,根据业务场景设置,这里设置为永不超时
//如果设置了时间,则从socket建立=>传输=>关闭整个过程必须在定义的时间内完成,否则自动close该socket并抛出warning
ini_set(&#39;default_socket_timeout&#39;, -1);
$conf = array(
	&#39;listen&#39;  => array(&#39;host&#39; => &#39;0.0.0.0&#39;,&#39;port&#39; => &#39;8008&#39;),
	&#39;setting&#39; => array(
		//程序允许的最大连接数,用以设置server最大允许维持多少个TCP连接,超过该数量后,新连接将被拒绝,默认为ulimit -n的值,如果设置大于ulimit -n则强制重置为ulimit- n,如果确实需要设置超过ulimit -n的值,请修改系统值 vim /etc/security/limits.conf 修改nofile的值
		"max_conn"			=> 1024,
		//启用CPU亲和设置(在全异步非阻塞是可启用),在多核的服务器中,启用此特性会将swoole的reactor线程/worker进程绑定到固定的一个核上。可以避免进程/线程的运行时在多个核之间互相切换,提高CPU Cache的命中率,如何确定绑定在了哪个核上,请参考文档, 查看命令: taskset -p 进程id
		&#39;open_cpu_affinity&#39;	=> 0,
		//配置task进程数量,配置此参数后将会启用task功能。所以Server务必要注册onTask、onFinish2个事件回调函数。如果没有注册,服务器程序将无法启动.Task进程是同步阻塞的,配置方式与Worker同步模式一致。
		&#39;task_worker_num&#39;	=> 20,
		//设置task进程的最大任务数。一个task进程在处理完超过此数值的任务后将自动退出。这个参数是为了防止PHP进程内存溢出。如果不希望进程自动退出可以设置为0, 默认是0
		&#39;task_max_request&#39;	=> 1024, 
		//设置task的数据临时目录,在swoole_server中,如果投递的数据超过8192字节,将启用临时文件来保存数据。这里的task_tmpdir就是用来设置临时文件保存的位置。
		&#39;task_tmpdir&#39;		=> &#39;/tmp/&#39;,
		//worker进程数量,根据业务代码的模式作调整,全异步非阻塞可设置为CPU核数的1-4倍;同步阻塞,请参考文档调整
		&#39;worker_num&#39;		=> 8,
		//指定swoole错误日志文件
		&#39;log_file&#39; 			=> &#39;/tmp/log/log.txt&#39;,
		//SSL公钥和私钥的位置,启用wss必须在编译swoole时加入--enable-openssl选项
		&#39;ssl_cert_file&#39;		=> &#39;/usr/local/nginx/conf/server.cer&#39;,
		&#39;ssl_key_file&#39;		=> &#39;/usr/local/nginx/conf/server.key&#39;,
	),
);
 
/**
 * ***************************************
 *       初始化Redis连接                 *
 * ***************************************
 */
$redis = null;
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$GLOBALS[&#39;redis&#39;]=$redis;
 
/**
 * ***************************************
 *        脚本重启时,清除历史的数据     *
 * ***************************************
 */
$sArr = $redis->sMembers(REDIS_S_KEY);
if (!empty($sArr)) {
	foreach ((array)$sArr as $key => $sc) {
		$fdArr = $redis->sMembers(REDIS_S_FD.$sc);
		foreach ((array)$fdArr as $k => $fd) {
			$res1 = $redis->del(REDIS_FD_S.$fd);
		}
		$res2 = $redis->del(REDIS_S_FD.$sc);
	}
	$redis->del(REDIS_S_KEY);
}
$redis->del(REDIS_ZS_KEY);
 
/**
 * ***************************************
 *           绑定回调事件                *
 * ***************************************
 */
$ws = null;
//wss服务
$ws = new swoole_websocket_server($conf[&#39;listen&#39;][&#39;host&#39;], $conf[&#39;listen&#39;][&#39;port&#39;], SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$ws->set($conf[&#39;setting&#39;]);
 
/**
 * Server启动在主进程的主线程回调此函数
 * 在此事件之前Swoole Server已进行了如下操作
 * 已创建了manager进程
 * 已创建了worker子进程
 * 已监听所有TCP/UDP端口
 * 已监听了定时器
 * 在onStart中创建的全局资源对象不能在worker进程中被使用,因为发生onStart调用时,worker进程已经创建好了。新创建的对象在主进程内,worker进程无法访问到此内存区域。因此全局对象创建的代码需要放置在swoole_server_start之前
 */
$ws->on(&#39;start&#39;, function ($ws) {
	swoole_set_process_name(PROCESS_NAME.&#39;_master&#39;);
});
 
/**
 * 与onStart回调在不同进程中并行执行的回调函数(不存在先后顺序)
 * @param: $ws swoole_websocket_server object
 * @param: $wid 创建该进程时swoole分配的id(不是进程id)
 * 注意点:
 * 1. 此事件在worker进程/task进程启动时发生。onWorkerStart/onStart是并发执行的,没有先后顺序,这里创建的对象可以在进程生命周期内使用
 * 2. swoole1.6.11之后task_worker中也会触发onWorkerStart,故而在下面的处理中,加入了判断业务类型$jobType是task还是work,如果是task则命名为****_Tasker_$id,如果是worker则命名为****_Worker_$id
 * 3. 发生PHP致命错误或者代码中主动调用exit时,Worker/Task进程会退出,管理进程会重新创建新的进程
 * 5. 如果想使用swoole_server_reload实现代码重载入,必须在workerStart中require你的业务文件,而不是在文件头部。在onWorkerStart调用之前已包含的文件,不会重新载入代码。
 * 6. 可以将公用的,不易变的php文件放置到onWorkerStart之前(例如上面的redis配置)。这样虽然不能重载入代码,但所有worker是共享的,不需要额外的内存来保存这些数据。
 * 7. onWorkerStart之后的代码每个worker都需要在内存中保存一份
 */
$ws->on(&#39;workerstart&#39;, function ($ws, $wid) {
	$jobType = $ws->taskworker ? &#39;Tasker&#39; : &#39;Worker&#39;;
	swoole_set_process_name(PROCESS_NAME.&#39;_&#39;.$jobType.&#39;_&#39;.$wid);
	$GLOBALS[&#39;ws&#39;] = $ws; //保存server对象到全局中以待使用
	if ($jobType == &#39;Worker&#39;) { //在某个worker进程上绑定redis订阅进程
		if ($wid === 0) {
            $dataRedis = null;
            $dataRedis = new Redis();
            $dataRedis->connect(REDIS_HOST_DATA, REDIS_PORT_DATA);
            $dataRedis->auth(REDIS_PWD_DATA);
            //使用psubscribe订阅指定模式的频道,这里*表示所有频道
            //请注意,redis订阅不提供区分库(db)的功能,所以多个库都同时在发布同一个名字的频道时,都将被订阅到
			$dataRedis->psubscribe(array("*"), "sendTask");
		}
	}
});
 
/**
 * 管理进程启用时,调用该回调函数
 * 注意manager进程中不能添加定时器
 * manager进程中可以调用sendMessage接口向其他工作进程发送消息
 */
$ws->on(&#39;managerstart&#39;, function ($ws) {
	swoole_set_process_name(PROCESS_NAME.&#39;_manage&#39;);
});
 
/**
 * swoole websocket服务特有的回调函数,此函数在websocket服务器中必须定义实现,否则websocket服务将无法启动
 * 当服务器收到来自客户端的数据帧时会回调此函数
 * @param: $ws为swoole_websocket_server对象,其结构在调试时可var_dump查看
 * @param: $frame为swoole_websocket_frame对象,包含了客户端发来的数据帧信息,包含以下四个属性:
 * @param: $frame->fd: 客户端的socket id,每个id对应一个客户端,推送消息的时候需要指定
 * @param: $frame->data: 数据内容,可以是文本内容或者是二进制数据(图片等),可以通过opcode的值来判断。$data 如果是文本类型,编码格式必然是UTF-8,这是WebSocket协议规定的
 * @param: $frame->opcode: WebSocket的OpCode类型,可以参考WebSocket协议标准文档, WEBSOCKET_OPCODE_TEXT = 0x1 ,文本数据; WEBSOCKET_OPCODE_BINARY = 0x2 ,二进制数据
 * @param: $frame->finish: 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送
 * 注意点: 客户端发送的ping帧不会触发onMessage,底层会自动回复pong包
 */
$ws->on(&#39;message&#39;, function ($ws, $frame) {
    echo "Server has receive message\n";
    //接收到客户端请求,并建立连接之后,进行相应业务的处理
    handleClientData($ws, $frame);
});
 
/**
 * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务(此处使用的是taskwait)
 * 当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task process id
 * @param: $wid from id 表示来自哪个Worker进程。$task_id和$wid组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
 * @param: $data 需要执行的任务内容
 * 注意点: onTask函数执行时遇到致命错误退出,或者被外部进程强制kill,当前的任务会被丢弃,但不会影响其他正在排队的Task
 */
$ws->on(&#39;task&#39;, function ($ws, $tid, $wid, $data) {
	switch ($data[&#39;cmd&#39;]) {
		case &#39;pushToClient&#39;: $ret = pushToClientTask($ws, $data[&#39;key&#39;], $data[&#39;val&#39;]); break;
	}
	//1.7.2以上的版本,在onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。return的变量可以是任意非null的PHP变量
	return $returnContent;
	//1.7.2以前的版本,需要调用swoole_server->finish()函数将结果返回给worker进程
	// $ws->finish($data);
});
 
/**
 * 当worker进程投递的任务在task_worker中完成时,task进程会通过$ws->finish()方法将任务处理的结果发送给worker进程。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task_id
 * @param: $data 任务处理后的结果内容
 * 注意点: task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish
 *        执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程
 */
$ws->on(&#39;finish&#39;, function($ws, $tid, $data) {
 
});
 
/**
 * TCP客户端连接关闭后,在worker进程中回调此函数
 * 在函数中可以做一些类似于删除业务中与每个客户端交互时存放的数据的操作
 * @param: $ws swoole_websocket_server object
 * @param: $fd 已关闭的fd interger
 * @param: $rid(可选),来自哪个reactor线程
 * 注意点: 
 * 1. onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接
 * 2. 查看命令netstat -anopc | grep 端口号,可以查看到TCP接收和发送队列是否有堆积以及TCP连接的状态
 * 3. 无论由客户端发起close还是服务器端主动调用$serv->close()关闭连接,都会触发此事件。因此只要连接关闭,就一定会回调此函数
 * 4. 1.7.7+版本以后onClose中依然可以调用connection_info方法获取到连接信息,在onClose回调函数执行完毕后才会调用close关闭TCP连接
 * 5. 这里回调onClose时表示客户端连接已经关闭,所以无需执行$server->close($fd)。代码中执行$serv->close($fd)会抛出PHP错误告警。也就是在onclose中不能再$ws->close()了.
 * 6. swoole-1.9.7版本修改了$reactorId参数,当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的(debug时可以使用)
 */
$ws->on(&#39;close&#39;, function ($ws, $fd) {
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$sArr = $redis->sMembers(REDIS_FD_S.$fd);
	if (!empty($sArr)) {
		foreach ((array)$sArr as $key => $sc) {
			$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
			$num = $redis->sCard(REDIS_S_FD.$sc);
			if ($num == &#39;0&#39;) {
				$redis->sRem(REDIS_S_KEY, $sc);
				$redis->hDel(REDIS_ZS_KEY, $sc);
			}
		}
	}
	$redis->del(REDIS_FD_S.$fd);
	$redis->close();
	echo "FD $fd has closed.\n";
});
 
/**
 * 开启swoole_websocket_server服务
 */
$ws->start();
 
 
/**
 * 接受到消息以后进行响应异步任务的执行
 * @param: $ws swoole_websocket_sever object
 * @param: $frame swoole_websocket_frame obejct
 */
function handleClientData($ws, $frame) {
	$data = $frame->data;
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$isMembers = $redis->sIsmember(REDIS_S_FD.$sc, $frame->fd);
	if (!$isMembers) {
		$res = $redis->sAdd(REDIS_S_FD.$sc, $frame->fd);
	}
	$redis->sAdd(REDIS_FD_S.$frame->fd, $sc);
	$isMembers = $redis->sIsmember(REDIS_S_KEY, $sc);
	if (!$isMembers) { 
		$redis->sAdd(REDIS_S_KEY, $sc);
	}
}
 
 
/**
 * redis订阅后的回调函数
 * @param: $ins instance实例
 * @param: $pattern 匹配模式
 * @param: $channel 频道名
 * @param: $data 数据
 * 注意点: subscribe和psubscribe两种不同的订阅方式的回调函数的参数个数不一样,后者多了$pattern参数
 */
function sendTask($ins, $pattern, $channel, $data) {
	//满足一些条件后,投递到task进程中进行推送
	$taskData = array(
		&#39;cmd&#39; => &#39;pushToClient&#39;,
		&#39;key&#39; => $sc,
		&#39;val&#39; => $data,
	);
	//请注意,taskwait是同步阻塞的,所以改脚本并不是全异步非阻塞的
	$GLOBALS[&#39;ws&#39;]->taskwait($taskData);
}
 
/**
 * 推送消息到指定的客户端
 * @param: $ws swoole_websocket_server object
 * @param: $sc 股票代码
 * @param: $data 要推送的数据
 */
function pushToClientTask($ws, $sc, $data) {
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    $redis->auth(REDIS_PWD);
	$fdList = $redis->sMembers(REDIS_S_FD.$sArr[4]);
	if (!empty($fdList)) {
		foreach ((array)$fdList as $fd) {
			$res = $GLOBALS[&#39;ws&#39;]->push($fd, $data);
			echo "FD: $fd push $res.\n";
			if (!$res) { //推送失败,即客户端已经断开连接
				//从该fd订阅的所有股票中删除该fd
				$sArrOfFd = $redis->sMembers(REDIS_FD_S.$fd);
				if (!empty($sArrOfFd)) {
					foreach ((array)$sArrOfFd as $key => $sc) {
						$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
						$num = $redis->sCard(REDIS_S_FD.$sc);
						if ($num == &#39;0&#39;) {
							$redis->sRem(REDIS_S_KEY, $sc);
							$redis->hDel(REDIS_ZS_KEY, $sc);
						}
					}
				}
				$redis->del(REDIS_FD_S.$fd);
			}
		}
	}
    $redis->close();
}
登录后复制

以上是swoole如何实现实时推送的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

swoole协程如何在laravel中使用 swoole协程如何在laravel中使用 Apr 09, 2024 pm 06:48 PM

Laravel 中使用 Swoole 协程可以并发处理大量请求,优势包括:并发处理:允许同时处理多个请求。高性能:基于 Linux epoll 事件机制,高效处理请求。低资源消耗:所需服务器资源更少。易于集成:与 Laravel 框架无缝集成,使用简单。

如何使用Swoole实现高性能的HTTP反向代理服务器 如何使用Swoole实现高性能的HTTP反向代理服务器 Nov 07, 2023 am 08:18 AM

如何使用Swoole实现高性能的HTTP反向代理服务器Swoole是一款基于PHP语言的高性能、异步、并发的网络通信框架。它提供了一系列的网络功能,可以用于实现HTTP服务器、WebSocket服务器等。在本文中,我们将介绍如何使用Swoole来实现一个高性能的HTTP反向代理服务器,并提供具体的代码示例。环境配置首先,我们需要在服务器上安装Swoole扩展

swoole和workerman哪个好 swoole和workerman哪个好 Apr 09, 2024 pm 07:00 PM

Swoole 和 Workerman 都是高性能 PHP 服务器框架。Swoole 以其异步处理、出色的性能和可扩展性而闻名,适用于需要处理大量并发请求和高吞吐量的项目。Workerman 提供了异步和同步模式的灵活性,具有直观的 API,更适合易用性和处理较低并发量的项目。

swoole和java哪个性能好 swoole和java哪个性能好 Apr 09, 2024 pm 07:03 PM

性能比较:吞吐量:Swoole 凭借协程机制,吞吐量更高。延迟:Swoole 的协程上下文切换开销更低,延迟更小。内存消耗:Swoole 的协程占用内存更少。易用性:Swoole 提供更易于使用的并发编程 API。

swoole_process 怎么让用户切换 swoole_process 怎么让用户切换 Apr 09, 2024 pm 06:21 PM

Swoole Process 中可以让用户切换,具体操作步骤为:创建进程;设置进程用户;启动进程。

swoole框架怎么重启服务 swoole框架怎么重启服务 Apr 09, 2024 pm 06:15 PM

要重启 Swoole 服务,请按照以下步骤操作:检查服务状态并获取 PID。使用 "kill -15 PID" 停止服务。使用启动服务的相同命令重新启动服务。

Swoole实战:如何使用协程进行并发任务处理 Swoole实战:如何使用协程进行并发任务处理 Nov 07, 2023 pm 02:55 PM

Swoole实战:如何使用协程进行并发任务处理引言在日常的开发中,我们常常会遇到需要同时处理多个任务的情况。传统的处理方式是使用多线程或多进程来实现并发处理,但这种方式在性能和资源消耗上存在一定的问题。而PHP作为一门脚本语言,通常无法直接使用多线程或多进程的方式来处理任务。然而,借助于Swoole协程库,我们可以使用协程来实现高性能的并发任务处理。本文将介

Swoole进阶:如何优化服务器的CPU利用率 Swoole进阶:如何优化服务器的CPU利用率 Nov 07, 2023 pm 12:27 PM

Swoole是一款高性能的PHP网络开发框架,借助其强大的异步机制和事件驱动特点,可以实现快速构建高并发、高吞吐的服务器应用。然而,随着业务的不断扩展和并发量的增加,服务器的CPU利用率可能会成为一个瓶颈,影响服务器的性能和稳定性。因此,在本文中,我们将介绍如何优化服务器的CPU利用率,同时提高Swoole服务器的性能和稳定性,并提供具体的优化代码示例。一、

See all articles