ホームページ > バックエンド開発 > PHPチュートリアル > kestrel php メッセージ キューを再印刷する

kestrel php メッセージ キューを再印刷する

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
リリース: 2016-06-13 13:01:50
オリジナル
897 人が閲覧しました

转下kestrel php 消息队列

私たちはしばらく仕事で Twitter の kestrel キュー サーバーを使用していますが、Python で書かれたサービス層からのみ使用しています。 PHP で記述されたアプリケーション層からのキューのニーズがいくつかあるため、今週数日かけて Web アプリケーションにキューのサポートを追加しました。私が学んだことと、それをどのように実践したかを共有したいと思いました。

目標

Kestrel サーバー自体は非常に簡単に起動して実行できました。唯一指摘したいのは、マスターを使用しようとしたときはかなり不安定だったので、リリースブランチに固執することをお勧めするということです。クライアントの実装に関して、開始時に念頭に置いていた目標がいくつかありました。

? Kestrel は memcache プロトコルに基づいて構築されているため、memcache クライアントを最初から構築するのではなく、既存の memcache クライアントを活用してみてください
???以前ここで説明した既存のバッチ ジョブ インフラストラクチャを活用し、マルチテナントのニーズが確実に満たされるようにします
???後でキューサーバーを変更する場合に備えて、キューインターフェースを汎用のままにしておきます
???既存のケストレル管理ツールを利用し、必要な機能のみを構築します

これらの目標を念頭に置いて、Kestrel クライアント、プロデューサー、コンシューマー、そしてコンシューマーを実行するための非常に小さな CLI ハーネスの 4 つのコンポーネントを完成しました。しかし、何もコーディングする前に、同僚の Matt Erkkila が書いた Kestrel の Web UI である Kestrel Web をセットアップしました。 Kestrel Web を使用すると、Kestrel の統計を表示したり、キューを管理したり、手動入力に基づいてキューを並べ替えたりフィルターしたりすることができます。このツールを最初から起動して実行することで、ジョブがテスト キューに追加および消費されるのを簡単に確認でき、必要に応じてキューを簡単にフラッシュすることもできました。

Kestrel クライアント

PHP 用の既存の Kestrel クライアントが見つからなかったので、2 つの memcache 拡張機能を調べ始めました。古い memcache と Andrei Zmievski の memcached で、後者は libmemcached ライブラリに基づいています。 memcache から始めましたが、最初は問題なく動作していましたが、タイムアウトを変更できないことがすぐにわかりました。これは道の邪魔ですか? kestrel は新しいジョブをポーリングすることを推奨しています。ポーリング タイムアウトを 1 秒以上 (memcache のデフォルト) に設定しようとすると、memcache 拡張機能からのタイムアウト エラーが表示されます。 memcached 拡張機能にはこれらの問題がないため、これを使用しました。

最初に遭遇した問題は連載でした。 memcached のシリアライザーを使用して kestrel に書き込むことはできますが、データを読み戻すときに、シリアライザーであることが認識されません。したがって、クライアントでデータを手動でシリアル化するだけで、問題なく動作します。もう 1 つ注意すべき点は、memcached 拡張機能はデフォルトで 100 バイトを超えるものを自動的に圧縮し、kestrel から読み取るときに解凍しないため、圧縮を無効にするか手動で行う必要があることです。

もう 1 つの問題は、カスタム Kestrel コマンドを使用したくても使用できないことです。アプリケーション層には特別なことは何も必要ないため、memcached 拡張機能は問題なく機能します。 Kestrel 2 で今後のモニター (バッチ処理) のサポートが必要になったら、Kestrel クライアントを最初から実装する必要があるかもしれません。 Kestrel Web は、私たちが現在必要としているものをすべて提供します。

memcached を使用することが決定したら、そのためのライト デコレータ EC_KestrelClient を作成しました。これは、memcached クライアントのインスタンス化、シリアル化、および GET コマンドに対するいくつかの kestrel 固有のオプションのヘルパーを処理します。また、memcached 固有のオプションを渡すサポートもあります。クラスは最終的に次のようになりました:

