kestrel php メッセージ キューを再印刷する
转下kestrel php 消息队列
私たちはしばらく仕事で Twitter の kestrel キュー サーバーを使用していますが、Python で書かれたサービス層からのみ使用しています。 PHP で記述されたアプリケーション層からのキューのニーズがいくつかあるため、今週数日かけて Web アプリケーションにキューのサポートを追加しました。私が学んだことと、それをどのように実践したかを共有したいと思いました。
目標
Kestrel サーバー自体は非常に簡単に起動して実行できました。唯一指摘したいのは、マスターを使用しようとしたときはかなり不安定だったので、リリースブランチに固執することをお勧めするということです。クライアントの実装に関して、開始時に念頭に置いていた目標がいくつかありました。
? Kestrel は memcache プロトコルに基づいて構築されているため、memcache クライアントを最初から構築するのではなく、既存の memcache クライアントを活用してみてください
???以前ここで説明した既存のバッチ ジョブ インフラストラクチャを活用し、マルチテナントのニーズが確実に満たされるようにします
???後でキューサーバーを変更する場合に備えて、キューインターフェースを汎用のままにしておきます
???既存のケストレル管理ツールを利用し、必要な機能のみを構築します
これらの目標を念頭に置いて、Kestrel クライアント、プロデューサー、コンシューマー、そしてコンシューマーを実行するための非常に小さな CLI ハーネスの 4 つのコンポーネントを完成しました。しかし、何もコーディングする前に、同僚の Matt Erkkila が書いた Kestrel の Web UI である Kestrel Web をセットアップしました。 Kestrel Web を使用すると、Kestrel の統計を表示したり、キューを管理したり、手動入力に基づいてキューを並べ替えたりフィルターしたりすることができます。このツールを最初から起動して実行することで、ジョブがテスト キューに追加および消費されるのを簡単に確認でき、必要に応じてキューを簡単にフラッシュすることもできました。
Kestrel クライアント
PHP 用の既存の Kestrel クライアントが見つからなかったので、2 つの memcache 拡張機能を調べ始めました。古い memcache と Andrei Zmievski の memcached で、後者は libmemcached ライブラリに基づいています。 memcache から始めましたが、最初は問題なく動作していましたが、タイムアウトを変更できないことがすぐにわかりました。これは道の邪魔ですか? kestrel は新しいジョブをポーリングすることを推奨しています。ポーリング タイムアウトを 1 秒以上 (memcache のデフォルト) に設定しようとすると、memcache 拡張機能からのタイムアウト エラーが表示されます。 memcached 拡張機能にはこれらの問題がないため、これを使用しました。
最初に遭遇した問題は連載でした。 memcached のシリアライザーを使用して kestrel に書き込むことはできますが、データを読み戻すときに、シリアライザーであることが認識されません。したがって、クライアントでデータを手動でシリアル化するだけで、問題なく動作します。もう 1 つ注意すべき点は、memcached 拡張機能はデフォルトで 100 バイトを超えるものを自動的に圧縮し、kestrel から読み取るときに解凍しないため、圧縮を無効にするか手動で行う必要があることです。
もう 1 つの問題は、カスタム Kestrel コマンドを使用したくても使用できないことです。アプリケーション層には特別なことは何も必要ないため、memcached 拡張機能は問題なく機能します。 Kestrel 2 で今後のモニター (バッチ処理) のサポートが必要になったら、Kestrel クライアントを最初から実装する必要があるかもしれません。 Kestrel Web は、私たちが現在必要としているものをすべて提供します。
memcached を使用することが決定したら、そのためのライト デコレータ EC_KestrelClient を作成しました。これは、memcached クライアントのインスタンス化、シリアル化、および GET コマンドに対するいくつかの kestrel 固有のオプションのヘルパーを処理します。また、memcached 固有のオプションを渡すサポートもあります。クラスは最終的に次のようになりました:
?
<?php /** * A thin kestrel client that wraps Memcached (libmemcached extension) * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_KestrelClient { /** * The Memcached instance * * @var Memcached */ protected $_memcached = null; /** * The Kestrel server IP * * @var string */ protected $_host = '127.0.0.1'; /** * The Kestrel server port * * @var string */ protected $_port = 22133; /** * Optional options, not currently used * * @var array */ protected $_options = array(); /** * Sets the host, port, and options to be used * * @param string $host The host to use, defaults to 127.0.0.1 * @param int $port The port to use, defaults to 22133 * @param array $options Memcached options, not currently used * * @return void */ public function __construct( $host = '127.0.0.1', $port = 22133, array $options = array() ) { $this->_host = $host; $this->_port = $port; $this->setOptions($options); } /** * Sets job data on the queue, json_encoding the value to avoid problematic * serialization. * * @param string $queue The queue name * @param mixed $data The data to store * * @return bool */ public function set($queue, $data) { // Local json serialization, as kestrel doesn't send serialization flags return $this->getMemcached()->set($queue, json_encode($data)); } /** * Reliably read an item off of the queue. Meant to be run in a loop, and * call closeReliableRead() when done to make sure the final job is not left * on the queue. * * @param mixed $queue The queue name to read from * @param int $timeout The timeout to wait for a job to appear * * @return array|false * @see closeReliableRead() */ public function reliableRead($queue, $timeout = 1000) { $queue = $queue . '/close/open/t=' . $timeout; $result = $this->getMemcached()->get($queue); if ($result === false) { return $result; } // Local json serialization, as kestrel doesn't send serialization flags return json_decode($result, true); } /** * Closes any existing open read * * @param string $queue The queue name * * @return false */ public function closeReliableRead($queue) { $queue = $queue . '/close'; return $this->getMemcached()->get($queue); } /** * Aborts an existing reliable read * * @param string $queue The queue name * * @return false */ public function abortReliableRead($queue) { $queue = $queue . '/abort'; return $this->getMemcached()->get($queue); } /** * Set an option to be used with the Memcached client. Not used. * * @param string $name The option name * @param value $value The option value * * @return void */ public function setOption($name, $value) { $this->_options[$name] = $value; } /** * Sets multiple options * * @param array $options Array of key/values to set * * @return void */ public function setOptions(array $options) { foreach ($options as $name => $value) { $this->setOption($name, $value); } } /** * Gets a current option's value * * @param string $name The option name * * @return mixed */ public function getOption($name) { if (isset($this->_options[$name])) { return $this->_options[$name]; } return null; } /** * Gets all current options * * @return array */ public function getOptions() { return $this->_options; } /** * Gets a singleton instance of the Memcached client * * @return Memcached */ public function getMemcached() { if ($this->_memcached === null) { $this->_initMemcached(); } return $this->_memcached; } /** * Initialized the Memcached client instance * * @return void */ protected function _initMemcached() { $this->_memcached = $this->_getMemcachedInstance(); foreach ($this->_options as $option => $value) { $this->_memcached->setOption($option, $value); } $this->_memcached->addServer($this->_host, $this->_port); $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false); } // @codeCoverageIgnoreStart /** * Returns a new instance of Memcached. Abstracted for testing. * * @return Memcached */ protected function _getMemcachedInstance() { return new Memcached(); } // @codeCoverageIgnoreEnd }
?
?
生の EC_KestrelClient.php を表示する この要点は GitHub から提供されています。
プロデューサー
プロデューサーはとてもシンプルです。現在のテナント情報を含む標準構造にデータをフォーマットし、他のプロジェクトと衝突しないようにキューに名前空間を設定し、キューに追加するだけです。プロデューサーは次のようになります:
?
<?php /** * Interface for adding jobs to a queue server * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_Producer { /** * Adds a job onto a queue * * @param string $queue The queue name to add a job to * @param string $jobName The job name for the consumer to run * @param mixed $data Optional additional data to pass to the job * * @return bool */ public function addJob($queue, $jobName, $data = null) { $item = array( 'instance' => EC::getCurrentInstanceName(), 'jobName' => $jobName ); if ($data !== null) { $item['data'] = $data; } // Namespace queue with project $queue = 'enterprise_' . $queue; $client = $this->_getKestrelClient(); return $client->set($queue, $item); } // @codeCoverageIgnoreStart /** * Gets a single instance of EC_KestrelClient. Abstracted for testing. * * @return void */ protected function _getKestrelClient() { if (APPLICATION_ENV === 'testing') { throw new Exception(__METHOD__ . ' was not mocked when testing'); } static $client = null; if ($client === null) { $host = EC::getConfigOption('kestrel.host'); $port = EC::getConfigOption('kestrel.port'); $client = new EC_KestrelClient($host, $port); } return $client; } // @codeCoverageIgnoreEnd }
?
?
?
生の EC_Producer.php を表示する この要点は GitHub によって提供されています。
消費者
消費者にはもう少し理解する必要がありますが、それでも非常に簡単です。これは、daemontools や Supervisord などの監視ツールから実行することを目的としているため、CLI 引数を EC_Consumer に渡して実行するだけの非常に小さな CLI ハーネスがあります。 CLI 引数を解析した後、EC_Consumer は新しいジョブを求めて kestrel をポーリングし、標準のバッチ ジョブ インフラストラクチャを通じてそれらを実行します。 PHP の長時間実行プロセス能力にさらに自信が持てるまで、オプションの maxium jobs 引数を追加しました。これにより、コンシューマが X 個を超えるジョブを処理するのを停止して終了します。監視サービス (supervisord) は数秒以内に再起動します。また、テスト用にオプションのデバッグ引数も追加したので、発生するすべてのアクションを確認できます。 CLI ハーネスは次のようになります:
?
#!/bin/env php <?php // External application bootstrapping require_once __DIR__ . '/cli_init.php'; // Instantiate and run the consumer $consumer = new EC_Consumer($argv); $consumer->run();
?
view raw consumer_cli.php This Gist brought to you by GitHub.
And the main consumer class, EC_Consumer, looks something like this:
<?php /** * Enterprise queue consumer interface, called by bin/consumer_cli.php * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_Consumer { /** * Instance of {@link Zend_Console_Getopt} * * @var Zend_Console_Getopt */ protected $_opt = null; /** * Which APPLICATION_ENV to run under (see -e) * * @var string */ protected $_environment = null; /** * The kestrel server IP * * @var string */ protected $_host = null; /** * The kestrel server port * * @var int */ protected $_port = null; /** * The kestrel queue name to connect to * * @var string */ protected $_queue = null; /** * Whether we should show debug output * * @var bool */ protected $_debug = false; /** * Maximum # of jobs for this process to perform (for memory fail safe) * * @var int */ protected $_maxJobs = null; /** * Current job count * * @var int */ protected $_jobCount = 0; /** * Parses arguments from the command line and does error handling * * @param array $argv The $argv from bin/ecli.php * * @throw Zend_Console_Getopt_Exception on failure * @return void */ public function __construct(array $argv) { try { $opt = new Zend_Console_Getopt( array( 'environment|e=s' => 'environment name (e.g. development)' . ', required', 'server|s=s' => 'kestrel server, format of host:port' . ', required', 'queue|q=s' => 'queue name (e.g. crawler_campaign)' . ', required', 'max-jobs|m=s' => 'max jobs to run before exiting' . ', optional', 'debug|d' => 'show debug output' . ', optional', ) ); $opt->setArguments($argv); $opt->parse(); // Set environment if ($opt->e === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing environment' ); } $this->_environment = $opt->e; // @codeCoverageIgnoreStart if (!defined('APPLICATION_ENV')) { define('APPLICATION_ENV', $this->_environment); } // @codeCoverageIgnoreEnd // Set server if ($opt->s === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing server' ); } $parts = explode(':', $opt->s); if (count($parts) !== 2) { throw new Zend_Console_Getopt_Exception( 'Error: invalid server: ' . $opt->s ); } $this->_host = $parts[0]; $this->_port = $parts[1]; // Set queue if ($opt->q === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing queue' ); } $this->_queue = $opt->q; // Set max-jobs if ($opt->m !== null) { $this->_maxJobs = $opt->m; } // Set debug if ($opt->d !== null) { $this->_debug = true; } } catch (Zend_Console_Getopt_Exception $e) { echo "\n" . $e->getMessage() . "\n\n"; echo $opt->getUsageMessage(); // @codeCoverageIgnoreStart if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') { exit(1); } // @codeCoverageIgnoreEnd } $this->_opt = $opt; } /** * Polls the queue server for jobs and runs them as they come in * * @return void */ public function run() { $client = $this->_getKestrelClient(); $queue = 'enterprise_' . $this->_queue; while ($this->_keepRunning()) { // Pull job from queue $job = $client->reliableRead($queue, 500); if ($job === false) { $this->_debug('Nothing on queue ' . $queue); continue; } if (!isset($job['instance'])) { echo 'Instance not set in queue job: ' . print_r($job, true); continue; } $instance = $job['instance']; if (!isset($job['jobName'])) { echo 'Job name not set in queue job: ' . print_r($job, true); continue; } $jobName = $job['jobName']; $data = null; if (isset($job['data'])) { $data = $job['data']; } // Run the job $returnCode = $this->runJob($instance, $jobName, $data); if ($returnCode !== 0) { $client->abortReliableRead($queue); continue; } } $client->closeReliableRead($queue); } /** * Runs the job via bin/ecli.php * * @param string $instance The instance name to run the job under * @param string $jobName The job name * @param string $data Optional extra data * * @return int */ public function runJob($instance, $jobName, $data) { $cmd = BASE_PATH . '/bin/ecli.php ' . '-e ' . $this->_environment . ' -i ' . $instance . ' -j ' . $jobName; if ($data) { $cmd .= " '" . base64_encode(json_encode($data)) . "'"; } $returnCode = $this->_passthru($cmd); $this->_jobCount++; $this->_debug('Job count: ' . $this->_jobCount); return $returnCode; } /** * Check to see if the job limit has been reached * * @return bool */ protected function _keepRunning() { return ($this->_maxJobs === null) ? true : ($this->_jobCount < $this->_maxJobs); } /** * Show debug messages * * @param mixed $message * * @return void */ protected function _debug($message) { if (!$this->_debug) { return; } echo $message . "\n"; } // @codeCoverageIgnoreStart /** * Calls the passthru() function and returns the exit code. Abstracted * for testing. * * @param string $cmd The command to execute * * @return int */ protected function _passthru($cmd) { passthru($cmd, $returnCode); return $returnCode; } /** * Gets a single instance of EC_KestrelClient. Abstracted for testing. * * @return void */ protected function _getKestrelClient() { if (APPLICATION_ENV === 'testing') { throw new Exception(__METHOD__ . ' was not mocked when testing'); } return new EC_KestrelClient($this->_host, $this->_port); } // @codeCoverageIgnoreEnd }
?
?
view raw EC_Consumer.php This Gist brought to you by GitHub.
Putting it together
Now that all the pieces are put together, let's take a look at in action. Adding example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:
<?php $producer = new EC_Producer(); $producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar')); ?> view raw gistfile1.php This Gist brought to you by GitHub.
?
?
And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:
./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
view raw example.txt This Gist brought to you by GitHub.
That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









アプリケーションが複雑になるにつれて、大量のデータとプロセスの処理と管理が課題になります。この状況に対処するために、Laravel はユーザーに Laravel Queue (キュー) という非常に強力なツールを提供します。これにより、開発者は、ユーザー インターフェイスに影響を与えることなく、電子メールの送信、PDF の生成、画像のトリミングの処理などのタスクをバックグラウンドで実行できます。この記事では、Laravel キューの使用方法について詳しく説明します。 LaravelQueueキューとは

C 言語における return の使い方は、 1. 戻り値の型が void の関数については、return 文を使用して関数の実行を早期に終了することができます; 2. 戻り値の型が void ではない関数については、 return ステートメントは、関数の実行を終了するためのものです。結果は呼び出し元に返されます。 3. 関数の実行を早期に終了します。関数内で return ステートメントを使用して、関数の実行を早期に終了することもできます。関数が値を返さない場合。

Machine Power Report 編集者: Wu Xin 国内版の人型ロボット + 大型模型チームは、衣服を折りたたむなどの複雑で柔軟な素材の操作タスクを初めて完了しました。 OpenAIのマルチモーダル大規模モデルを統合したFigure01の公開により、国内同業者の関連動向が注目を集めている。つい昨日、中国の「ヒューマノイドロボットのナンバーワン株」であるUBTECHは、Baidu Wenxinの大型モデルと深く統合されたヒューマノイドロボットWalkerSの最初のデモを公開し、いくつかの興味深い新機能を示した。 Baidu Wenxin の大規模モデル機能の恩恵を受けた WalkerS は次のようになります。 Figure01 と同様に、WalkerS は動き回るのではなく、机の後ろに立って一連のタスクを完了します。人間の命令に従って服をたたむことができる

ソースコード: publicclassReturnFinallyDemo{publicstaticvoidmain(String[]args){System.out.println(case1());}publicstaticintcase1(){intx;try{x=1;returnx;}finally{x=3;}}}#出力 上記のコードの出力は、単純に次のように結論付けることができます:finally の前に return が実行されます。バイトコード レベルで何が起こるかを見てみましょう。以下は、case1 メソッドのバイトコードの一部をインターセプトし、ソース コードを比較して、各命令の意味に注釈を付けます。

マルチスレッド環境における JavaQueue キューのセキュリティ問題と解決策 はじめに: マルチスレッド プログラミングでは、プログラム内の共有リソースが競合状態に直面し、データの不整合やエラーが発生する可能性があります。 Java では、キューは一般的に使用されるデータ構造ですが、複数のスレッドが同時にキューを操作すると、セキュリティ上の問題が発生します。この記事では、コード例の形式での説明を中心に、マルチスレッド環境における JavaQueue キューのセキュリティ問題について説明し、いくつかの解決策を紹介します。 1つ

Java でのキューの使用法 Java では、キュー (キュー) は、先入れ先出し (FIFO) 原則に従う一般的に使用されるデータ構造です。 Queue は、メッセージ キュー、タスク スケジューリング、その他のシナリオの実装に使用でき、データの配置と処理順序を適切に管理できます。この記事では、Queue の使用法を紹介し、具体的なコード例を示します。 Queue の定義と共通メソッドは Java であり、Queue は JavaCollectionsFramework のインターフェイスです。

Vue3.2 セットアップ構文シュガーは、単一ファイル コンポーネント (SFC) で結合された API を使用して、Vue3.0 の面倒なセットアップを解決するコンパイル時構文シュガーです。宣言された変数、関数、インポートによって導入されたコンテンツは、インポートによって公開されます。使用上の問題点 1. 宣言した変数、関数、import で導入した内容を使用中に return する必要はなく、糖衣構文を使用することができます。 // 導入した内容をインポート import{getToday }from'./utils'//variable constmsg='Hello !'//function func

JavaScript での return の使用には特定のコード例が必要です。 JavaScript では、return ステートメントを使用して関数から返される値を指定します。関数の実行を終了するために使用できるだけでなく、関数が呼び出された場所に値を返すこともできます。 return ステートメントには次の一般的な用途があります。 値を返す return ステートメントは、関数が呼び出された場所に値を返すために使用できます。簡単な例を次に示します。 functionadd(a,b){
