ホームページ バックエンド開発 PHPチュートリアル RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

Jun 13, 2016 pm 12:15 PM
erlang php rabbitmq self thread

RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

前のセクションでは、Exchange/routingkey/queue などの RabbitMQ の概念を詳しく紹介しました。 PHP を使用してメッセージを送信および処理する方法のコードの例も示します。このセクションでは、プロジェクト内でリアルタイム メッセージ処理に PHP マルチスレッドを使用する方法を紹介し、インストール関連の RabbitMQ についても簡単に紹介します。詳しい場合は、インストールのこの部分をスキップできます。

1. RabbitMQ のインストール:

最初に erlang をインストールする必要があります

1

#安装jdk环境sudo apt-get install openjdk-7-jdk#安装相关类库和工具包sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5  unixodbc unixodbc-dev libxml2-utils#下载erlang安装包并安装wget http://www.erlang.org/download/otp_src_R16B03-1.tar.gztar -xzvf otp_src_R16B03-1.tar.gzcd otp_src_R16B03-1./configure --prefix=/usr/local/erlangmakesudo make install

ログイン後にコピー

注: ?ここでerlang をインストールします。デフォルトのパスを使用する代わりに、指定されたディレクトリ ?/usr/local/erlang にアクセスします。これは良い習慣であり、バージョン管理などに役立ちます。ただし、これにより、後で RabbitMQ がエラーを報告することになります。erl 実行可能ファイルが見つからないため、さらに処理が必要です。

環境変数を更新します:

1

sudo vi /etc/profile

ログイン後にコピー

1

export PATH=/usr/local/erlang/bin:$PATH

ログイン後にコピー

を保存して終了した後、最後の行に

1

source /etc/profile

ログイン後にコピー

を追加し、次に進みます。コマンド ライン erl と入力して、インストールが成功したかどうかを確認します。

RabbitMQ をインストールします

1

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.3/rabbitmq-server-generic-unix-3.2.3.tar.gztar -xzvf rabbitmq-server-generic-unix-3.2.3.tar.gzmv rabbitmq_server-3.2.3/ /usr/local/cd /usr/local/rabbitmq_server-3.2.3/sbin./rabbitmq-server

ログイン後にコピー

ここにエラー メッセージが表示される場合: ?./rabbitmq-server:? line?86:? erl:?command?not?found??これは、erlang で指定されたインストール パスがシステムの PATH に見つからないためです。 import?PATH=$PATH:/usr/local/erlang/bin? だけです。

rc.local の起動を容易にしたい場合は、?export?PATH=$PATH:/usr/local/erlang/bin? という行を ?rabbitmq-server? ファイルに書き込むことができます。 🎜>

RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

実行後、ps?-aux? が /usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd?-daemon? であることを確認します。プロセス内では、?/usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp? は問題ありません。

sbin ディレクトリにもスクリプトがあります: ?rabbitmqctl? も、?rabbitmq-server? と同様に、正しく動作させるためには erlang のパスを指定する必要があります。

一般的に使用されるメソッド:

1

rabbitmqctl?start_app?启动后,执行一下这个比较保险rabbitmqctl?list_exchanges?显示当前所有的交换机rabbitmqctl?list_queue?查看当前有效队列情况

ログイン後にコピー

二、 PHP?extension的安装:

PHP操作rabbitmq需要AMQP扩展的支持。下载扩展:?http://pecl.php.net/package/amqp?,安装过程与一般扩展一样,

1

/usr/local/php/bin/phpize./configure?--with-php-config=/usr/local/php/bin/php-configsudo make?&&?make?install

ログイン後にコピー

在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法。

然后编辑php.ini?插入:

1

[amqp]extension?=?amqp.so

ログイン後にコピー

重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

三、如何使用PHP进行实时后端消息处理

首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)。

然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py

1