?

<?php 
/** 
* A thin kestrel client that wraps Memcached (libmemcached extension) 
* 
* @author Bill Shupp <hostmaster@shupp.org> 
* @copyright 2010-2011 Empower Campaigns 
*/ 
class EC_KestrelClient 
{ 
    /** 
* The Memcached instance 
* 
* @var Memcached 
*/ 
    protected $_memcached = null; 

    /** 
* The Kestrel server IP 
* 
* @var string 
*/ 
    protected $_host = '127.0.0.1'; 

    /** 
* The Kestrel server port 
* 
* @var string 
*/ 
    protected $_port = 22133; 

    /** 
* Optional options, not currently used 
* 
* @var array 
*/ 
    protected $_options = array(); 

    /** 
* Sets the host, port, and options to be used 
* 
* @param string $host The host to use, defaults to 127.0.0.1 
* @param int $port The port to use, defaults to 22133 
* @param array $options Memcached options, not currently used 
* 
* @return void 
*/ 
    public function __construct( 
        $host = '127.0.0.1', $port = 22133, array $options = array() 
    ) 
    { 
        $this->_host = $host; 
        $this->_port = $port; 
        $this->setOptions($options); 
    } 


    /** 
* Sets job data on the queue, json_encoding the value to avoid problematic 
* serialization. 
* 
* @param string $queue The queue name 
* @param mixed $data The data to store 
* 
* @return bool 
*/ 
    public function set($queue, $data) 
    { 
        // Local json serialization, as kestrel doesn't send serialization flags 
        return $this->getMemcached()->set($queue, json_encode($data)); 
    } 

    /** 
* Reliably read an item off of the queue. Meant to be run in a loop, and 
* call closeReliableRead() when done to make sure the final job is not left 
* on the queue. 
* 
* @param mixed $queue The queue name to read from 
* @param int $timeout The timeout to wait for a job to appear 
* 
* @return array|false 
* @see closeReliableRead() 
*/ 
    public function reliableRead($queue, $timeout = 1000) 
    { 
        $queue = $queue . '/close/open/t=' . $timeout; 
        $result = $this->getMemcached()->get($queue); 
        if ($result === false) { 
            return $result; 
        } 
        // Local json serialization, as kestrel doesn't send serialization flags 
        return json_decode($result, true); 
    } 

    /** 
* Closes any existing open read 
* 
* @param string $queue The queue name 
* 
* @return false 
*/ 
    public function closeReliableRead($queue) 
    { 
        $queue = $queue . '/close'; 
        return $this->getMemcached()->get($queue); 
    } 

    /** 
* Aborts an existing reliable read 
* 
* @param string $queue The queue name 
* 
* @return false 
*/ 
    public function abortReliableRead($queue) 
    { 
        $queue = $queue . '/abort'; 
        return $this->getMemcached()->get($queue); 
    } 

    /** 
* Set an option to be used with the Memcached client. Not used. 
* 
* @param string $name The option name 
* @param value $value The option value 
* 
* @return void 
*/ 
    public function setOption($name, $value) 
    { 
        $this->_options[$name] = $value; 
    } 

    /** 
* Sets multiple options 
* 
* @param array $options Array of key/values to set 
* 
* @return void 
*/ 
    public function setOptions(array $options) 
    { 
        foreach ($options as $name => $value) { 
            $this->setOption($name, $value); 
        } 
    } 

    /** 
* Gets a current option's value 
* 
* @param string $name The option name 
* 
* @return mixed 
*/ 
    public function getOption($name) 
    { 
        if (isset($this->_options[$name])) { 
            return $this->_options[$name]; 
        } 
        return null; 
    } 

    /** 
* Gets all current options 
* 
* @return array 
*/ 
    public function getOptions() 
    { 
        return $this->_options; 
    } 

