<?php
/**
 * Fixed-capacity FIFO queue stored in memcache as a ring buffer.
 *
 * Each queue is identified by a name prefix. Three counter keys (head index,
 * tail index, length) plus one value key per slot are kept in memcache, and a
 * lock key created with memcache_add() serialises push/pop/clear between
 * processes.
 */
class MQueue
{
    /** @var mixed Memcache connection shared by all instances (static); falsy when not connected. */
    public static $client;

    /**
     * @var bool True while this instance holds the queue lock.
     *
     * Declared public because it used to be created implicitly as a (public)
     * dynamic property; declaring it avoids the PHP 8.2 deprecation.
     */
    public $access = false;

    /** @var int Expiry (seconds) passed to every memcache write, including the lock; 0 = never. */
    private $expire = 0;

    /** @var int Microseconds to sleep between attempts to take the lock. */
    private $sleepTime = 1000;

    /** @var string Prefix prepended to every memcache key of this queue. */
    private $queueName = '';

    /** @var int Maximum number of lock attempts before giving up. */
    private $retryNum = 20000;

    /** @var int Ring capacity (number of slots), always >= 1. */
    private $MAXNUM = 1;

    /** @var bool When truthy, pushing onto a full queue overwrites the oldest entry. */
    private $canRewrite = false;

    /** @var int Index of the next slot to read. */
    private $HEAD = 0;

    /** @var int Index of the next slot to write. */
    private $TAIL = 0;

    /** @var int Number of entries currently queued. */
    private $LEN = 0;

    const LOCK_KEY = '_Fox_MQ_LOCK_';
    const LENGTH_KEY = '_Fox_MQ_LENGTH_';
    const VALU_KEY = '_Fox_MQ_VAL_';
    const HEAD_KEY = '_Fox_MQ_HEAD_';
    const TAIL_KEY = '_Fox_MQ_TAIL_';

    /**
     * Connects to memcache and loads the current queue counters.
     *
     * @param string       $queueName  Key prefix identifying the queue.
     * @param int          $maxqueue   Ring capacity; values below 1 fall back to 1.
     * @param bool         $canRewrite Overwrite the oldest entry instead of rejecting a push when full.
     * @param int          $expire     Expiry in seconds for stored keys; 0/empty means no expiry,
     *                                 otherwise one second is added to the given value.
     * @param array|string $config     Empty for 127.0.0.1:11211, an array with 'host'/'port' keys,
     *                                 or a "host:port" string. Any other type leaves the existing
     *                                 shared connection untouched.
     */
    public function __construct($queueName = '', $maxqueue = 1, $canRewrite = false, $expire = 0, $config = '')
    {
        // Initialise all state first so the object stays coherent even when
        // the connection attempt below fails.
        $this->access = false;
        $this->sleepTime = 1000;
        $this->retryNum = 20000;
        $this->queueName = $queueName;
        $this->canRewrite = $canRewrite;
        $this->expire = empty($expire) ? 0 : (int) $expire + 1;

        $maxqueue = (int) $maxqueue;
        $this->MAXNUM = $maxqueue > 0 ? $maxqueue : 1;

        $this->HEAD = 0;
        $this->TAIL = 0;
        $this->LEN = 0;

        $server = $this->resolveServer($config);
        if ($server !== null) {
            self::$client = memcache_pconnect($server['host'], $server['port']);
        }

        if (!self::$client) {
            // No usable connection: lock() refuses to run, so push/pop/clear return false.
            return;
        }

        ignore_user_abort(true);
        set_time_limit(0);

        $this->getHeadAndTail();
    }

    /**
     * Turns the constructor's $config argument into a host/port pair.
     *
     * @param mixed $config See __construct().
     *
     * @return array|null array('host' => ..., 'port' => ...) or null for unsupported types.
     */
    private function resolveServer($config)
    {
        $host = '127.0.0.1';
        $port = 11211;

        if (empty($config)) {
            return array('host' => $host, 'port' => $port);
        }

        if (is_array($config)) {
            // Missing or empty entries fall back to the defaults instead of raising notices.
            if (!empty($config['host'])) {
                $host = $config['host'];
            }
            if (!empty($config['port'])) {
                $port = (int) $config['port'];
            }

            return array('host' => $host, 'port' => $port);
        }

        if (is_string($config)) {
            $parts = explode(':', $config);
            if ($parts[0] !== '') {
                $host = $parts[0];
            }
            if (isset($parts[1]) && $parts[1] !== '') {
                $port = (int) $parts[1];
            }

            return array('host' => $host, 'port' => $port);
        }

        return null;
    }

