Messaging is widely used in various websites, and this function is also essential for a website. This article mainly introduces the application of Redis in PHP-message passing.
Reading Contents
1. Summary
2. Implementation Method
3. One-to-one messaging
4. Many-to-many messaging Delivery
1. Summary
The application of messaging is widely used in various websites, and this function is also essential for a website. Common messaging applications include @me on Sina Weibo, prompts after commenting, like prompts, private messages, and even new things shared on Weibo; private messages and live on Zhihu Messages sent, Zhihu team messages, etc.
2. Implementation method
Message passing means that two or more clients send and receive messages to each other.
There are usually two ways to implement it:
The first is message push. Redis has this built-in mechanism. Publish pushes messages to channels and subscribes subscribe to channels. One disadvantage of this method is that the receiver must be online at all times (that is, the program cannot be stopped at this time and remains in the monitoring state. If the connection is disconnected, the client will lose information)
The second type is message pulling. The so-called message pull means that the client independently obtains the data stored in the server. Redis does not implement a message pulling mechanism internally. Therefore, we need to manually write the code ourselves to implement this function.
Here we further subdivide messaging into one-to-one messaging and many-to-many messaging (group messaging).
[Note: The codes of the two classes are relatively large, so they are folded]
3. One-to-one messaging
Example 1: One-to-one message sending and acquisition
Module requirements:
1. Prompt how many contacts have sent new messages
2. The information includes the sender, time, and content of the message.
3. The old message can be obtained.
4. The message can be kept for 7 days, and the expiration will be passively triggered. Delete
Redis implementation ideas:
1. New messages and old messages are stored in two linked lists respectively
2 , The structure of the original message is stored in the form of an array, and contains the sender, timestamp, and information content
3. Before pushing into the redis linked list, the data needs to be converted to json type and then stored
4. When retrieving new information, rpoplpush should be used to push the read new message into the old message linked list.
5. When retrieving old messages, the time and present of the old message should be used. Compare the time. If it times out, all subsequent data will be deleted directly (because the data is pushed into the linked list one by one according to time, so the time is arranged in order)
Data storage structure diagram:
PHP implementation code:
SinglePullMessage.class.php
<?php #单接接收者接收消息 class SinglePullMessage { private $redis=''; #存储redis对象 /** * @desc 构造函数 * * @param $host string | redis主机 * @param $port int | 端口 */ public function __construct($host,$port=6379) { $this->redis=new Redis(); $this->redis->connect($host,$port); } /** * @desc 发送消息(一个人) * * @param $toUser string | 接收人 * @param $messageArr array | 发送的消息数组,包含sender、message、time * * @return bool */ public function sendSingle($toUser,$messageArr) { $json_message=json_encode($messageArr); #编码成json数据 return $this->redis->lpush($toUser,$json_message); #将数据推入链表 } /** * @desc 用户获取新消息 * * @param $user string | 用户名 * * @return array 返回数组,包含多少个用户发来新消息,以及具体消息 */ public function getNewMessage($user) { #接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除 $messageArr=array(); while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user)) { $temp=json_decode($json_message); #将json数据变成对象 $messageArr[$temp->sender][]=$temp; #转换成数组信息 } if($messageArr) { $arr['count']=count($messageArr); #统计有多少个用户发来信息 $arr['messageArr']=$messageArr; return $arr; } return false; } public function getPreMessage($user) { ##取出旧消息 $messageArr=array(); $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性将全部旧消息取出来 foreach ($json_pre as $k => $v) { $temp=json_decode($v); #json反编码 $timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期 if($timeout<time()) #判断数据是否过期 { if($k==0) #若是最迟插入的数据都过期了,则将所有数据删除 { $this->redis->del('preMessage_'.$user); break; } $this->redis->ltrim('preMessage_'.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除 break; } $messageArr[$temp->sender][]=$temp; } return $messageArr; } /** * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。 * * @param $arr array | 需要处理的信息数组 * * @return 返回打印输出 */ public function dealArr($arr) { foreach ($arr as $k => $v) { foreach ($v as $k1 => $v2) { echo '发送人:'.$v2->sender.' 发送时间:'.date('Y-m-d h:i:s',$v2->time).'<br/>'; echo '消息内容:'.$v2->message.'<br/>'; } echo "<hr/>"; } } }
Test:
1. Send message
#Create test1.php
include './SinglePullMessage.class.php'; $object=new SinglePullMessage('192.168.95.11'); #发送消息 $sender='boss'; #发送者 $to='jane'; #接收者 $message='How are you'; #信息 $time=time(); $arr=array('sender'=>$sender,'message'=>$message,'time'=>$time); echo $object->sendSingle($to,$arr);
2. Get new messages
#Create test2.php
include './SinglePullMessage.class.php'; $object=new SinglePullMessage('192.168.95.11'); #获取新消息 $arr=$object->getNewMessage('jane'); if($arr) { echo $arr['count']."个联系人发来新消息<br/><hr/>"; $object->dealArr($arr['messageArr']); } else echo "无新消息";
Access results:
##3. Get old messages
#Create test3.phpinclude './SinglePullMessage.class.php'; $object=new SinglePullMessage('192.168.95.11'); #获取旧消息 $arr=$object->getPreMessage('jane'); if($arr) { $object->dealArr($arr); } else echo "无旧数据";
##4. Many-to-many messaging
Example 2: Many-to-many message sending and acquisition (i.e. group) Module requirements:
1. Users can Create a group by yourself and become the group leader
2. The group leader can bring people in as group members and kick people
3. Users can directly exit the group
4. Messages can be sent, and each member can pull messages.
5. The maximum message capacity of the group is 5,000.
6. Members can pull new messages. And prompts how many new messages there are
7. Members can get old messages that have been read in pages
. . . . . Let’s just write these functions. Students who have needs or want to practice can add other functions, such as muting, anonymous message sending, file sending, etc.
Redis implementation ideas: 1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。 2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值) 3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID 4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID 5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作【redis详细锁的讲解可以参考:Redis构建分布式锁http://www.jb51.net/article/109704.htm】 6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。 数据存储结构图: PHP的代码实现: #ManyPullMessage.class.php 测试: 1、建立createGroupChat.php(测试创建群组功能) 执行代码并创建568、569群组(群主为jack) 2、建立addMembers.php(测试添加成员功能) 执行代码并添加新成员 3、建立delete.php(测试群主删除成员功能) 4、建立sendMessage.php(测试发送消息功能) 多执行几遍,568、569都发几条 5、建立getNewMessage.php(测试用户获取新消息功能) 6、建立getPartMessage.php(测试用户获取某个群组部分消息) (多发送几条消息,用于测试。568中共18条数据) page=1,size=10 page=2,size=10 测试完毕,还需要别的功能可以自己进行修改添加测试。 以上就是本文的全部内容,希望对大家的学习有所帮助。 相关推荐: PHP method to automatically confirm the receipt of e-commerce orders redis queue The above is the detailed content of Detailed explanation of application messaging of Redis in PHP. For more information, please follow other related articles on the PHP Chinese website!<?php
class ManyPullMessage
{
private $redis=''; #存储redis对象
/**
* @desc 构造函数
*
* @param $host string | redis主机
* @param $port int | 端口
*/
public function __construct($host,$port=6379)
{
$this->redis=new Redis();
$this->redis->connect($host,$port);
}
/**
* @desc 用于创建群组的方法,在创建的同时还可以拉人进群组
*
* @param $user string | 用户名,创建群组的主人
* @param $addUser array | 其他用户构成的数组
*
* @param $lockName string | 锁的名字,用于获取群组ID的时候用
* @return int 返回群组ID
*/
public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')
{
$identifier=$this->getLock($lockName); #获取锁
if($identifier)
{
$id=$this->redis->incr('groupChatID'); #获取群组ID
$this->releaseLock($lockName,$identifier); #释放锁
}
else
return false;
$messageCount=$this->redis->set('countMessage_'.$id, 0); #初始化这个群组消息计数器
#开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接
$pipe=$this->redis->pipeline();
$this->redis->zadd('groupChat_'.$id, 1, $user); #创建群组成员有序集合,并添加群主
#将这个群组添加到user所参加的群组有序集合中
$this->redis->zadd('hasGroupChat_'.$user, 0, $id);
foreach ($addUser as $v) #创建群组的同时需要添加的用户成员
{
$this->redis->zadd('groupChat_'.$id, 2, $v);
$this->redis->zadd('hasGroupChat_'.$v, 0, $id);
}
$pipe->exec();
return $id; #返回群组ID
}
/**
* @desc 群主主动拉人进群
*
* @param $user string | 群主名
* @param $groupChatID int | 群组ID
* @param $addMembers array | 需要拉进群的用户
*
* @return bool
*/
public function addMembers($user, $groupChatID, $addMembers=array())
{
$groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #将groupChatName的群主取出来
if($groupMasterScore==1) #判断user是否是群主
{
$pipe=$this->redis->pipeline(); #开启非事务流水线
foreach ($addMembers as $v)
{
$this->redis->zadd('groupChat_'.$groupChatID, 2, $v); #添加进群
$this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加群名到用户的有序集合中
}
$pipe->exec();
return true;
}
return false;
}
/**
* @desc 群主删除成员
*
* @param $user string | 群主名
* @param $groupChatID int | 群组ID
* @param $delMembers array | 需要删除的成员名字
*
* @return bool
*/
public function delMembers($user, $groupChatID, $delMembers=array())
{
$groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);
if($groupMasterScore==1) #判断user是否是群主
{
$pipe=$this->redis->pipeline(); #开启非事务流水线
foreach ($delMembers as $v)
{
$this->redis->zrem('groupChat_'.$groupChatID, $v);
$this->redis->zrem('hasGroupChat_'.$v, $groupChatID);
}
$pipe->exec();
return true;
}
return false;
}
/**
* @desc 退出群组
*
* @param $user string | 用户名
* @param $groupChatID int | 群组名
*/
public function quitGroupChat($user, $groupChatID)
{
$this->redis->zrem('groupChat_'.$groupChatID, $user);
$this->redis->zrem('hasGroupChat_'.$user, $groupChatID);
return true;
}
/**
* @desc 发送消息
*
* @param $user string | 用户名
* @param $groupChatID int | 群组ID
* @param $messageArr array | 包含发送消息的数组
* @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID
*
* @return bool
*/
public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')
{
$memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成员score
if($memberScore)
{
$identifier=$this->getLock($preLockName.$groupChatID); #获取锁
if($identifier) #判断获取锁是否成功
{
$messageCount=$this->redis->incr('countMessage_'.$groupChatID);
$this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁
}
else
return false;
$json_message=json_encode($messageArr);
$this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);
$count=$this->redis->zcard('groupChatMessage_'.$groupChatID); #查看信息量大小
if($count>5000) #判断数据量有没有达到5000条
{ #数据量超5000,则需要清除旧数据
$start=5000-$count;
$this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);
}
return true;
}
return false;
}
/**
* @desc 获取新信息
*
* @param $user string | 用户名
*
* @return 成功则放回json数据数组,无新信息返回false
*/
public function getNewMessage($user)
{
$arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores'); #获取用户拥有的群组ID
$json_message=array(); #初始化
foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息
{
$messageCount=$this->redis->get('countMessage_'.$k); #群组最大信息分值数
if($messageCount>$v) #判断用户是否存在未读新消息
{
$json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);
$json_message[$k]['count']=count($json_message[$k]['message']); #统计新消息数量
$this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k); #更新已获取消息
}
}
if($json_message)
return $json_message;
return false;
}
/**
* @desc 分页获取群组信息
*
* @param $user string | 用户名
* @param $groupChatID int | 群组ID
* @param $page int | 第几页
* @param $size int | 每页多少条数据
*
* @return 成功返回json数据,失败返回false
*/
public function getPartMessage($user, $groupChatID, $page=1, $size=10)
{
$start=$page*$size-$size; #开始截取数据位置
$stop=$page*$size-1; #结束截取数据位置
$json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);
if($json_message)
return $json_message;
return false;
}
/**
* @desc 加锁方法
*
* @param $lockName string | 锁的名字
* @param $timeout int | 锁的过期时间
*
* @return 成功返回identifier/失败返回false
*/
public function getLock($lockName, $timeout=2)
{
$identifier=uniqid(); #获取唯一标识符
$timeout=ceil($timeout); #确保是整数
$end=time()+$timeout;
while(time()<$end) #循环获取锁
{
/*
#这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯
if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))
return $identifier;
*/
if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上锁
{
$this->redis->expire($lockName, $timeout); #为$lockName设置过期时间
return $identifier; #返回一维标识符
}
elseif ($this->redis->ttl($lockName)===-1)
{
$this->redis->expire($lockName, $timeout); #检测是否有设置过期时间,没有则加上
}
usleep(0.001); #停止0.001ms
}
return false;
}
/**
* @desc 释放锁
*
* @param $lockName string | 锁名
* @param $identifier string | 锁的唯一值
*
* @param bool
*/
public function releaseLock($lockName,$identifier)
{
if($this->redis->get($lockName)==$identifier) #判断是锁有没有被其他客户端修改
{
$this->redis->multi();
$this->redis->del($lockName); #释放锁
$this->redis->exec();
return true;
}
else
{
return false; #其他客户端修改了锁,不能删除别人的锁
}
}
}
?>
include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
#创建群组
$user='jack';
$arr=array('jane1','jane2');
$a=$object->createGroupChat($user,$arr);
echo "<pre class="brush:php;toolbar:false">";
print_r($a);
echo "
";die;
include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
$b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));
echo "<pre class="brush:php;toolbar:false">";
print_r($b);
echo "
";die;include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
#群主删除成员
$c=$object->delMembers('jack', '568', array('jane1','jane4'));
echo "<pre class="brush:php;toolbar:false">";
print_r($c);
echo "
";die;include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
#发送消息
$user='jane2';
$message='go go go';
$groupChatID=568;
$arr=array('sender'=>$user, 'message'=>$message, 'time'=>time());
$d=$object->sendMessage($user,$groupChatID,$arr);
echo "<pre class="brush:php;toolbar:false">";
print_r($d);
echo "
";die;include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
#用户获取新消息
$e=$object->getNewMessage('jane2');
echo "<pre class="brush:php;toolbar:false">";
print_r($e);
echo "
";die;include './ManyPullMessage.class.php';
$object=new ManyPullMessage('192.168.95.11');
#用户获取某个群组部分消息
$f=$object->getPartMessage('jane2', 568, 1, 10);
echo "<pre class="brush:php;toolbar:false">";
print_r($f);
echo "
";die;