    /** 
* Gets a singleton instance of the Memcached client 
* 
* @return Memcached 
*/ 
    public function getMemcached() 
    { 
        if ($this->_memcached === null) { 
            $this->_initMemcached(); 
        } 
        return $this->_memcached; 
    } 

    /** 
* Initialized the Memcached client instance 
* 
* @return void 
*/ 
    protected function _initMemcached() 
    { 
        $this->_memcached = $this->_getMemcachedInstance(); 
        foreach ($this->_options as $option => $value) { 
            $this->_memcached->setOption($option, $value); 
        } 
        $this->_memcached->addServer($this->_host, $this->_port); 
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false); 
    } 

    // @codeCoverageIgnoreStart 
    /** 
* Returns a new instance of Memcached. Abstracted for testing. 
* 
* @return Memcached 
*/ 
    protected function _getMemcachedInstance() 
    { 
        return new Memcached(); 
    } 
    // @codeCoverageIgnoreEnd 
} 
ログイン後にコピー

?

?

生の EC_KestrelClient.php を表示する この要点は GitHub から提供されています。

プロデューサー

プロデューサーはとてもシンプルです。現在のテナント情報を含む標準構造にデータをフォーマットし、他のプロジェクトと衝突しないようにキューに名前空間を設定し、キューに追加するだけです。プロデューサーは次のようになります:

?

<?php 
/** 
* Interface for adding jobs to a queue server 
* 
* @author Bill Shupp <hostmaster@shupp.org> 
* @copyright 2010-2011 Empower Campaigns 
*/ 
class EC_Producer 
{ 
    /** 
* Adds a job onto a queue 
* 
* @param string $queue The queue name to add a job to 
* @param string $jobName The job name for the consumer to run 
* @param mixed $data Optional additional data to pass to the job 
* 
* @return bool 
*/ 
    public function addJob($queue, $jobName, $data = null) 
    { 
        $item = array( 
            'instance' => EC::getCurrentInstanceName(), 
            'jobName' => $jobName 
        ); 
        if ($data !== null) { 
            $item['data'] = $data; 
        } 

        // Namespace queue with project 
        $queue = 'enterprise_' . $queue; 

        $client = $this->_getKestrelClient(); 
        return $client->set($queue, $item); 
    } 

    // @codeCoverageIgnoreStart 
    /** 
* Gets a single instance of EC_KestrelClient. Abstracted for testing. 
* 
* @return void 
*/ 
    protected function _getKestrelClient() 
    { 
        if (APPLICATION_ENV === 'testing') { 
            throw new Exception(__METHOD__ . ' was not mocked when testing'); 
        } 

        static $client = null; 
        if ($client === null) { 
            $host = EC::getConfigOption('kestrel.host'); 
            $port = EC::getConfigOption('kestrel.port'); 
            $client = new EC_KestrelClient($host, $port); 
        } 
        return $client; 
    } 
    // @codeCoverageIgnoreEnd 
} 
ログイン後にコピー

?

?

?

生の EC_Producer.php を表示する この要点は GitHub によって提供されています。

消費者

消費者にはもう少し理解する必要がありますが、それでも非常に簡単です。これは、daemontools や Supervisord などの監視ツールから実行することを目的としているため、CLI 引数を EC_Consumer に渡して実行するだけの非常に小さな CLI ハーネスがあります。 CLI 引数を解析した後、EC_Consumer は新しいジョブを求めて kestrel をポーリングし、標準のバッチ ジョブ インフラストラクチャを通じてそれらを実行します。 PHP の長時間実行プロセス能力にさらに自信が持てるまで、オプションの maxium jobs 引数を追加しました。これにより、コンシューマが X 個を超えるジョブを処理するのを停止して終了します。監視サービス (supervisord) は数秒以内に再起動します。また、テスト用にオプションのデバッグ引数も追加したので、発生するすべてのアクションを確認できます。 CLI ハーネスは次のようになります:

?

#!/bin/env php 
<?php 

