RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法
前のセクションでは、Exchange/routingkey/queue などの RabbitMQ の概念を詳しく紹介しました。 PHP を使用してメッセージを送信および処理する方法のコードの例も示します。このセクションでは、プロジェクト内でリアルタイム メッセージ処理に PHP マルチスレッドを使用する方法を紹介し、インストール関連の RabbitMQ についても簡単に紹介します。詳しい場合は、インストールのこの部分をスキップできます。
1. RabbitMQ のインストール:
最初に erlang をインストールする必要があります
1 | #安装jdk环境sudo apt-get install openjdk-7-jdk#安装相关类库和工具包sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5 unixodbc unixodbc-dev libxml2-utils#下载erlang安装包并安装wget http:
|
ログイン後にコピー
注: ?ここでerlang をインストールします。デフォルトのパスを使用する代わりに、指定されたディレクトリ ?/usr/local/erlang にアクセスします。これは良い習慣であり、バージョン管理などに役立ちます。ただし、これにより、後で RabbitMQ がエラーを報告することになります。erl 実行可能ファイルが見つからないため、さらに処理が必要です。
環境変数を更新します:
1 | export PATH=/usr/local/erlang/bin: $PATH
|
ログイン後にコピー
を保存して終了した後、最後の行に
を追加し、次に進みます。コマンド ライン erl と入力して、インストールが成功したかどうかを確認します。
RabbitMQ をインストールします
ここにエラー メッセージが表示される場合: ?./rabbitmq-server:? line?86:? erl:?command?not?found??これは、erlang で指定されたインストール パスがシステムの PATH に見つからないためです。 import?PATH=$PATH:/usr/local/erlang/bin? だけです。
rc.local の起動を容易にしたい場合は、?export?PATH=$PATH:/usr/local/erlang/bin? という行を ?rabbitmq-server? ファイルに書き込むことができます。 🎜>

実行後、ps?-aux? が /usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd?-daemon? であることを確認します。プロセス内では、?/usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp? は問題ありません。
sbin ディレクトリにもスクリプトがあります: ?rabbitmqctl? も、?rabbitmq-server? と同様に、正しく動作させるためには erlang のパスを指定する必要があります。
一般的に使用されるメソッド:
1 | rabbitmqctl?start_app?启动后,执行一下这个比较保险rabbitmqctl?list_exchanges?显示当前所有的交换机rabbitmqctl?list_queue?查看当前有效队列情况
|
ログイン後にコピー
二、 PHP?extension的安装:
PHP操作rabbitmq需要AMQP扩展的支持。下载扩展:?http://pecl.php.net/package/amqp?,安装过程与一般扩展一样,
1 | /usr/local/php/bin/phpize./configure?--with-php-config=/usr/local/php/bin/php-configsudo make?&&?make?install
|
ログイン後にコピー
在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法。
然后编辑php.ini?插入:
1 | [amqp]extension?=?amqp.so
|
ログイン後にコピー
重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

三、如何使用PHP进行实时后端消息处理
首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)。
然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。
启动脚本:start_jobs.py
1 | # _*_ coding:utf-8 _*_ '' 'yoka at 实现多线程处理任务的守护进程Created on 2012-4-7@author: xwarrior@update: [email protected]' '' #在此引入项目需要的数据包from MyJobs import MyJobsfrom MyThread import MyThreadimport loggingimport timedef main(): logger = logging.getLogger( 'main()' ) logger.info( 'server start!' ) worker_threads = 2 #定义需要启动的线程数量 timeline = 2 #线程检查时间间隔,秒 thread_pool = {} for i in range(0, worker_threads ): param = 'some param' job = MyJobs( param ) thread = MyThread( job, i ) thread.setDaemon = True thread.start() logger.info( 'start thread %s' %( thread.getName() )) thread_pool[i] = thread #干完就结束模式 # for eachKey in thread_pool.keys(): # thread_pool[eachKey].join() #保持线程数量模式 while 1: time.sleep(timeline) # 检查每一个线程 for eachKey in thread_pool.keys(): if thread_pool[eachKey].isAlive(): print 'thread alive:' + str(i) else : print 'thread down:' + str(i) thread_pool[eachKey].run() logger.info( 'main exist!' ) returnif __name__ == '__main__' : #init config format FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s' logging.basicConfig(format=FORMAT,level=logging.INFO) main() pass
|
ログイン後にコピー
线程脚本:?MyThread.py
1 | # _*_ coding:utf-8 _*_ '' 'Created on 2013-03-25@author: [email protected]' '' from threading import Threadclass MyThread(Thread): '' ' 创建线程 ' '' def __init__(self,job,thread_id): '' ' Constructor ' '' self.job = job Thread.__init__(self, name = 'my_thread_%s' %(thread_id)) def run(self): self.job.run() def stop(self): self.job. exit ()
|
ログイン後にコピー
任务脚本:?MyJobs.py
1 | # _*_ coding:utf-8 _*_ '' 'Created on 2013-03-25@author: [email protected]' '' import osimport urllib2class MyJobs(object): def __init__(self, param ): # do something self.param = param def __del__(self): '' ' destruct ' '' self. exit () def exit (self): '' ' 退出' '' self.quit = True def run(self): '' ' 开始处理 ' '' #使用shell模式 #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"' cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param) re = os.system(cmd) #使用web模式 #req = urllib2.Request( 'http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param)) #response = urllib2.urlopen(req) #re = response.read() # print re
|
ログイン後にコピー
在任务调度(start_jobs.py)中,设计了两种工作模式:
一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;
另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。
具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。
实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。
考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。
1 | / $q ->consume( 'processMessage' );
|
ログイン後にコピー
用?两个线程,检查间隔2秒,SHELL模式?测试运行结果:

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。
?
http://nonfu.me/p/8838.html