    /**
     * Refreshes HEAD, TAIL and LEN from memcache.
     *
     * A missing key makes memcache_get() return false, which the int cast turns into 0.
     */
    private function getHeadAndTail()
    {
        $this->HEAD = (int) memcache_get(self::$client, $this->queueName . self::HEAD_KEY);
        $this->TAIL = (int) memcache_get(self::$client, $this->queueName . self::TAIL_KEY);
        $this->LEN = (int) memcache_get(self::$client, $this->queueName . self::LENGTH_KEY);
    }

    /**
     * Writes one counter of this queue to memcache using the queue's expiry.
     *
     * @param string $suffix One of the *_KEY constants.
     * @param int    $value  Value to store.
     */
    private function storeCounter($suffix, $value)
    {
        memcache_set(self::$client, $this->queueName . $suffix, $value, 0, $this->expire);
    }

    /**
     * Takes the queue lock, retrying until it succeeds or the retry budget is spent.
     *
     * @return bool True when the lock was taken; false when there is no connection,
     *              this instance already holds the lock, or all retries failed.
     */
    private function lock()
    {
        if (!self::$client || $this->access !== false) {
            return false;
        }

        $lockKey = $this->queueName . self::LOCK_KEY;
        $attempts = 0;

        // memcache_add() only succeeds when the key does not exist yet.
        while (!memcache_add(self::$client, $lockKey, 1, 0, $this->expire)) {
            usleep($this->sleepTime);
            $attempts++;
            if ($attempts > $this->retryNum) {
                return false;
            }
        }

        $this->access = true;

        return true;
    }

    /**
     * Advances HEAD by one slot (wrapping at capacity), decrements LEN (not below 0)
     * and writes both counters to memcache.
     */
    private function incrHead()
    {
        $this->HEAD++;
        if ($this->HEAD >= $this->MAXNUM) {
            $this->HEAD = 0;
        }

        $this->LEN--;
        if ($this->LEN < 0) {
            $this->LEN = 0;
        }

        $this->storeCounter(self::HEAD_KEY, $this->HEAD);
        $this->storeCounter(self::LENGTH_KEY, $this->LEN);
    }

    /**
     * Advances TAIL by one slot (wrapping at capacity), increments LEN (capped at
     * capacity) and writes both counters to memcache.
     */
    private function incrTail()
    {
        $this->TAIL++;
        if ($this->TAIL >= $this->MAXNUM) {
            $this->TAIL = 0;
        }

        $this->LEN++;
        if ($this->LEN >= $this->MAXNUM) {
            $this->LEN = $this->MAXNUM;
        }

        $this->storeCounter(self::TAIL_KEY, $this->TAIL);
        $this->storeCounter(self::LENGTH_KEY, $this->LEN);
    }

    /**
     * Releases the queue lock by deleting the lock key.
     */
    private function unLock()
    {
        memcache_delete(self::$client, $this->queueName . self::LOCK_KEY);
        $this->access = false;
    }

    /**
     * Tells whether a push would be rejected for lack of space.
     *
     * Uses the counters as last loaded by this instance.
     *
     * @return bool Always false when overwriting is enabled.
     */
    public function isFull()
    {
        if ($this->canRewrite) {
            return false;
        }

        return $this->LEN == $this->MAXNUM;
    }

    /**
     * Tells whether the queue holds no entries, based on the counters as last loaded.
     *
     * @return bool
     */
    public function isEmpty()
    {
        return $this->LEN == 0;
    }

    /**
     * Returns the queue length as last loaded by this instance.
     *
     * @return int
     */
    public function getLen()
    {
        return $this->LEN;
    }

