Application scenarios
Why should I use it? What are the benefits? This should be said at the very beginning. Only when you understand what a thing does and what it is suitable for, can you better integrate it with your own projects. Wherever you use it, you will learn it. If you don’t use it after learning it, it means you don’t know it. We usually just You should consider more questions like this: What project functions can you build that can be combined with xx technology? Is this xx technology feasible in this business scenario? Rather than "What can I do if I learn this XX technology? The company doesn't use it now, so it's useless if I learn it." It must be very painful to learn XX technology with such a mood.
Everyone knows that queues do not do some time-consuming operations first, but are buried first, and then processed asynchronously. In this way, users cannot feel some time-consuming operations such as sending emails and text messages. Yes, because the burying point is over, the operation is over, and the consumption queue is all done on the server. It is mainly used in SMS or email notifications, accessing third-party interfaces to subscribe to messages, and some flash sales activities in the mall, all of which can be completed in combination with queues.
Beanstalkd Introduction
Beanstalkd is a high-performance, lightweight distributed memory queue, C code, typical Memcached-like design, protocol and usage. The same style, so users who have used memcached will feel that Beanstalkd is familiar.
The original design intention of beanstalkd is to execute time-consuming requests asynchronously and return results in time to reduce the response delay of requests under highly concurrent network requests.
Ubuntu installation
sudo apt-get install beanstalkd
Configuration file
vim /etc/default/beanstalkd
View status
service beanstalkd status # 命令回显 # root@:/www/server/php/72/etc# service beanstalkd status ● beanstalkd.service - Simple, fast work queue Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled) Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago Docs: man:beanstalkd(1) Main PID: 7033 (beanstalkd) Tasks: 1 (limit: 4634) CGroup: /system.slice/beanstalkd.service └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.
Configure connectivity persistence
ip Use 0.0.0.0 to allow all connections, configure security groups or firewalls to restrict connections, release the -b parameter (no persistence by default), memory The queue messages can be logged to the hard disk binlog for persistence, and the queue messages can be re-read when the power is turned off.
vim /etc/default/beanstalkd BEANSTALKD_LISTEN_ADDR=0.0.0.0 BEANSTALKD_LISTEN_PORT=11300 BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
beanstalkd Task Status
Status | Comments |
---|---|
delayed | Delayed status |
ready | Ready status |
reserved | The consumer reads out the task and processes it |
#buried | Reserved status |
delete | Delete status |
管理工具
亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。
命令行:https://github.com/src-d/beanstool
web 界面:https://github.com/ptrofimov/beanstalk_console
编程语言客户端
PHP 客户端
https://packagist.org/packages/pda/pheanstalk
composer require pda/pheanstalk
写入 job
<?php //创建队列消息 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; $jobData = [ 'email' => '123456@163.com', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'), ]; $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
消费 job
<?php ini_set('default_socket_timeout', 86400*7); ini_set( 'memory_limit', '256M' ); // 消费队列消息 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; while ( true ) { // 获取队列信息, reserve 阻塞获取 $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job->getData(); /* TODO 逻辑操作 */ /* 处理完成,删除 job */ $pheanstalk->delete( $job ); } }
default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
就是这样的代码,供参考:
public function watchJob() { $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job->getData(); $this->subscribe( $job_data ); $this->pheanstalk->delete( $job ); /* 继续 Watch 下一个 job */ $this->watchJob(); } else { $this->log->error( 'reserve false', 'reserve false' ); } }
监控 beanstalkd 状态
<?php //监控服务状态 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $isAlive = $pheanstalk->getConnection()->isServiceListening(); var_dump( $isAlive );
可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。
一些相关命令
查看 beanstalkd 服务内存占用
top -u beanstalkd
后台运行 consumer 脚本
nohup php googlehome_subscribe.php &
查看 consumer 脚本运行时间
ps -A -opid,stime,etime,args | grep consumer.php
手工重启 consumer 脚本
ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 nohup php googlehome_subscribe.php &
一些总结
php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。
try { /* TODO: 逻辑操作 */ } catch ( ClientException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ClientException', $results ); } catch ( ServerException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ServerException', $results ); } catch ( ConnectException $e ) { $results['mid'] = $this->mid; $this->log->error( 'properties-changed ConnectException', $results ); }
job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。
推荐教程:《PHP教程》
The above is the detailed content of PHP7 production environment queue Beanstalkd correct usage posture. For more information, please follow other related articles on the PHP Chinese website!