<?php
namespace
Common\Business;
use
RuntimeException;
/**
 * Minimal socket client for a beanstalkd server.
 *
 * Every command method writes one protocol line to the socket, reads the
 * server's status line and returns either the useful part of the reply or
 * `false` after handing the unexpected status to the configured logger.
 *
 * The client does not connect on construction; call connect() first. Reading
 * or writing without an established connection throws a RuntimeException.
 */
class BeanStalk
{
    /**
     * Lower bound of the priority range accepted by the `put`, `release` and
     * `bury` commands.
     */
    const MIN_PRIORITY = 0;

    /**
     * Upper bound of the priority range (2^32 - 1).
     */
    const MAX_PRIORITY = 4294967295;

    /**
     * Whether the socket is currently believed to be open.
     *
     * @var bool
     */
    public $connected = false;

    /**
     * Effective configuration (caller-supplied values merged over defaults).
     *
     * @var array
     */
    protected $_config = [];

    /**
     * The socket stream, `false` after a failed connect, or `null` when closed.
     *
     * @var resource|false|null
     */
    protected $_connection;

    /**
     * Stores the configuration; does not open a connection.
     *
     * Recognised keys:
     *  - `persistent` (bool)  use pfsockopen() instead of fsockopen(); default true
     *  - `host` (string)      default '127.0.0.1'
     *  - `port` (int)         default 11300
     *  - `timeout` (number)   connect timeout in seconds; a falsy value means
     *                         the argument is omitted so PHP's default applies
     *  - `logger` (object)    anything exposing `error($message)`; default null
     *
     * @param array $config Overrides for the defaults listed above.
     */
    public function __construct(array $config = [])
    {
        $defaults = [
            'persistent' => true,
            'host' => '127.0.0.1',
            'port' => 11300,
            'timeout' => 1,
            'logger' => null
        ];
        // Array union: keys present in $config win over the defaults.
        $this->_config = $config + $defaults;
    }

    /**
     * Closes the connection when the object is destroyed.
     */
    public function __destruct()
    {
        $this->disconnect();
    }

    /**
     * Opens the socket, closing any connection that is already held.
     *
     * Connection errors are forwarded to the logger, not thrown.
     *
     * @return bool True when a socket resource was obtained.
     */
    public function connect()
    {
        if (isset($this->_connection)) {
            $this->disconnect();
        }

        $errNum = '';
        $errStr = '';
        $function = $this->_config['persistent'] ? 'pfsockopen' : 'fsockopen';

        // The error slots are passed by reference so the socket function can
        // fill them in through call_user_func_array().
        $params = [
            $this->_config['host'],
            $this->_config['port'],
            &$errNum,
            &$errStr
        ];
        if ($this->_config['timeout']) {
            $params[] = $this->_config['timeout'];
        }

        // Warnings are suppressed on purpose; failures are reported through
        // $errNum/$errStr and the logger below.
        $this->_connection = @call_user_func_array($function, $params);

        if (!empty($errNum) || !empty($errStr)) {
            $this->_error("{$errNum}: {$errStr}");
        }

        $this->connected = is_resource($this->_connection);

        if ($this->connected) {
            // NOTE(review): a negative value presumably disables the read
            // timeout so blocking commands can wait indefinitely - confirm.
            stream_set_timeout($this->_connection, -1);
        }
        return $this->connected;
    }

    /**
     * Sends `quit` and closes the socket if one is open.
     *
     * @return bool True when no connection remains open afterwards.
     */
    public function disconnect()
    {
        if (!is_resource($this->_connection)) {
            $this->connected = false;
        } else {
            // $connected is public and may have been cleared while the socket
            // is still open; skip the farewell in that case instead of letting
            // _write() throw (this method also runs from the destructor).
            if ($this->connected) {
                $this->_write('quit');
            }
            $this->connected = !fclose($this->_connection);

            if (!$this->connected) {
                $this->_connection = null;
            }
        }
        return !$this->connected;
    }

    /**
     * Forwards a message to the configured logger, if there is one.
     *
     * @param mixed $message Usually the unexpected status token (may be false).
     * @return void
     */
    protected function _error($message)
    {
        if ($this->_config['logger']) {
            $this->_config['logger']->error($message);
        }
    }