    /**
     * Appends one entry to the queue.
     *
     * @param mixed $data Value to store. Values that are empty() (including 0 and '0')
     *                    are rejected.
     *
     * @return bool True when stored; false on empty data, lock failure, a full queue
     *              (without overwrite) or a failed memcache write.
     */
    public function push($data = '')
    {
        if (empty($data)) {
            return false;
        }
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();

        if ($this->isFull()) {
            $this->unLock();

            return false;
        }

        $slotKey = $this->queueName . self::VALU_KEY . $this->TAIL;
        $stored = memcache_set(self::$client, $slotKey, $data, MEMCACHE_COMPRESSED, $this->expire);

        if ($stored) {
            // TAIL catching up with HEAD on a non-empty ring means the oldest entry
            // was just overwritten, so HEAD has to move past it.
            if ($this->TAIL == $this->HEAD && $this->LEN >= 1) {
                $this->incrHead();
            }
            $this->incrTail();
        }

        $this->unLock();

        return $stored ? true : false;
    }

    /**
     * Removes and returns up to $length entries from the front of the queue.
     *
     * @param int $length Number of entries wanted; 0/empty means "all queued entries".
     *
     * @return array|false List of values in FIFO order, or false when $length is not
     *                     numeric, the lock could not be taken, or the queue is empty.
     */
    public function pop($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();

        if ($this->isEmpty()) {
            $this->unLock();

            return false;
        }

        if (empty($length) || $length > $this->LEN) {
            $length = $this->LEN;
        }

        $data = $this->popKeyArray($length);
        $this->unLock();

        return $data;
    }

    /**
     * Reads and deletes $length slots starting at HEAD, advancing HEAD as it goes.
     *
     * Must be called with the lock held and the counters freshly loaded.
     *
     * @param int $length Number of slots to consume.
     *
     * @return array Values read, in order; a slot whose key is missing contributes false.
     */
    private function popKeyArray($length)
    {
        $result = array();
        if (empty($length)) {
            return $result;
        }

        for ($k = 0; $k < $length; $k++) {
            $slotKey = $this->queueName . self::VALU_KEY . $this->HEAD;
            $result[] = @memcache_get(self::$client, $slotKey);
            @memcache_delete(self::$client, $slotKey);

            if ($this->TAIL == $this->HEAD && $this->LEN <= 1) {
                // HEAD and TAIL coincide with at most one entry counted:
                // zero the length without moving HEAD and stop.
                $this->LEN = 0;
                $this->storeCounter(self::LENGTH_KEY, $this->LEN);
                break;
            }

            $this->incrHead();
        }

        return $result;
    }

    /**
     * Resets the queue counters, both in this instance and in memcache.
     *
     * @param bool $all True deletes the counter keys; false rewrites them as 0.
     */
    private function reset($all = false)
    {
        // Zero the in-memory counters in both modes so getLen()/isEmpty() do not
        // keep reporting the pre-reset state.
        $this->HEAD = 0;
        $this->TAIL = 0;
        $this->LEN = 0;

        foreach (array(self::HEAD_KEY, self::TAIL_KEY, self::LENGTH_KEY) as $suffix) {
            if ($all) {
                memcache_delete(self::$client, $this->queueName . $suffix);
            } else {
                $this->storeCounter($suffix, 0);
            }
        }
    }

    /**
     * Flushes the connected memcache server via memcache_flush(); this is not
     * limited to this queue's keys. Does nothing when there is no connection.
     */
    public function memFlush()
    {
        if (!self::$client) {
            return;
        }

        memcache_flush(self::$client);
    }

    /**
     * Deletes every queued entry and resets the counters.
     *
     * @param bool $all True also deletes the counter keys instead of zeroing them.
     *
     * @return bool True on success, false when the lock could not be taken.
     */
    public function clear($all = false)
    {
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();

        $head = $this->HEAD;
        $length = $this->LEN;

        for ($i = 0; $i < $length; $i++) {
            // Occupied slots start at HEAD and wrap around the ring.
            $slot = ($head + $i) % $this->MAXNUM;
            @memcache_delete(self::$client, $this->queueName . self::VALU_KEY . $slot);
        }

        // Reset while still holding the lock so no other process can push
        // between the deletions and the counter rewrite.
        $this->reset($all);
        $this->unLock();

        return true;
    }
}