目錄
php自己实现memcached的队列类
首頁 後端開發 php教程 php自己实现memcached的队列类_PHP教程

php自己实现memcached的队列类_PHP教程

Jul 13, 2016 am 09:54 AM
佇列

php自己实现memcached的队列类

 

 

<!--?php
/*
 * memcache队列类
 * 支持多进程并发写入、读取
 * 边写边读,AB面轮值替换
 * @author guoyu
 * @create on 9:25 2014-9-28
 * @qq技术行业交流群:136112330
 *
 * @example:
 *      $obj = new memcacheQueue(&#39;duilie&#39;);
 *      $obj--->add(&#39;1asdf&#39;);
 *      $obj->getQueueLength();
 *      $obj->read(11);
 *      $obj->get(8);
 */

class memcacheQueue{
    public static   $client;            //memcache客户端连接
    public          $access;            //队列是否可更新   
    private         $currentSide;       //当前轮值的队列面:A/B
    private         $lastSide;          //上一轮值的队列面:A/B
    private         $sideAHead;         //A面队首值
    private         $sideATail;         //A面队尾值
    private         $sideBHead;         //B面队首值
    private         $sideBTail;         //B面队尾值
    private         $currentHead;       //当前队首值
    private         $currentTail;       //当前队尾值
    private         $lastHead;          //上轮队首值
    private         $lastTail;          //上轮队尾值 
    private         $expire;            //过期时间,秒,1~2592000,即30天内;0为永不过期
    private         $sleepTime;         //等待解锁时间,微秒
    private         $queueName;         //队列名称,唯一值
    private         $retryNum;          //重试次数,= 10 * 理论并发数

    const   MAXNUM      = 2000;                 //(单面)最大队列数,建议上限10K
    const   HEAD_KEY    = &#39;_lkkQueueHead_&#39;;     //队列首kye
    const   TAIL_KEY    = &#39;_lkkQueueTail_&#39;;     //队列尾key
    const   VALU_KEY    = &#39;_lkkQueueValu_&#39;;     //队列值key
    const   LOCK_KEY    = &#39;_lkkQueueLock_&#39;;     //队列锁key
    const   SIDE_KEY    = &#39;_lkkQueueSide_&#39;;     //轮值面key

    /*
     * 构造函数
     * @param   [config]    array   memcache服务器参数
     * @param   [queueName] string  队列名称
     * @param   [expire]    string  过期时间
     * @return  NULL
     */
    public function __construct($queueName =&#39;&#39;,$expire=&#39;&#39;,$config =&#39;&#39;){
        if(empty($config)){
            self::$client = memcache_pconnect(&#39;localhost&#39;,11211);
        }elseif(is_array($config)){//array(&#39;host&#39;=>&#39;127.0.0.1&#39;,&#39;port&#39;=>&#39;11211&#39;)
            self::$client = memcache_pconnect($config[&#39;host&#39;],$config[&#39;port&#39;]);
        }elseif(is_string($config)){//"127.0.0.1:11211"
            $tmp = explode(&#39;:&#39;,$config);
            $conf[&#39;host&#39;] = isset($tmp[0]) ? $tmp[0] : &#39;127.0.0.1&#39;;
            $conf[&#39;port&#39;] = isset($tmp[1]) ? $tmp[1] : &#39;11211&#39;;
            self::$client = memcache_pconnect($conf[&#39;host&#39;],$conf[&#39;port&#39;]);     
        }
        if(!self::$client) return false;

        ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
        set_time_limit(0);//取消脚本执行延时上限

        $this->access = false;
        $this->sleepTime = 1000;
        $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
        $this->expire = $expire;
        $this->queueName = $queueName;
        $this->retryNum = 10000;

        $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, &#39;A&#39;,false, $expire);
        $this->getHeadNTail($queueName);
        if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
        if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
    }

    /*
     * 获取队列首尾值
     * @param   [queueName] string  队列名称
     * @return  NULL
     */
    private function getHeadNTail($queueName){
        $this->sideAHead = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::HEAD_KEY);
        $this->sideATail = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::TAIL_KEY);
        $this->sideBHead = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::HEAD_KEY);
        $this->sideBTail = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::TAIL_KEY);
    }

    /*
     * 获取当前轮值的队列面
     * @return  string  队列面名称
     */
    public function getCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == &#39;A&#39;){
            $this->currentSide = &#39;A&#39;;
            $this->lastSide = &#39;B&#39;;  

            $this->currentHead  = $this->sideAHead;
            $this->currentTail  = $this->sideATail;
            $this->lastHead     = $this->sideBHead;
            $this->lastTail     = $this->sideBTail;         
        }else{
            $this->currentSide = &#39;B&#39;;
            $this->lastSide = &#39;A&#39;;

            $this->currentHead  = $this->sideBHead;
            $this->currentTail  = $this->sideBTail;
            $this->lastHead     = $this->sideAHead;
            $this->lastTail     = $this->sideATail;                     
        }

        return $this->currentSide;
    }

    /*
     * 队列加锁
     * @return boolean
     */
    private function getLock(){
        if($this->access === false){
            while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
                usleep($this->sleepTime);
                @$i++;
                if($i > $this->retryNum){//尝试等待N次
                    return false;
                    break;
                }
            }
            return $this->access = true;
        }
        return false;
    }

    /*
     * 队列解锁
     * @return NULL
     */
    private function unLock(){
        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
        $this->access = false;
    }

    /*
     * 添加数据
     * @param   [data]  要存储的值
     * @return  boolean
     */
    public function add($data){
        $result = false;
        if(!$this->getLock()){
            return $result;
        } 
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        if($this->isFull()){
            $this->unLock();
            return false;
        }

        if($this->currentTail < self::MAXNUM){
            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
                $this->changeTail();
                $result = true;
            }
        }else{//当前队列已满,更换轮值面
            $this->unLock();
            $this->changeCurrentSide();
            return $this->add($data);
        }

        $this->unLock();
        return $result;
    }