// External application bootstrapping 
require_once __DIR__ . '/cli_init.php'; 

// Instantiate and run the consumer 
$consumer = new EC_Consumer($argv); 
$consumer->run(); 
ログイン後にコピー

?

view raw consumer_cli.php This Gist brought to you by GitHub.

And the main consumer class, EC_Consumer, looks something like this:

<?php 

/** 
* Enterprise queue consumer interface, called by bin/consumer_cli.php 
* 
* @author Bill Shupp <hostmaster@shupp.org> 
* @copyright 2010-2011 Empower Campaigns 
*/ 
class EC_Consumer 
{ 
    /** 
* Instance of {@link Zend_Console_Getopt} 
* 
* @var Zend_Console_Getopt 
*/ 
    protected $_opt = null; 

    /** 
* Which APPLICATION_ENV to run under (see -e) 
* 
* @var string 
*/ 
    protected $_environment = null; 

    /** 
* The kestrel server IP 
* 
* @var string 
*/ 
    protected $_host = null; 

    /** 
* The kestrel server port 
* 
* @var int 
*/ 
    protected $_port = null; 

    /** 
* The kestrel queue name to connect to 
* 
* @var string 
*/ 
    protected $_queue = null; 

    /** 
* Whether we should show debug output 
* 
* @var bool 
*/ 
    protected $_debug = false; 

    /** 
* Maximum # of jobs for this process to perform (for memory fail safe) 
* 
* @var int 
*/ 
    protected $_maxJobs = null; 

    /** 
* Current job count 
* 
* @var int 
*/ 
    protected $_jobCount = 0; 

    /** 
* Parses arguments from the command line and does error handling 
* 
* @param array $argv The $argv from bin/ecli.php 
* 
* @throw Zend_Console_Getopt_Exception on failure 
* @return void 
*/ 
    public function __construct(array $argv) 
    { 
        try { 
            $opt = new Zend_Console_Getopt( 
                array( 
                    'environment|e=s' => 'environment name (e.g. development)' 
                                         . ', required', 
                    'server|s=s' => 'kestrel server, format of host:port' 
                                         . ', required', 
                    'queue|q=s' => 'queue name (e.g. crawler_campaign)' 
                                         . ', required', 
                    'max-jobs|m=s' => 'max jobs to run before exiting' 
                                         . ', optional', 
                    'debug|d' => 'show debug output' 
                                         . ', optional', 
                ) 
            ); 
            $opt->setArguments($argv); 
            $opt->parse(); 

            // Set environment 
            if ($opt->e === null) { 
                throw new Zend_Console_Getopt_Exception( 
                    'Error: missing environment' 
                ); 
            } 
            $this->_environment = $opt->e; 
            // @codeCoverageIgnoreStart 
            if (!defined('APPLICATION_ENV')) { 
                define('APPLICATION_ENV', $this->_environment); 
            } 
            // @codeCoverageIgnoreEnd 


            // Set server 
            if ($opt->s === null) { 
                throw new Zend_Console_Getopt_Exception( 
                    'Error: missing server' 
                ); 
            } 
            $parts = explode(':', $opt->s); 
            if (count($parts) !== 2) { 
                throw new Zend_Console_Getopt_Exception( 
                    'Error: invalid server: ' . $opt->s 
                ); 
            } 
            $this->_host = $parts[0]; 
            $this->_port = $parts[1]; 

            // Set queue 
            if ($opt->q === null) { 
                throw new Zend_Console_Getopt_Exception( 
                    'Error: missing queue' 
                ); 
            } 
            $this->_queue = $opt->q; 

            // Set max-jobs 
            if ($opt->m !== null) { 
                $this->_maxJobs = $opt->m; 
            } 

            // Set debug 
            if ($opt->d !== null) { 
                $this->_debug = true; 
            } 
        } catch (Zend_Console_Getopt_Exception $e) { 
            echo "\n" . $e->getMessage() . "\n\n"; 
            echo $opt->getUsageMessage(); 
            // @codeCoverageIgnoreStart 
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') { 
                exit(1); 
            } 
            // @codeCoverageIgnoreEnd 
        } 

