目录
php自己实现memcached的队列类
首页 后端开发 php教程 php自己实现memcached的队列类_PHP教程

php自己实现memcached的队列类_PHP教程

Jul 13, 2016 am 09:54 AM
队列

php自己实现memcached的队列类

 

 

<!--?php
/*
 * memcache队列类
 * 支持多进程并发写入、读取
 * 边写边读,AB面轮值替换
 * @author guoyu
 * @create on 9:25 2014-9-28
 * @qq技术行业交流群:136112330
 *
 * @example:
 *      $obj = new memcacheQueue(&#39;duilie&#39;);
 *      $obj--->add(&#39;1asdf&#39;);
 *      $obj->getQueueLength();
 *      $obj->read(11);
 *      $obj->get(8);
 */

class memcacheQueue{
    public static   $client;            //memcache客户端连接
    public          $access;            //队列是否可更新   
    private         $currentSide;       //当前轮值的队列面:A/B
    private         $lastSide;          //上一轮值的队列面:A/B
    private         $sideAHead;         //A面队首值
    private         $sideATail;         //A面队尾值
    private         $sideBHead;         //B面队首值
    private         $sideBTail;         //B面队尾值
    private         $currentHead;       //当前队首值
    private         $currentTail;       //当前队尾值
    private         $lastHead;          //上轮队首值
    private         $lastTail;          //上轮队尾值 
    private         $expire;            //过期时间,秒,1~2592000,即30天内;0为永不过期
    private         $sleepTime;         //等待解锁时间,微秒
    private         $queueName;         //队列名称,唯一值
    private         $retryNum;          //重试次数,= 10 * 理论并发数

    const   MAXNUM      = 2000;                 //(单面)最大队列数,建议上限10K
    const   HEAD_KEY    = &#39;_lkkQueueHead_&#39;;     //队列首kye
    const   TAIL_KEY    = &#39;_lkkQueueTail_&#39;;     //队列尾key
    const   VALU_KEY    = &#39;_lkkQueueValu_&#39;;     //队列值key
    const   LOCK_KEY    = &#39;_lkkQueueLock_&#39;;     //队列锁key
    const   SIDE_KEY    = &#39;_lkkQueueSide_&#39;;     //轮值面key

    /*
     * 构造函数
     * @param   [config]    array   memcache服务器参数
     * @param   [queueName] string  队列名称
     * @param   [expire]    string  过期时间
     * @return  NULL
     */
    public function __construct($queueName =&#39;&#39;,$expire=&#39;&#39;,$config =&#39;&#39;){
        if(empty($config)){
            self::$client = memcache_pconnect(&#39;localhost&#39;,11211);
        }elseif(is_array($config)){//array(&#39;host&#39;=>&#39;127.0.0.1&#39;,&#39;port&#39;=>&#39;11211&#39;)
            self::$client = memcache_pconnect($config[&#39;host&#39;],$config[&#39;port&#39;]);
        }elseif(is_string($config)){//"127.0.0.1:11211"
            $tmp = explode(&#39;:&#39;,$config);
            $conf[&#39;host&#39;] = isset($tmp[0]) ? $tmp[0] : &#39;127.0.0.1&#39;;
            $conf[&#39;port&#39;] = isset($tmp[1]) ? $tmp[1] : &#39;11211&#39;;
            self::$client = memcache_pconnect($conf[&#39;host&#39;],$conf[&#39;port&#39;]);     
        }
        if(!self::$client) return false;

        ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
        set_time_limit(0);//取消脚本执行延时上限

        $this->access = false;
        $this->sleepTime = 1000;
        $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
        $this->expire = $expire;
        $this->queueName = $queueName;
        $this->retryNum = 10000;

        $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, &#39;A&#39;,false, $expire);
        $this->getHeadNTail($queueName);
        if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
        if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
    }

    /*
     * 获取队列首尾值
     * @param   [queueName] string  队列名称
     * @return  NULL
     */
    private function getHeadNTail($queueName){
        $this->sideAHead = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::HEAD_KEY);
        $this->sideATail = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::TAIL_KEY);
        $this->sideBHead = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::HEAD_KEY);
        $this->sideBTail = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::TAIL_KEY);
    }

    /*
     * 获取当前轮值的队列面
     * @return  string  队列面名称
     */
    public function getCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == &#39;A&#39;){
            $this->currentSide = &#39;A&#39;;
            $this->lastSide = &#39;B&#39;;  

            $this->currentHead  = $this->sideAHead;
            $this->currentTail  = $this->sideATail;
            $this->lastHead     = $this->sideBHead;
            $this->lastTail     = $this->sideBTail;         
        }else{
            $this->currentSide = &#39;B&#39;;
            $this->lastSide = &#39;A&#39;;

            $this->currentHead  = $this->sideBHead;
            $this->currentTail  = $this->sideBTail;
            $this->lastHead     = $this->sideAHead;
            $this->lastTail     = $this->sideATail;                     
        }

        return $this->currentSide;
    }

    /*
     * 队列加锁
     * @return boolean
     */
    private function getLock(){
        if($this->access === false){
            while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
                usleep($this->sleepTime);
                @$i++;
                if($i > $this->retryNum){//尝试等待N次
                    return false;
                    break;
                }
            }
            return $this->access = true;
        }
        return false;
    }

    /*
     * 队列解锁
     * @return NULL
     */
    private function unLock(){
        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
        $this->access = false;
    }

    /*
     * 添加数据
     * @param   [data]  要存储的值
     * @return  boolean
     */
    public function add($data){
        $result = false;
        if(!$this->getLock()){
            return $result;
        } 
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        if($this->isFull()){
            $this->unLock();
            return false;
        }

        if($this->currentTail < self::MAXNUM){
            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
                $this->changeTail();
                $result = true;
            }
        }else{//当前队列已满,更换轮值面
            $this->unLock();
            $this->changeCurrentSide();
            return $this->add($data);
        }

        $this->unLock();
        return $result;
    }

