<!--?php
class
memcacheQueue{
public
static
$client
;
public
$access
;
private
$currentSide
;
private
$lastSide
;
private
$sideAHead
;
private
$sideATail
;
private
$sideBHead
;
private
$sideBTail
;
private
$currentHead
;
private
$currentTail
;
private
$lastHead
;
private
$lastTail
;
private
$expire
;
private
$sleepTime
;
private
$queueName
;
private
$retryNum
;
const
MAXNUM = 2000;
const
HEAD_KEY = '_lkkQueueHead_';
const
TAIL_KEY = '_lkkQueueTail_';
const
VALU_KEY = '_lkkQueueValu_';
const
LOCK_KEY = '_lkkQueueLock_';
const
SIDE_KEY = '_lkkQueueSide_';
public
function
__construct(
$queueName
='',
$expire
='',
$config
=''){
if
(
empty
(
$config
)){
self::
$client
= memcache_pconnect('localhost',11211);
}
elseif
(
is_array
(
$config
)){
self::
$client
= memcache_pconnect(
$config
['host'],
$config
['port']);
}
elseif
(
is_string
(
$config
)){
$tmp
=
explode
(':',
$config
);
$conf
['host'] = isset(
$tmp
[0]) ?
$tmp
[0] : '127.0.0.1';
$conf
['port'] = isset(
$tmp
[1]) ?
$tmp
[1] : '11211';
self::
$client
= memcache_pconnect(
$conf
['host'],
$conf
['port']);
}
if
(!self::
$client
)
return
false;
ignore_user_abort(TRUE);
set_time_limit(0);
$this
->access = false;
$this
->sleepTime = 1000;
$expire
= (
empty
(
$expire
) &&
$expire
!=0) ? 3600 : (int)
$expire
;
$this
->expire =
$expire
;
$this
->queueName =
$queueName
;
$this
->retryNum = 10000;
$side
= memcache_add(self::
$client
,
$queueName
. self::SIDE_KEY, 'A',false,
$expire
);
$this
->getHeadNTail(
$queueName
);
if
(!isset(
$this
->sideAHead) ||
empty
(
$this
->sideAHead))
$this
->sideAHead = 0;
if
(!isset(
$this
->sideATail) ||
empty
(
$this
->sideATail))
$this
->sideATail = 0;
if
(!isset(
$this
->sideBHead) ||
empty
(
$this
->sideBHead))
$this
->sideBHead = 0;
if
(!isset(
$this
->sideBHead) ||
empty
(
$this
->sideBHead))
$this
->sideBHead = 0;
}
private
function
getHeadNTail(
$queueName
){
$this
->sideAHead = (int)memcache_get(self::
$client
,
$queueName
.'A'. self::HEAD_KEY);
$this
->sideATail = (int)memcache_get(self::
$client
,
$queueName
.'A'. self::TAIL_KEY);
$this
->sideBHead = (int)memcache_get(self::
$client
,
$queueName
.'B'. self::HEAD_KEY);
$this
->sideBTail = (int)memcache_get(self::
$client
,
$queueName
.'B'. self::TAIL_KEY);
}
public
function
getCurrentSide(){
$currentSide
= memcache_get(self::
$client
,
$this
->queueName . self::SIDE_KEY);
if
(
$currentSide
== 'A'){
$this
->currentSide = 'A';
$this
->lastSide = 'B';
$this
->currentHead =
$this
->sideAHead;
$this
->currentTail =
$this
->sideATail;
$this
->lastHead =
$this
->sideBHead;
$this
->lastTail =
$this
->sideBTail;
}
else
{
$this
->currentSide = 'B';
$this
->lastSide = 'A';
$this
->currentHead =
$this
->sideBHead;
$this
->currentTail =
$this
->sideBTail;
$this
->lastHead =
$this
->sideAHead;
$this
->lastTail =
$this
->sideATail;
}
return
$this
->currentSide;
}
private
function
getLock(){
if
(
$this
->access === false){
while
(!memcache_add(self::
$client
,
$this
->queueName .self::LOCK_KEY, 1, false,
$this
->expire) ){
usleep(
$this
->sleepTime);
@
$i
++;
if
(
$i
>
$this
->retryNum){
return
false;
break
;
}
}
return
$this
->access = true;
}
return
false;
}
private
function
unLock(){
memcache_delete(self::
$client
,
$this
->queueName .self::LOCK_KEY);
$this
->access = false;
}
public
function
add(
$data
){
$result
= false;
if
(!
$this
->getLock()){
return
$result
;
}
$this
->getHeadNTail(
$this
->queueName);
$this
->getCurrentSide();
if
(
$this
->isFull()){
$this
->unLock();
return
false;
}
if
(
$this
->currentTail < self::MAXNUM){
$value_key
=
$this
->queueName .
$this
->currentSide . self::VALU_KEY .
$this
->currentTail;
if
(memcache_add(self::
$client
,
$value_key
,
$data
, false,
$this
->expire)){
$this
->changeTail();
$result
= true;
}
}
else
{
$this
->unLock();
$this
->changeCurrentSide();
return
$this
->add(
$data
);
}
$this
->unLock();
return
$result
;
}
public
function
get(
$length
=0){
if
(!
is_numeric
(
$length
))
return
false;
if
(
empty
(
$length
))
$length
= self::MAXNUM * 2;
if
(!
$this
->getLock())
return
false;
if
(
$this
->isEmpty()){
$this
->unLock();
return
false;
}
$keyArray
=
$this
->getKeyArray(
$length
);
$lastKey
=
$keyArray
['lastKey'];
$currentKey
=
$keyArray
['currentKey'];
$keys
=
$keyArray
['keys'];
$this
->changeHead(
$this
->lastSide,
$lastKey
);
$this
->changeHead(
$this
->currentSide,
$currentKey
);
$data
= @memcache_get(self::
$client
,
$keys
);
foreach
(
$keys
as
$v
){
@memcache_delete(self::
$client
,
$v
, 0);
}
$this
->unLock();
return
$data
;
}
public
function
read(
$length
=0){
if
(!
is_numeric
(
$length
))
return
false;
if
(
empty
(
$length
))
$length
= self::MAXNUM * 2;
$keyArray
=
$this
->getKeyArray(
$length
);
$data
= @memcache_get(self::
$client
,
$keyArray
['keys']);
return
$data
;
}
private
function
getKeyArray(
$length
){
$result
=
array
('keys'=>
array
(),'lastKey'=>
array
(),'currentKey'=>
array
());
$this
->getHeadNTail(
$this
->queueName);
$this
->getCurrentSide();
if
(
empty
(
$length
))
return
$result
;
$i
=
$result
['lastKey'] = 0;
for
(
$i
=0;
$i
<
$length
;
$i
++){
$result
['lastKey'] =
$this
->lastHead +
$i
;
if
(
$result
['lastKey'] >=
$this
->lastTail)
break
;
$result
['keys'][] =
$this
->queueName .
$this
->lastSide . self::VALU_KEY .
$result
['lastKey'];
}
$j
=
$length
-
$i
;
$k
=
$result
['currentKey'] = 0;
for
(
$k
=0;
$k
<
$j
;
$k
++){
$result
['currentKey'] =
$this
->currentHead +
$k
;
if
(
$result
['currentKey'] >=
$this
->currentTail)
break
;
$result
['keys'][] =
$this
->queueName .
$this
->currentSide . self::VALU_KEY .
$result
['currentKey'];
}
return
$result
;
}
private
function
changeTail(){
$tail_key
=
$this
->queueName .
$this
->currentSide . self::TAIL_KEY;
memcache_add(self::
$client
,
$tail_key
, 0,false,
$this
->expire);
$v
= memcache_get(self::
$client
,
$tail_key
) +1;
memcache_set(self::
$client
,
$tail_key
,
$v
,false,
$this
->expire);
}
private
function
changeHead(
$side
,
$headValue
){
if
(
$headValue
< 1)
return
false;
$head_key
=
$this
->queueName .
$side
. self::HEAD_KEY;
$tail_key
=
$this
->queueName .
$side
. self::TAIL_KEY;
$sideTail
= memcache_get(self::
$client
,
$tail_key
);
if
(
$headValue
<
$sideTail
){
memcache_set(self::
$client
,
$head_key
,
$headValue
+1,false,
$this
->expire);
}
elseif
(
$headValue
>=
$sideTail
){
$this
->resetSide(
$side
);
}
}
private
function
resetSide(
$side
){
$head_key
=
$this
->queueName .
$side
. self::HEAD_KEY;
$tail_key
=
$this
->queueName .
$side
. self::TAIL_KEY;
memcache_set(self::
$client
,
$head_key
,0,false,
$this
->expire);
memcache_set(self::
$client
,
$tail_key
,0,false,
$this
->expire);
}
private
function
changeCurrentSide(){
$currentSide
= memcache_get(self::
$client
,
$this
->queueName . self::SIDE_KEY);
if
(
$currentSide
== 'A'){
memcache_set(self::
$client
,
$this
->queueName . self::SIDE_KEY,'B',false,
$this
->expire);
$this
->currentSide = 'B';
}
else
{
memcache_set(self::
$client
,
$this
->queueName . self::SIDE_KEY,'A',false,
$this
->expire);
$this
->currentSide = 'A';
}
return
$this
->currentSide;
}
public
function
isFull(){
$result
= false;
if
(
$this
->sideATail == self::MAXNUM &&
$this
->sideBTail == self::MAXNUM){
$result
= true;
}
return
$result
;
}
public
function
isEmpty(){
$result
= true;
if
(
$this
->sideATail > 0 ||
$this
->sideBTail > 0){
$result
= false;
}
return
$result
;
}
public
function
getQueueLength(){
$this
->getHeadNTail(
$this
->queueName);
$this
->getCurrentSide();
$sideALength
=
$this
->sideATail -
$this
->sideAHead;
$sideBLength
=
$this
->sideBTail -
$this
->sideBHead;
$result
=
$sideALength
+
$sideBLength
;
return
$result
;
}
public
function
clear(){
if
(!
$this
->getLock())
return
false;
for
(
$i
=0;
$i
<self::maxnum;
$i
++){ this-=
""
>queueName.'A'. self::VALU_KEY .
$i
, 0);
@memcache_delete(self::
$client
,
$this
->queueName.'B'. self::VALU_KEY .
$i
, 0);
}
$this
->unLock();
$this
->resetSide('A');
$this
->resetSide('B');
return
true;
}
public
function
memFlush(){
memcache_flush(self::
$client
);
}
}
</self::maxnum;
$i
++){>