Pythonの分散タスクhueyが非同期タスクを実装する方法についての説明
この記事では、Pythonの分散タスクhueyが非同期タスクを実装できるようにするためのPythonの軽量タスクキュープログラムを共有しますので、興味のある方は読んでみてください。
軽量のタスクキュー。その関数と関連ブローカーは軽量であることに重点を置いており、コードは比較的読みやすいです。
huey の紹介: (セロリより軽く、mrq や rq より使いやすい!)
軽量の代替品です。
Pythonで書かれています
Redis (または独自のバックエンドをロールする) を除き、stdlib の外部に deps はありません
ジャンゴのサポート
サポート:
マルチスレッドタスクの実行
指定された時間にスケジュールされた実行
crontab のような定期的な実行
失敗したタスクを再試行する
タスク結果ストレージ
インストール:
コードは次のとおりです |
|
インストール中
huey は pip を使用して非常に簡単にインストールできます。
pip インストール ヒューイ
huey には標準ライブラリ以外の依存関係はありませんが、現在同梱されている唯一の完全に実装されたキュー バックエンドには redis が必要です。redis バックエンドを使用するには、Python クライアントをインストールする必要があります。
pip インストール redis
git の使用
最新のものを実行したい場合は、github からリポジトリを自由にプルダウンして手動でインストールしてください。
git clone https://github.com/coleifer/huey.git
CDヒューイ
Python setup.py インストール
テストランナーを使用してテストを実行できます:
Python setup.py テスト |
hueyのAPIについては以下に詳しい紹介とパラメータの紹介があります。
コードは次のとおりです |
|
hueyインポートRedisHuey、crontabから
huey = RedisHuey('my-app', host='redis.myapp.com')
@huey.task()
def add_numbers(a, b):
a + bを返します
@huey.periodic_task(crontab(分='0', 時='3'))
def nightly_backup():
sync_all_data() |
juey がワーカーの場合、いくつかの cli パラメータ。
一般的に使用されるものは次のとおりです:
-l ログファイルの実行について。
-WORKERS数、-wの値が大きいのでタスクを増やす能力でしょう
-p --periodic huey ワーカーを開始すると、tasks.py から crontab を必要とするタスクが検出され、これらのタスクを処理するためにいくつかのスレッドが送信されます。
-n は、crontab での定期的な実行を開始しません。毎週のタスクは、トリガーした場合にのみ実行されます。
--threads それが何を意味するかはわかります。
1
コードは次のとおりです |
|
#原文:
次の表に、消費者が利用できるオプションとそのデフォルト値を示します。
-l、--logfile
ログ記録に使用されるファイルへのパス。デフォルトでは、Huey は最大 3 つのバックアップを含むローテーション ファイル ハンドラー (huey.logger) を使用します。ログレベルは情報です
-v、--verbose
詳細ログ (DEBUG レベルに相当)。ログファイルが指定されておらず、詳細が設定されている場合、コンシューマはテスト/デバッグに非常に役立ちます。
-q、--静か
コンシューマーのデフォルトのログレベルはエラーのみをログに記録します。
-w、--労働者
ワーカー スレッドの数。デフォルトは 1 スレッドですが、多くの I/O バインド タスクがあるアプリケーションの場合、この数を増やすとスループットが向上する可能性があります。
-p、--定期
このコンシューマ プロセスが「定期的な」タスクをキューに入れる専用のスレッドを開始することを示します (crontab のような機能)。これはデフォルトで True なので、実際には指定する必要はありません。
-n、--周期なし
このコンシューマ プロセスが定期的なタスクをキューに入れるべきではないことを示します。
-d、--遅延
「ポーリング」タイプのキュー バックエンドを使用する場合、バックエンドのポーリング間の待機時間はデフォルトで 0.1 秒です。
-m、--最大遅延
加重バックオフを使用する場合、ポーリング間の最大待機時間は 10 秒です。
-b、--バックオフ
結果をポーリングするときにバックオフする量は 1.15 より大きい必要があります。
-u、--utc
コンシューマーがすべてのタスク、crontab、およびスケジュールに UTC 時間を使用する必要があることを示します。デフォルトは True なので、実際にはこのオプションを指定する必要はありません。
--現地時間
コンシューマーがすべてのタスク、crontab、およびスケジュールにローカルタイムを使用する必要があることを示します。デフォルトは False です。
例
8 つのスレッド、エラーのみのログファイル、および非常に短いポーリング間隔でコンシューマーを実行します:
huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0
|
タスクキュー huey は Redis に依存してキュータスクストレージを実装するため、事前に redis-server と redis-py をインストールする必要があります。 インストール方法については説明しませんので、ご自身で検索してください。
まず、huey リンク インスタンスを作成しましょう:
コードは次のとおりです
| #config.py
ヒューイからヒューイをインポート |
huey.backends.redis_backend から RedisBlockingQueue をインポート
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
ヒューイ = ヒューイ(キュー)
次に、タスク、つまり、celey、rq、mrq のように、タスク キュー サークルに誰を含めるかについてです。これらはすべて、tasks.py で表されます。
コードは次のとおりです |
|
from config import huey # config.pyでインスタンス化したhueyをインポートします
@huey.task()
def count_beans(num):
Print '-- カウントした %s Bean --' % num |
実際に実行されるもう 1 つの例を次に示します。 main.py はプロデューサーに相当し、tasks.py はコンシューマ関係に相当します。 main.py はデータのフィードを担当します。
コードは次のとおりです |
|
main.py
from config import huey # 「huey」オブジェクトをインポートします
from task import count_beans # タスクをインポートします
if __name__ == '__main__':
豆 = raw_input('豆は何個?')
Count_beans(int(beans))
'%s Bean をカウントするためにキューに入れられたジョブ' % Bean を印刷します
Redis がローカルで実行されていることを確認してください
huey がインストールされていることを確認してください
コンシューマーを起動します: huey_consumer.py main.huey (これは「config.huey」ではなく「main.huey」であることに注意してください)。
メインプログラムを実行します: python main.py
|
celery や rq と同様に、その結果を取得するには、config.py またはメインコードでその保存方法を指定する必要があります。現時点では、huey は redis のみをサポートしていますが、その特性とボリュームを比較すると、これで十分です。
ほんの数文で、RedisDataStore ライブラリをインポートし、ストレージ アドレスを宣言します。
コードは次のとおりです
|
|
ヒューイインポートヒューイより
huey.backends.redis_backend から RedisBlockingQueue をインポート
from huey.backends.redis_backend import RedisDataStore # この行を追加します
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379) # 追加しました
huey = Huey(queue, result_store=result_store) # 結果ストアを追加しました
|
このとき、再度 ipython で試してみると、tasks.py で戻り値が取得できることが分かります。
コードは次のとおりです
|
|
>>> メインインポート count_beans から
>>> res = count_beans(100)
>>> レス # 「レス」とは何ですか?
>>> res.get() # このタスクの結果を取得します
「豆を100個数えました」
|
huey は、celey の遅延実行と crontab 機能もサポートしています。これらの機能は非常に重要であり、優先順位をカスタマイズしたり、Linux 独自の crontab に依存する必要はありません。
使い方は非常に簡単で、huey のソースコードを確認した後、デフォルトですぐに実行されます。もちろん、すべてのスレッドが実行保留状態にあるかどうかによって異なります。
コードは次のとおりです |
|
>>> 日時をインポートします
>>> res = count_beans.schedule(args=(100,), 遅延=60)
>>>
>>> res.get() # これは None を返します、データは準備されていません
>>> res.get() # まだデータがありません...
>>> res.get(blocking=True) # 準備ができるまでブロックしましょう
「豆を100個数えました」
|
ここでもう一つリトライについて紹介します。ヒューイにもリトライがあります。これは非常に実用的なものです。 上記の記事でセロリの再試行メカニズムについて説明したのを見た人なら、huey が何であるかについても理解できるはずです。はい、彼は実際にタスク内の特定の関数の前にデコレータも作成しました。デコレータには func try 例外再試行ロジックがあります。 誰もが理解しています。
コードは次のとおりです
|
|
#tasks.py
from datetime import datetime
構成インポートヒューイから
@huey.task(リトライ数=3、リトライ遅延=10)
def try_thrice():
'trying....%s' を印刷します % datetime.now()
例外を発生させる('いいえ')
|
ヒューイはあなたに後悔する機会を与えます~ つまり、ディリーの計画されたタスクを完了した後、キャンセルしたい場合は、単に取り消すことができます。
コードは次のとおりです
|
|
#豆を数える
res = count_beans(10000000)
res.revoke()
将来スケジュールされているタスクにも同じことが当てはまります:
res = count_beans.schedule(args=(100000,), eta=将来)
res.revoke()
@huey.task(crontab(分='*'))
def print_time():
datetime.now() を印刷します
|
task() - 関数をより美しくする透明なデコレータです。
periodic_task() - これは定期的なタスクです
crontab() - ワーカーの起動時にアタッチされる crontab 定期タスク。
BaseQueue - タスクキュー
BaseDataStore - タスクの実行後、結果をそこに詰め込むことができます。 BaseDataStoreは自分で書き換えることができます。
公式の huey git ライブラリは、関連するテスト コードを提供します:
main.py
コードは次のとおりです
|
|
構成インポートヒューイから
タスクから count_beans をインポート
if __name__ == '__main__':
豆 = raw_input('豆は何個?')
Count_beans(int(beans))
Print('%s Bean をカウントするためにキューに入れられたジョブ' % Bean)
|
タスク.py
代码如下 |
|
ランダムにインポート
輸入時間
ヒューイから crontab をインポート
構成インポートヒューイから
@huey.task()
def count_beans(num):
「開始...」を印刷します
print('-- %s Bean を数えました --' % num)
タイム.スリープ(3)
「終了...」を印刷します
return 'カウントされた %s Bean' % num
@huey.periodic_task(crontab(分='*/5'))
def Every_five_mins():
print('消費者はこれを 5 分ごとに印刷します')
@huey.task(リトライ数=3、リトライ遅延=10)
def try_thrice():
if randint(1, 3) == 1:
印刷('OK')
その他:
print('失敗しそうなので、10秒後に再試行します')
例外を発生させます(「何か問題が発生しました」)
@huey.task()
デフォルト遅い(n):
time.sleep(n)
print('スリープ %s' %n) |
run.sh
代码如下 |
|
#!/bin/bash
「ヒューイ消費者」をエコー
エコー「---------------」
echo "別のターミナルで、「python main.py」を実行します"
echo "Ctrl+C を使用してコンシューマを停止します"
PythonPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2
=> |
ここに例が示されており、django がサポートされていることがわかりますが、これは重要ではありません。
代码如下
|
|
[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.git
「huey」にクローンを作成しています...
リモート: オブジェクトの数: 1423、完了しました。
リモート: オブジェクトの圧縮: 100% (9/9)、完了しました。
オブジェクトの受信: 34% (497/1423)、388.00 KiB | 29.00 KiB/秒 KiB/秒
オブジェクトの受信: 34% (498/1423)、628.00 KiB | 22.00 KiB/秒
リモート: 合計 1423 (デルタ 0)、再利用 0 (デルタ 0)
オブジェクトの受信: 100% (1423/1423)、2.24 MiB | 29.00 KiB/s、完了
デルタの解決: 100% (729/729)、完了しました。
接続を確認しています...完了しました。
[xiaorui@devops /tmp ]$cd huey/examples/simple
[xiaorui@devops シンプル(マスター)]$ ll
合計40個
-rw-r--r-- 1 xiaorui ホイール 79B 9 8 08:49 README
-rw-r--r-- 1 xiaorui ホイール 0B 9 8 08:49 __init__.py
-rw-r--r-- 1 xiaorui ホイール 56B 9 8 08:49 config.py
-rwxr-xr-x 1 xiaorui ホイール 227B 9 8 08:49 cons.sh
-rw-r--r-- 1 xiaorui ホイール 205B 9 8 08:49 main.py
-rw-r--r-- 1 xiaorui ホイール 607B 9 8 08:49 task.py
[xiaorui@devops シンプル (マスター)]$
|
http://www.bkjia.com/PHPjc/968078.htmlwww.bkjia.comtruehttp://www.bkjia.com/PHPjc/968078.html技術記事 Python の分散タスク huey はどのようにして非同期タスクを実装するのでしょうか? この記事では、Python の分散タスク huey が非同期タスクを実装できるようにする Python の軽量タスク キュー プログラムを共有します...
|