    /*
     * 取出数据
     * @param   [length]    int 数据的长度
     * @return  array
     */
    public function get($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
        if(!$this->getLock()) return false;

        if($this->isEmpty()){
            $this->unLock();
            return false;
        }

        $keyArray   = $this->getKeyArray($length);
        $lastKey    = $keyArray[&#39;lastKey&#39;];
        $currentKey = $keyArray[&#39;currentKey&#39;];
        $keys       = $keyArray[&#39;keys&#39;];
        $this->changeHead($this->lastSide,$lastKey);
        $this->changeHead($this->currentSide,$currentKey);

        $data   = @memcache_get(self::$client, $keys);
        foreach($keys as $v){//取出之后删除
            @memcache_delete(self::$client, $v, 0);
        }
        $this->unLock();

        return $data;
    }

    /*
     * 读取数据
     * @param   [length]    int 数据的长度
     * @return  array
     */
    public function read($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
        $keyArray   = $this->getKeyArray($length);
        $data   = @memcache_get(self::$client, $keyArray[&#39;keys&#39;]);
        return $data;
    }

    /*
     * 获取队列某段长度的key数组
     * @param   [length]    int 队列长度
     * @return  array
     */
    private function getKeyArray($length){
        $result = array(&#39;keys&#39;=>array(),&#39;lastKey&#39;=>array(),&#39;currentKey&#39;=>array());
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();
        if(empty($length)) return $result;

        //先取上一面的key
        $i = $result[&#39;lastKey&#39;] = 0;
        for($i=0;$i<$length;$i++){
            $result[&#39;lastKey&#39;] = $this->lastHead + $i;
            if($result[&#39;lastKey&#39;] >= $this->lastTail) break;
            $result[&#39;keys&#39;][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result[&#39;lastKey&#39;];
        }

        //再取当前面的key
        $j = $length - $i;
        $k = $result[&#39;currentKey&#39;] = 0;
        for($k=0;$k<$j;$k++){
            $result[&#39;currentKey&#39;] = $this->currentHead + $k;
            if($result[&#39;currentKey&#39;] >= $this->currentTail) break;
            $result[&#39;keys&#39;][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result[&#39;currentKey&#39;];
        }

        return $result;
    }

    /*
     * 更新当前轮值面队列尾的值
     * @return  NULL
     */
    private function changeTail(){
        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
        memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
        //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
        $v = memcache_get(self::$client, $tail_key) +1;
        memcache_set(self::$client, $tail_key,$v,false,$this->expire);
    }

    /*
     * 更新队列首的值
     * @param   [side]      string  要更新的面
     * @param   [headValue] int     队列首的值
     * @return  NULL
     */
    private function changeHead($side,$headValue){
        if($headValue < 1) return false;
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        $sideTail = memcache_get(self::$client, $tail_key);
        if($headValue < $sideTail){
            memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
        }elseif($headValue >= $sideTail){
            $this->resetSide($side);
        }
    }

    /*
     * 重置队列面,即将该队列面的队首、队尾值置为0
     * @param   [side]  string  要重置的面
     * @return  NULL
     */
    private function resetSide($side){
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        memcache_set(self::$client, $head_key,0,false,$this->expire);
        memcache_set(self::$client, $tail_key,0,false,$this->expire);
    }

    /*
     * 改变当前轮值队列面
     * @return  string
     */
    private function changeCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == &#39;A&#39;){
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;B&#39;,false,$this->expire);
            $this->currentSide = &#39;B&#39;;
        }else{
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;A&#39;,false,$this->expire);
            $this->currentSide = &#39;A&#39;;
        }
        return $this->currentSide;
    }

    /*
     * 检查当前队列是否已满
     * @return  boolean
     */
    public function isFull(){
        $result = false;
        if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
            $result = true;
        }
        return $result;
    }

    /*
     * 检查当前队列是否为空
     * @return  boolean
     */
    public function isEmpty(){
        $result = true;
        if($this->sideATail > 0 || $this->sideBTail > 0){
            $result = false;
        }
        return $result;
    }

    /*
     * 获取当前队列的长度
     * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
     * @return  int
     */
    public function getQueueLength(){
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        $sideALength = $this->sideATail - $this->sideAHead;
        $sideBLength = $this->sideBTail - $this->sideBHead;
        $result = $sideALength + $sideBLength;

        return $result;
    }

    /*
     * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
     * @return  boolean
     */
    public function clear(){
        if(!$this->getLock()) return false;
        for($i=0;$i<self::maxnum;$i++){ this-="">queueName.&#39;A&#39;. self::VALU_KEY .$i, 0);
            @memcache_delete(self::$client, $this->queueName.&#39;B&#39;. self::VALU_KEY .$i, 0);
        }
        $this->unLock();
        $this->resetSide(&#39;A&#39;);
        $this->resetSide(&#39;B&#39;);
        return true;
    }

    /*
     * 清除所有memcache缓存数据
     * @return  NULL
     */
    public function memFlush(){
        memcache_flush(self::$client);
    }

}
</self::maxnum;$i++){>
登录后复制


 

www.bkjia.comtruehttp://www.bkjia.com/PHPjc/994954.htmlTechArticlephp自己实现memcached的队列类 add(1asdf); * $obj-getQueueLength(); * $obj-read(11); * $obj-get(8); */class memcacheQueue{ public static $client; //memcache客户端连接 pu...
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Python中的Deque: 实现高效的队列和堆栈 Python中的Deque: 实现高效的队列和堆栈 Apr 12, 2023 pm 09:46 PM

Python 中的 deque 是一个低级别的、高度优化的双端队列,对于实现优雅、高效的Pythonic 队列和堆栈很有用,它们是计算中最常见的列表式数据类型。本文中,云朵君将和大家一起学习如下:开始使用deque有效地弹出和追加元素访问deque中的任意元素用deque构建高效队列开始使用Deque向 Python 列表的右端追加元素和弹出元素的操作,一般非常高效。如果用大 O 表示时间复杂性,那么可以说它们是 O(1)。而当 Python 需要重新分配内存来增加底层列表以接受新的元素时,这些

怎样使用Supervisor管理ThinkPHP6队列? 怎样使用Supervisor管理ThinkPHP6队列? Jun 12, 2023 am 08:51 AM

随着Web应用的不断发展,我们需要处理大量的任务来保持应用的稳定性和可用性。使用队列系统就是一种解决方案。ThinkPHP6提供了内置的队列系统来管理任务。然而,处理大量的任务需要更好的队列管理,这时候可以使用Supervisor来实现。本文将介绍如何使用Supervisor管理ThinkPHP6队列。在此之前,我们需要了解一些基础的概念:队列系统队列系统是

队列技术在PHP与MySQL中的消息延迟和消息重试的应用 队列技术在PHP与MySQL中的消息延迟和消息重试的应用 Oct 15, 2023 pm 02:26 PM

队列技术在PHP与MySQL中的消息延迟和消息重试的应用摘要:随着Web应用程序的不断发展,对于高并发处理和系统可靠性方面的需求越来越高。队列技术作为一种解决方案,被广泛应用于PHP与MySQL中,以实现消息延迟和消息重试的功能。本文将介绍队列技术在PHP与MySQL中的应用,包括队列的基本原理、使用队列实现消息延迟的方法和使用队列实现消息重试的方法,并给出

对Java Queue队列性能的分析和优化策略 对Java Queue队列性能的分析和优化策略 Jan 09, 2024 pm 05:02 PM

JavaQueue队列的性能分析与优化策略摘要:队列(Queue)是在Java中常用的数据结构之一,广泛应用于各种场景中。本文将从性能分析和优化策略两个方面来探讨JavaQueue队列的性能问题,并给出具体的代码示例。引言队列是一种先进先出(FIFO)的数据结构,可用于实现生产者-消费者模式、线程池任务队列等场景。Java提供了多种队列的实现,例如Arr

在Java中,add()方法和offer()方法在队列中有什么区别? 在Java中,add()方法和offer()方法在队列中有什么区别? Aug 27, 2023 pm 02:25 PM

Java中的队列是一种线性数据结构,具有多种功能。队列有两个端点,它遵循先进先出(FIFO)原则插入和删除其元素。在本教程中,我们将了解Java中队列的两个重要函数,它们是add()和Offer()。什么是队列?java中的队列是一个扩展了util和collection包的接口。元素在后端插入并从前端移除。java中的队列可以使用链表、DeQueue、优先级队列等类来实现。优先级队列是普通队列的扩展形式,每个元素都有一个优先级。队列的add()方法该方法用于向队列中插入元素。它将定义的元素(作为

队列在PHP与MySQL中的任务监控和任务调度的实现方案 队列在PHP与MySQL中的任务监控和任务调度的实现方案 Oct 15, 2023 am 09:15 AM

队列在PHP与MySQL中的任务监控和任务调度的实现方案引言在现代的Web应用程序开发中,任务队列是非常重要的一项技术。通过队列,我们可以将一些需要在后台执行的任务排队,并通过任务调度来控制任务的执行时间和顺序。本文将介绍如何在PHP与MySQL中实现任务的监控和调度,并提供具体的代码示例。一、队列的工作原理队列是一种先进先出(FIFO)的数据结构,可以用来

PHP邮件队列系统的原理和实现方式是什么? PHP邮件队列系统的原理和实现方式是什么? Sep 13, 2023 am 11:39 AM

PHP邮件队列系统的原理和实现方式是什么?随着互联网的发展,电子邮件已经成为人们日常生活和工作中必不可少的通信方式之一。然而,随着业务的增长和用户数量的增加,直接发送电子邮件可能会导致服务器性能下降、邮件发送失败等问题。为了解决这个问题,可以使用邮件队列系统来通过串行队列的方式发送和管理电子邮件。邮件队列系统的实现原理如下:邮件入队列当需要发送邮件时,不再直

Yii框架中的队列:高效地处理异步操作 Yii框架中的队列:高效地处理异步操作 Jun 21, 2023 am 10:13 AM

随着互联网的快速发展,应用程序对于处理大量并发请求和任务变得越来越重要。在这样的情况下,处理异步任务是必不可少的,因为这可以使应用程序更加高效,并更好地响应用户请求。Yii框架提供了一个方便的队列组件,使得处理异步操作更加容易和高效。在本篇文章中,我们将探讨Yii框架中队列的使用和优势。什么是队列队列是一种数据结构,用于处理数据的先进先出(FIFO)顺序。队

See all articles