    /**
     * Returns the configured logger.
     *
     * Despite the name, no error list is kept by this class; the value
     * returned is whatever was passed as the `logger` option (null by default).
     *
     * @return mixed
     */
    public function errors()
    {
        return $this->_config['logger'];
    }

    /**
     * Writes one protocol line to the socket, appending the CRLF terminator.
     *
     * @param string $data Command line (and payload) without trailing CRLF.
     * @return int|false Result of fwrite().
     * @throws RuntimeException When not connected.
     */
    protected function _write($data)
    {
        if (!$this->connected) {
            $message = 'No connection found while writing data to socket.';
            throw new RuntimeException($message);
        }

        $data .= "\r\n";
        return fwrite($this->_connection, $data, strlen($data));
    }

    /**
     * Reads from the socket.
     *
     * With a truthy $length, exactly that many payload bytes plus the CRLF
     * frame terminator are read and the terminator alone is removed, so a
     * payload that itself ends in CR/LF is returned intact. Otherwise a
     * single CRLF-terminated line (up to 16384 bytes) is read; this branch
     * also serves zero-length payloads, whose frame is just an empty line.
     *
     * @param int|null $length Payload size in bytes, or null/0 to read a line.
     * @return string|false The data read, or false at end of stream / on failure.
     * @throws RuntimeException When not connected or when the read timed out.
     */
    protected function _read($length = null)
    {
        if (!$this->connected) {
            $message = 'No connection found while reading data from socket.';
            throw new RuntimeException($message);
        }

        if ($length) {
            if (feof($this->_connection)) {
                return false;
            }
            $data = stream_get_contents($this->_connection, $length + 2);
            $meta = stream_get_meta_data($this->_connection);

            if ($meta['timed_out']) {
                $message = 'Connection timed out while reading data from socket.';
                throw new RuntimeException($message);
            }
            if ($data === false) {
                return false;
            }
            // Drop only the two-byte frame terminator; trimming all trailing
            // CR/LF characters would corrupt payloads ending in a newline.
            $packet = substr($data, -2) === "\r\n" ? substr($data, 0, -2) : $data;
        } else {
            $packet = stream_get_line($this->_connection, 16384, "\r\n");
        }
        return $packet;
    }