# _*_ coding:utf-8 _*_'''yoka at 实现多线程处理任务的守护进程Created on 2012-4-7@author: xwarrior@update: [email protected]'''#在此引入项目需要的数据包from MyJobs import MyJobsfrom MyThread import MyThreadimport loggingimport timedef main():    logger = logging.getLogger('main()')    logger.info('server start!')    worker_threads = 2 #定义需要启动的线程数量    timeline = 2 #线程检查时间间隔,秒    thread_pool = {}    for i in range(0, worker_threads ):        param = 'some param'        job = MyJobs( param )        thread = MyThread( job, i )        thread.setDaemon = True        thread.start()        logger.info('start thread %s' %( thread.getName() ))        thread_pool[i] = thread    #干完就结束模式    #for eachKey in thread_pool.keys():    # thread_pool[eachKey].join()    #保持线程数量模式    while 1:        time.sleep(timeline)        # 检查每一个线程        for eachKey in thread_pool.keys():            if thread_pool[eachKey].isAlive():                print 'thread alive:' + str(i)            else:                print 'thread down:' + str(i)                thread_pool[eachKey].run()    logger.info('main exist!')    returnif __name__ == '__main__':    #init config format    FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'    logging.basicConfig(format=FORMAT,level=logging.INFO)    main()    pass

ログイン後にコピー

线程脚本:?MyThread.py

1

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''from threading import Threadclass MyThread(Thread):    '''    创建线程    '''    def __init__(self,job,thread_id):        '''        Constructor        '''        self.job = job        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))    def run(self):        self.job.run()    def stop(self):        self.job.exit()

ログイン後にコピー

任务脚本:?MyJobs.py

1

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''import osimport urllib2class MyJobs(object):    def __init__(self, param ):        #do something        self.param = param    def __del__(self):        ''' destruct '''        self.exit()    def exit(self):        ''' 退出'''        self.quit = True    def run(self):        ''' 开始处理 '''        #使用shell模式        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)        re = os.system(cmd)        #使用web模式        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))        #response = urllib2.urlopen(req)        #re = response.read()        #print re

ログイン後にコピー

在任务调度(start_jobs.py)中,设计了两种工作模式:

一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;

另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。

1

/$q->consume('processMessage'); //需手动应答/*** 消费回调函数*/function processMessage($envelope, $queue) {    global $counter;    $msg = $envelope->getBody();    echo $msg."\n"; //处理消息    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答    if($counter++ > 5)return FALSE; //处理5个消息后退出}

ログイン後にコピー

用?两个线程,检查间隔2秒,SHELL模式?测试运行结果:

RabbitMQ と PHP (2) - 関連サービスのインストールと、PHP をデーモン モードとして使用してメッセージを処理する方法

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。

?

http://nonfu.me/p/8838.html

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットな記事タグ

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Ubuntu および Debian 用の PHP 8.4 インストールおよびアップグレード ガイド Ubuntu および Debian 用の PHP 8.4 インストールおよびアップグレード ガイド Dec 24, 2024 pm 04:42 PM

Ubuntu および Debian 用の PHP 8.4 インストールおよびアップグレード ガイド

CakePHP の日付と時刻 CakePHP の日付と時刻 Sep 10, 2024 pm 05:27 PM

CakePHP の日付と時刻

CakePHP ファイルのアップロード CakePHP ファイルのアップロード Sep 10, 2024 pm 05:27 PM

CakePHP ファイルのアップロード

CakePHP ルーティング CakePHP ルーティング Sep 10, 2024 pm 05:25 PM

CakePHP ルーティング

CakePHP プロジェクトの構成 CakePHP プロジェクトの構成 Sep 10, 2024 pm 05:25 PM

CakePHP プロジェクトの構成

CakePHP について話し合う CakePHP について話し合う Sep 10, 2024 pm 05:28 PM

CakePHP について話し合う

CakePHP クイックガイド CakePHP クイックガイド Sep 10, 2024 pm 05:27 PM

CakePHP クイックガイド

PHP 開発用に Visual Studio Code (VS Code) をセットアップする方法 PHP 開発用に Visual Studio Code (VS Code) をセットアップする方法 Dec 20, 2024 am 11:31 AM

PHP 開発用に Visual Studio Code (VS Code) をセットアップする方法

See all articles