    /*
     * 取出数据
     * @param   [length]    int 数据的长度
     * @return  array
     */
    public function get($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
        if(!$this->getLock()) return false;

        if($this->isEmpty()){
            $this->unLock();
            return false;
        }

        $keyArray   = $this->getKeyArray($length);
        $lastKey    = $keyArray[&#39;lastKey&#39;];
        $currentKey = $keyArray[&#39;currentKey&#39;];
        $keys       = $keyArray[&#39;keys&#39;];
        $this->changeHead($this->lastSide,$lastKey);
        $this->changeHead($this->currentSide,$currentKey);

        $data   = @memcache_get(self::$client, $keys);
        foreach($keys as $v){//取出之后删除
            @memcache_delete(self::$client, $v, 0);
        }
        $this->unLock();

        return $data;
    }

    /*
     * 读取数据
     * @param   [length]    int 数据的长度
     * @return  array
     */
    public function read($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
        $keyArray   = $this->getKeyArray($length);
        $data   = @memcache_get(self::$client, $keyArray[&#39;keys&#39;]);
        return $data;
    }

    /*
     * 获取队列某段长度的key数组
     * @param   [length]    int 队列长度
     * @return  array
     */
    private function getKeyArray($length){
        $result = array(&#39;keys&#39;=>array(),&#39;lastKey&#39;=>array(),&#39;currentKey&#39;=>array());
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();
        if(empty($length)) return $result;

        //先取上一面的key
        $i = $result[&#39;lastKey&#39;] = 0;
        for($i=0;$i<$length;$i++){
            $result[&#39;lastKey&#39;] = $this->lastHead + $i;
            if($result[&#39;lastKey&#39;] >= $this->lastTail) break;
            $result[&#39;keys&#39;][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result[&#39;lastKey&#39;];
        }

        //再取当前面的key
        $j = $length - $i;
        $k = $result[&#39;currentKey&#39;] = 0;
        for($k=0;$k<$j;$k++){
            $result[&#39;currentKey&#39;] = $this->currentHead + $k;
            if($result[&#39;currentKey&#39;] >= $this->currentTail) break;
            $result[&#39;keys&#39;][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result[&#39;currentKey&#39;];
        }

        return $result;
    }

    /*
     * 更新当前轮值面队列尾的值
     * @return  NULL
     */
    private function changeTail(){
        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
        memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
        //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
        $v = memcache_get(self::$client, $tail_key) +1;
        memcache_set(self::$client, $tail_key,$v,false,$this->expire);
    }

    /*
     * 更新队列首的值
     * @param   [side]      string  要更新的面
     * @param   [headValue] int     队列首的值
     * @return  NULL
     */
    private function changeHead($side,$headValue){
        if($headValue < 1) return false;
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        $sideTail = memcache_get(self::$client, $tail_key);
        if($headValue < $sideTail){
            memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
        }elseif($headValue >= $sideTail){
            $this->resetSide($side);
        }
    }

    /*
     * 重置队列面,即将该队列面的队首、队尾值置为0
     * @param   [side]  string  要重置的面
     * @return  NULL
     */
    private function resetSide($side){
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        memcache_set(self::$client, $head_key,0,false,$this->expire);
        memcache_set(self::$client, $tail_key,0,false,$this->expire);
    }

    /*
     * 改变当前轮值队列面
     * @return  string
     */
    private function changeCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == &#39;A&#39;){
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;B&#39;,false,$this->expire);
            $this->currentSide = &#39;B&#39;;
        }else{
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;A&#39;,false,$this->expire);
            $this->currentSide = &#39;A&#39;;
        }
        return $this->currentSide;
    }

    /*
     * 检查当前队列是否已满
     * @return  boolean
     */
    public function isFull(){
        $result = false;
        if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
            $result = true;
        }
        return $result;
    }

    /*
     * 检查当前队列是否为空
     * @return  boolean
     */
    public function isEmpty(){
        $result = true;
        if($this->sideATail > 0 || $this->sideBTail > 0){
            $result = false;
        }
        return $result;
    }

    /*
     * 获取当前队列的长度
     * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
     * @return  int
     */
    public function getQueueLength(){
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        $sideALength = $this->sideATail - $this->sideAHead;
        $sideBLength = $this->sideBTail - $this->sideBHead;
        $result = $sideALength + $sideBLength;

        return $result;
    }

    /*
     * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
     * @return  boolean
     */
    public function clear(){
        if(!$this->getLock()) return false;
        for($i=0;$i<self::maxnum;$i++){ this-="">queueName.&#39;A&#39;. self::VALU_KEY .$i, 0);
            @memcache_delete(self::$client, $this->queueName.&#39;B&#39;. self::VALU_KEY .$i, 0);
        }
        $this->unLock();
        $this->resetSide(&#39;A&#39;);
        $this->resetSide(&#39;B&#39;);
        return true;
    }

    /*
     * 清除所有memcache缓存数据
     * @return  NULL
     */
    public function memFlush(){
        memcache_flush(self::$client);
    }

}
</self::maxnum;$i++){>
登入後複製


 

www.bkjia.comtruehttp://www.bkjia.com/PHPjc/994954.htmlTechArticlephp自己实现memcached的队列类 add(1asdf); * $obj-getQueueLength(); * $obj-read(11); * $obj-get(8); */class memcacheQueue{ public static $client; //memcache客户端连接 pu...
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Python中的Deque: 實作高效的佇列與堆疊 Python中的Deque: 實作高效的佇列與堆疊 Apr 12, 2023 pm 09:46 PM

Python 中的 deque 是一個低階的、高度最佳化的雙端佇列,對於實現優雅、高效的Pythonic 佇列和堆疊很有用,它們是計算中最常見的列表式資料類型。本文中,雲朵君將和大家一起學習如下:開始使用deque有效地彈出和追加元素訪問deque中的任意元素用deque構建高效隊列開始使用Deque向Python 列表的右端追加元素和彈出元素的操作,一般非常高效。如果用大 O 表示時間複雜性,那麼可以說它們是 O(1)。而當 Python 需要重新分配記憶體來增加底層列表以接受新的元素時,這些

怎樣使用Supervisor管理ThinkPHP6隊列? 怎樣使用Supervisor管理ThinkPHP6隊列? Jun 12, 2023 am 08:51 AM

隨著Web應用的不斷發展,我們需要處理大量的任務來維持應用程式的穩定性和可用性。使用隊列系統就是一種解決方案。 ThinkPHP6提供了內建的佇列系統來管理任務。然而,處理大量的任務需要更好的隊列管理,這時候可以使用Supervisor來實現。本文將介紹如何使用Supervisor管理ThinkPHP6隊列。在此之前,我們需要了解一些基礎的概念:隊列系統隊列系統是

隊列技術在PHP與MySQL中的消息延遲和訊息重試的應用 隊列技術在PHP與MySQL中的消息延遲和訊息重試的應用 Oct 15, 2023 pm 02:26 PM

佇列技術在PHP與MySQL中的消息延遲和訊息重試的應用摘要:隨著Web應用程式的不斷發展,對於高並發處理和系統可靠性方面的需求越來越高。佇列技術作為一種解決方案,被廣泛應用於PHP與MySQL中,以實現訊息延遲和訊息重試的功能。本文將介紹隊列技術在PHP與MySQL中的應用,包括佇列的基本原理、使用佇列實現訊息延遲的方法和使用佇列實作訊息重試的方法,並給出

對Java Queue佇列效能的分析與最佳化策略 對Java Queue佇列效能的分析與最佳化策略 Jan 09, 2024 pm 05:02 PM

JavaQueue佇列的效能分析與最佳化策略摘要:佇列(Queue)是Java中常用的資料結構之一,廣泛應用於各種場景。本文將從效能分析和最佳化策略兩個面向來探討JavaQueue佇列的效能問題,並給出具體的程式碼範例。引言佇列是一種先進先出(FIFO)的資料結構,可用來實作生產者-消費者模式、執行緒池任務佇列等場景。 Java提供了多種佇列的實現,例如Arr

佇列在PHP與MySQL中的任務監控與任務排程的實作方案 佇列在PHP與MySQL中的任務監控與任務排程的實作方案 Oct 15, 2023 am 09:15 AM

隊列在PHP與MySQL中的任務監控和任務調度的實現方案引言在現代的Web應用程式開發中,任務隊列是非常重要的一項技術。透過佇列,我們可以將一些需要在背景執行的任務排隊,並透過任務排程來控制任務的執行時間和順序。本文將介紹如何在PHP與MySQL中實現任務的監控與調度,並提供具體的程式碼範例。一、佇列的工作原理佇列是一種先進先出(FIFO)的資料結構,可以用來

在Java中,add()方法和offer()方法在佇列中有什麼區別? 在Java中,add()方法和offer()方法在佇列中有什麼區別? Aug 27, 2023 pm 02:25 PM

Java中的佇列是一種線性資料結構,具有多種功能。佇列有兩個端點,它遵循先進先出(FIFO)原則插入和刪除其元素。在本教程中,我們將了解Java中佇列的兩個重要函數,它們是add()和Offer()。什麼是隊列? java中的佇列是一個擴充了util和collection包的介面。元素在後端插入並從前端移除。 java中的佇列可以使用鍊錶、DeQueue、優先權佇列等類別來實作。優先權佇列是普通佇列的擴充形式,每個元素都有一個優先權。佇列的add()方法此方法用於向佇列中插入元素。它將定義的元素(作為

PHP郵件佇列系統的原理和實作方式是什麼? PHP郵件佇列系統的原理和實作方式是什麼? Sep 13, 2023 am 11:39 AM

PHP郵件佇列系統的原理和實作方式是什麼?隨著網路的發展,電子郵件已經成為人們日常生活和工作中必不可少的溝通方式之一。然而,隨著業務的成長和用戶數量的增加,直接發送電子郵件可能會導致伺服器效能下降、郵件發送失敗等問題。為了解決這個問題,可以使用郵件佇列系統來透過串列佇列的方式傳送和管理電子郵件。郵件佇列系統的實作原理如下:郵件入佇列當需要傳送郵件時,不再直

Yii框架中的佇列:有效率地處理非同步操作 Yii框架中的佇列:有效率地處理非同步操作 Jun 21, 2023 am 10:13 AM

隨著互聯網的快速發展,應用程式對於處理大量並發請求和任務變得越來越重要。在這樣的情況下,處理非同步任務是必不可​​少的,因為這可以使應用程式更加高效,並更好地回應使用者請求。 Yii框架提供了一個方便的佇列元件,使得處理非同步操作更加容易和有效率。在本篇文章中,我們將探討Yii框架中隊列的使用與優勢。什麼是佇列佇列是一種資料結構,用於處理資料的先進先出(FIFO)順序。隊

See all articles