    /**
     * Sends a `put` command.
     *
     * @param int $pri Priority; see MIN_PRIORITY / MAX_PRIORITY.
     * @param int $delay Second numeric argument of the command (delay).
     * @param int $ttr Third numeric argument of the command (time to run).
     * @param string $data Job body; its byte length is sent alongside it.
     * @return int|false Number following the status on INSERTED or BURIED,
     *                   false on any other reply.
     */
    public function put($pri, $delay, $ttr, $data)
    {
        $this->_write(sprintf("put %d %d %d %d\r\n%s", $pri, $delay, $ttr, strlen($data), $data));
        $status = strtok($this->_read(), ' ');

        if ($status === 'INSERTED' || $status === 'BURIED') {
            return (int) strtok(' ');
        }
        // Includes EXPECTED_CRLF and JOB_TOO_BIG.
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `use` command.
     *
     * @param string $tube Tube name.
     * @return string|false Token following USING in the reply, false otherwise.
     */
    public function useTube($tube)
    {
        $this->_write(sprintf('use %s', $tube));
        $status = strtok($this->_read(), ' ');

        if ($status === 'USING') {
            return strtok(' ');
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `pause-tube` command.
     *
     * @param string $tube Tube name.
     * @param int $delay Numeric argument of the command.
     * @return bool True on PAUSED, false on any other reply (e.g. NOT_FOUND).
     */
    public function pauseTube($tube, $delay)
    {
        $this->_write(sprintf('pause-tube %s %d', $tube, $delay));
        $status = strtok($this->_read(), ' ');

        if ($status === 'PAUSED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends `reserve`, or `reserve-with-timeout` when a timeout is given.
     *
     * @param int|null $timeout Timeout argument; null selects plain `reserve`.
     * @return array|false `['id' => int, 'body' => string|false]` on RESERVED,
     *                     false on any other reply (e.g. DEADLINE_SOON, TIMED_OUT).
     */
    public function reserve($timeout = null)
    {
        if (isset($timeout)) {
            $this->_write(sprintf('reserve-with-timeout %d', $timeout));
        } else {
            $this->_write('reserve');
        }
        $status = strtok($this->_read(), ' ');

        if ($status === 'RESERVED') {
            // Reply layout: RESERVED <id> <bytes>; tokens are consumed in order.
            $id = (int) strtok(' ');
            $bytes = (int) strtok(' ');
            return [
                'id' => $id,
                'body' => $this->_read($bytes)
            ];
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `delete` command.
     *
     * @param int $id Job id.
     * @return bool True on DELETED, false on any other reply (e.g. NOT_FOUND).
     */
    public function delete($id)
    {
        $this->_write(sprintf('delete %d', $id));
        $status = $this->_read();

        if ($status === 'DELETED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `release` command.
     *
     * @param int $id Job id.
     * @param int $pri Priority argument.
     * @param int $delay Delay argument.
     * @return bool True on RELEASED or BURIED, false on any other reply.
     */
    public function release($id, $pri, $delay)
    {
        $this->_write(sprintf('release %d %d %d', $id, $pri, $delay));
        $status = $this->_read();

        if ($status === 'RELEASED' || $status === 'BURIED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `bury` command.
     *
     * @param int $id Job id.
     * @param int $pri Priority argument.
     * @return bool True on BURIED, false on any other reply (e.g. NOT_FOUND).
     */
    public function bury($id, $pri)
    {
        $this->_write(sprintf('bury %d %d', $id, $pri));
        $status = $this->_read();

        if ($status === 'BURIED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `touch` command.
     *
     * @param int $id Job id.
     * @return bool True on TOUCHED, false on any other reply.
     */
    public function touch($id)
    {
        $this->_write(sprintf('touch %d', $id));
        $status = $this->_read();

        if ($status === 'TOUCHED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `watch` command.
     *
     * @param string $tube Tube name.
     * @return int|false Number following WATCHING in the reply, false otherwise.
     */
    public function watch($tube)
    {
        $this->_write(sprintf('watch %s', $tube));
        $status = strtok($this->_read(), ' ');

        if ($status === 'WATCHING') {
            return (int) strtok(' ');
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends an `ignore` command.
     *
     * @param string $tube Tube name.
     * @return int|false Number following WATCHING in the reply, false on any
     *                   other reply (e.g. NOT_IGNORED).
     */
    public function ignore($tube)
    {
        $this->_write(sprintf('ignore %s', $tube));
        $status = strtok($this->_read(), ' ');

        if ($status === 'WATCHING') {
            return (int) strtok(' ');
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `peek` command for a specific job.
     *
     * @param int $id Job id.
     * @return array|false See _peekRead().
     */
    public function peek($id)
    {
        $this->_write(sprintf('peek %d', $id));
        return $this->_peekRead();
    }

    /**
     * Sends a `peek-ready` command.
     *
     * @return array|false See _peekRead().
     */
    public function peekReady()
    {
        $this->_write('peek-ready');
        return $this->_peekRead();
    }

    /**
     * Sends a `peek-delayed` command.
     *
     * @return array|false See _peekRead().
     */
    public function peekDelayed()
    {
        $this->_write('peek-delayed');
        return $this->_peekRead();
    }

    /**
     * Sends a `peek-buried` command.
     *
     * @return array|false See _peekRead().
     */
    public function peekBuried()
    {
        $this->_write('peek-buried');
        return $this->_peekRead();
    }

    /**
     * Reads the reply shared by all peek commands.
     *
     * @return array|false `['id' => int, 'body' => string|false]` on FOUND,
     *                     false on any other reply (e.g. NOT_FOUND).
     */
    protected function _peekRead()
    {
        $status = strtok($this->_read(), ' ');

        if ($status === 'FOUND') {
            // Reply layout: FOUND <id> <bytes>; tokens are consumed in order.
            $id = (int) strtok(' ');
            $bytes = (int) strtok(' ');
            return [
                'id' => $id,
                'body' => $this->_read($bytes)
            ];
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `kick` command.
     *
     * @param int $bound Numeric argument of the command.
     * @return int|false Number following KICKED in the reply, false otherwise.
     */
    public function kick($bound)
    {
        $this->_write(sprintf('kick %d', $bound));
        $status = strtok($this->_read(), ' ');

        if ($status === 'KICKED') {
            return (int) strtok(' ');
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `kick-job` command.
     *
     * @param int $id Job id.
     * @return bool True on KICKED, false on any other reply (e.g. NOT_FOUND).
     */
    public function kickJob($id)
    {
        $this->_write(sprintf('kick-job %d', $id));
        $status = strtok($this->_read(), ' ');

        if ($status === 'KICKED') {
            return true;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `stats-job` command.
     *
     * @param int $id Job id.
     * @return array|false See _statsRead().
     */
    public function statsJob($id)
    {
        $this->_write(sprintf('stats-job %d', $id));
        return $this->_statsRead();
    }

    /**
     * Sends a `stats-tube` command.
     *
     * @param string $tube Tube name.
     * @return array|false See _statsRead().
     */
    public function statsTube($tube)
    {
        $this->_write(sprintf('stats-tube %s', $tube));
        return $this->_statsRead();
    }

    /**
     * Sends a `stats` command.
     *
     * @return array|false See _statsRead().
     */
    public function stats()
    {
        $this->_write('stats');
        return $this->_statsRead();
    }

    /**
     * Sends a `list-tubes` command.
     *
     * @return array|false See _statsRead().
     */
    public function listTubes()
    {
        $this->_write('list-tubes');
        return $this->_statsRead();
    }

    /**
     * Sends a `list-tube-used` command.
     *
     * @return string|false Token following USING in the reply, false otherwise.
     */
    public function listTubeUsed()
    {
        $this->_write('list-tube-used');
        $status = strtok($this->_read(), ' ');

        if ($status === 'USING') {
            return strtok(' ');
        }
        $this->_error($status);
        return false;
    }

    /**
     * Sends a `list-tubes-watched` command.
     *
     * @return array|false See _statsRead().
     */
    public function listTubesWatched()
    {
        $this->_write('list-tubes-watched');
        return $this->_statsRead();
    }

    /**
     * Reads the reply shared by the stats and list commands.
     *
     * @param bool $decode When true the payload is passed through _decode();
     *                     when false the raw payload is returned.
     * @return array|string|false Decoded or raw payload on OK, false on any
     *                            other reply.
     */
    protected function _statsRead($decode = true)
    {
        $status = strtok($this->_read(), ' ');

        if ($status === 'OK') {
            // Reply layout: OK <bytes>, followed by the payload itself.
            $data = $this->_read((int) strtok(' '));
            return $decode ? $this->_decode($data) : $data;
        }
        $this->_error($status);
        return false;
    }

    /**
     * Decodes the simple line-based payload of stats and list replies.
     *
     * The first line is discarded. Lines starting with `-` become list items
     * under their numeric position; lines containing `:` become key/value
     * pairs, split at the first colon only so values may contain colons.
     * Blank lines are skipped. Numeric values are cast to int when that loses
     * nothing, otherwise to float.
     *
     * @param string $data Raw payload.
     * @return array Decoded list or map.
     */
    protected function _decode($data)
    {
        $lines = array_slice(explode("\n", $data), 1);
        $result = [];

        foreach ($lines as $key => $value) {
            if ($value === '') {
                // Nothing to decode; also avoids reading offset 0 of ''.
                continue;
            }
            if ($value[0] === '-') {
                $value = ltrim($value, '- ');
            } elseif (strpos($value, ':') !== false) {
                list($key, $value) = explode(':', $value, 2);
                $value = ltrim($value, ' ');
            }
            if (is_numeric($value)) {
                $value = (int) $value == $value ? (int) $value : (float) $value;
            }
            $result[$key] = $value;
        }
        return $result;
    }
}
?>