        $this->_opt = $opt; 
    } 


    /** 
* Polls the queue server for jobs and runs them as they come in 
* 
* @return void 
*/ 
    public function run() 
    { 
        $client = $this->_getKestrelClient(); 
        $queue = 'enterprise_' . $this->_queue; 

        while ($this->_keepRunning()) { 
            // Pull job from queue 
            $job = $client->reliableRead($queue, 500); 
            if ($job === false) { 
                $this->_debug('Nothing on queue ' . $queue); 
                continue; 
            } 

            if (!isset($job['instance'])) { 
                echo 'Instance not set in queue job: ' . print_r($job, true); 
                continue; 
            } 
            $instance = $job['instance']; 

            if (!isset($job['jobName'])) { 
                echo 'Job name not set in queue job: ' . print_r($job, true); 
                continue; 
            } 
            $jobName = $job['jobName']; 

            $data = null; 
            if (isset($job['data'])) { 
                $data = $job['data']; 
            } 

            // Run the job 
            $returnCode = $this->runJob($instance, $jobName, $data); 
            if ($returnCode !== 0) { 
                $client->abortReliableRead($queue); 
                continue; 
            } 
        } 
        $client->closeReliableRead($queue); 
    } 


    /** 
* Runs the job via bin/ecli.php 
* 
* @param string $instance The instance name to run the job under 
* @param string $jobName The job name 
* @param string $data Optional extra data 
* 
* @return int 
*/ 
    public function runJob($instance, $jobName, $data) 
    { 
        $cmd = BASE_PATH . '/bin/ecli.php ' 
            . '-e ' . $this->_environment 
            . ' -i ' . $instance 
            . ' -j ' . $jobName; 
        if ($data) { 
            $cmd .= " '" . base64_encode(json_encode($data)) . "'"; 
        } 
        $returnCode = $this->_passthru($cmd); 
        $this->_jobCount++; 
        $this->_debug('Job count: ' . $this->_jobCount); 

        return $returnCode; 
    } 

    /** 
* Check to see if the job limit has been reached 
* 
* @return bool 
*/ 
    protected function _keepRunning() 
    { 
        return ($this->_maxJobs === null) ? true 
               : ($this->_jobCount < $this->_maxJobs); 
    } 


    /** 
* Show debug messages 
* 
* @param mixed $message 
* 
* @return void 
*/ 
    protected function _debug($message) 
    { 
        if (!$this->_debug) { 
            return; 
        } 
        echo $message . "\n"; 
    } 

    // @codeCoverageIgnoreStart 
    /** 
* Calls the passthru() function and returns the exit code. Abstracted 
* for testing. 
* 
* @param string $cmd The command to execute 
* 
* @return int 
*/ 
    protected function _passthru($cmd) 
    { 
        passthru($cmd, $returnCode); 
        return $returnCode; 
    } 

    /** 
* Gets a single instance of EC_KestrelClient. Abstracted for testing. 
* 
* @return void 
*/ 
    protected function _getKestrelClient() 
    { 
        if (APPLICATION_ENV === 'testing') { 
            throw new Exception(__METHOD__ . ' was not mocked when testing'); 
        } 
        return new EC_KestrelClient($this->_host, $this->_port); 
    } 
    // @codeCoverageIgnoreEnd 
} 
ログイン後にコピー

?

?

view raw EC_Consumer.php This Gist brought to you by GitHub.

Putting it together

Now that all the pieces are put together, let's take a look at in action. Adding example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:

<?php 

$producer = new EC_Producer(); 
$producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar')); 

?> 

view raw gistfile1.php This Gist brought to you by GitHub. 
ログイン後にコピー

?

?

And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:

./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2

view raw example.txt This Gist brought to you by GitHub.

That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート