RabbitMQ与PHP(二)—— 相关服务安装及怎么用PHP作为守护模式处理消息

WBOY
Release: 2016-06-13 12:15:54
Original
1000 people have browsed it

RabbitMQ与PHP(二)—— 相关服务安装及如何用PHP作为守护模式处理消息

在上一节中,详细介绍了RabbitMQ的exchange/routingkey/queue等概念,以及示例了如何使用PHP发送和处理消息的代码。这一节,将介绍在项目中如何使用PHP多线程的进行消息实时处理,以及简要介绍一些RabbitMQ的安装相关。熟悉的可以将安装这部分跳过。

一、RabbitMQ的安装:

需要首先安装erlang

#安装jdk环境sudo apt-get install openjdk-7-jdk#安装相关类库和工具包sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5  unixodbc unixodbc-dev libxml2-utils#下载erlang安装包并安装wget http://www.erlang.org/download/otp_src_R16B03-1.tar.gztar -xzvf otp_src_R16B03-1.tar.gzcd otp_src_R16B03-1./configure --prefix=/usr/local/erlangmakesudo make install
Copy after login

注意:?这里将erlang安装到了指定的目录:?/usr/local/erlang,而不是使用默认的路径。这是一个好的习惯,对于版本控制等都会有好处。但是这会导致后面?rabbitMQ报错:找不到erl?执行文件,需要多做一些处理才行。

更新环境变量:

sudo vi /etc/profile
Copy after login

在最后一行加上

export PATH=/usr/local/erlang/bin:$PATH
Copy after login

保存退出后

source /etc/profile
Copy after login

然后到命令行中输入erl看是否安装成功

安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.3/rabbitmq-server-generic-unix-3.2.3.tar.gztar -xzvf rabbitmq-server-generic-unix-3.2.3.tar.gzmv rabbitmq_server-3.2.3/ /usr/local/cd /usr/local/rabbitmq_server-3.2.3/sbin./rabbitmq-server
Copy after login

到这里如果出现一个报错信息:?./rabbitmq-server:?line?86:?erl:?command?not?found??这是因为erlang指定了安装路径,在系统的PATH中找不到。只要export?PATH=$PATH:/usr/local/erlang/bin?就可以了。

如果为了rc.local启动方便,可以将?export?PATH=$PATH:/usr/local/erlang/bin?这一行写入到?rabbitmq-server?文件中:

1f178a82b9014a90f9dd87cda8773912b21beedb

执行后,ps?-aux?一下,看到进程中有/usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd?-daemon?和?/usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp?就OK了。

sbin目录下还有一个脚本:?rabbitmqctl?也很常用,与?rabbitmq-server?一样需要指明erlang的路径才能正确工作。

常用的方法:

rabbitmqctl?start_app?启动后,执行一下这个比较保险rabbitmqctl?list_exchanges?显示当前所有的交换机rabbitmqctl?list_queue?查看当前有效队列情况
Copy after login

二、 PHP?extension的安装:

PHP操作rabbitmq需要AMQP扩展的支持。下载扩展:?http://pecl.php.net/package/amqp?,安装过程与一般扩展一样,

/usr/local/php/bin/phpize./configure?--with-php-config=/usr/local/php/bin/php-configsudo make?&&?make?install
Copy after login

在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法。

然后编辑php.ini?插入:

[amqp]extension?=?amqp.so
Copy after login

重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

0df431adcbef760936f25aef2fdda3cc7dd99ec1

三、如何使用PHP进行实时后端消息处理

首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)。

然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py

# _*_ coding:utf-8 _*_'''yoka at 实现多线程处理任务的守护进程Created on 2012-4-7@author: xwarrior@update: [email protected]'''#在此引入项目需要的数据包from MyJobs import MyJobsfrom MyThread import MyThreadimport loggingimport timedef main():    logger = logging.getLogger('main()')    logger.info('server start!')    worker_threads = 2 #定义需要启动的线程数量    timeline = 2 #线程检查时间间隔,秒    thread_pool = {}    for i in range(0, worker_threads ):        param = 'some param'        job = MyJobs( param )        thread = MyThread( job, i )        thread.setDaemon = True        thread.start()        logger.info('start thread %s' %( thread.getName() ))        thread_pool[i] = thread    #干完就结束模式    #for eachKey in thread_pool.keys():    # thread_pool[eachKey].join()    #保持线程数量模式    while 1:        time.sleep(timeline)        # 检查每一个线程        for eachKey in thread_pool.keys():            if thread_pool[eachKey].isAlive():                print 'thread alive:' + str(i)            else:                print 'thread down:' + str(i)                thread_pool[eachKey].run()    logger.info('main exist!')    returnif __name__ == '__main__':    #init config format    FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'    logging.basicConfig(format=FORMAT,level=logging.INFO)    main()    pass
Copy after login

线程脚本:?MyThread.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''from threading import Threadclass MyThread(Thread):    '''    创建线程    '''    def __init__(self,job,thread_id):        '''        Constructor        '''        self.job = job        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))    def run(self):        self.job.run()    def stop(self):        self.job.exit()
Copy after login

任务脚本:?MyJobs.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''import osimport urllib2class MyJobs(object):    def __init__(self, param ):        #do something        self.param = param    def __del__(self):        ''' destruct '''        self.exit()    def exit(self):        ''' 退出'''        self.quit = True    def run(self):        ''' 开始处理 '''        #使用shell模式        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)        re = os.system(cmd)        #使用web模式        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))        #response = urllib2.urlopen(req)        #re = response.read()        #print re
Copy after login

在任务调度(start_jobs.py)中,设计了两种工作模式:

一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;

另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。

/$q->consume('processMessage'); //需手动应答/*** 消费回调函数*/function processMessage($envelope, $queue) {    global $counter;    $msg = $envelope->getBody();    echo $msg."\n"; //处理消息    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答    if($counter++ > 5)return FALSE; //处理5个消息后退出}
Copy after login

用?两个线程,检查间隔2秒,SHELL模式?测试运行结果:

f703738da9773912b0eab410f9198618377ae2f6

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。

?

http://nonfu.me/p/8838.html

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template