まずコードを見てみましょう:
# ~*~ Twisted - A Python tale ~*~ from time import sleep # Hello, I'm a developer and I mainly setup Wordpress. def install_wordpress(customer): # Our hosting company Threads Ltd. is bad. I start installation and... print "Start installation for", customer # ...then wait till the installation finishes successfully. It is # boring and I'm spending most of my time waiting while consuming # resources (memory and some CPU cycles). It's because the process # is *blocking*. sleep(3) print "All done for", customer # I do this all day long for our customers def developer_day(customers): for customer in customers: install_wordpress(customer) developer_day(["Bill", "Elon", "Steve", "Mark"])
実行すると、結果は次のようになります:
$ ./deferreds.py 1
------ Running example 1 ------ Start installation for Bill All done for Bill Start installation ... * Elapsed time: 12.03 seconds
これは、順次実行されるコードです。消費者が 4 人の場合、インストールには 1 人で 3 秒かかるため、4 人の場合は 12 秒かかります。これはあまり満足のいくものではないので、スレッドを使用した 2 番目の例を見てください:
import threading # The company grew. We now have many customers and I can't handle the # workload. We are now 5 developers doing exactly the same thing. def developers_day(customers): # But we now have to synchronize... a.k.a. bureaucracy lock = threading.Lock() # def dev_day(id): print "Goodmorning from developer", id # Yuck - I hate locks... lock.acquire() while customers: customer = customers.pop(0) lock.release() # My Python is less readable install_wordpress(customer) lock.acquire() lock.release() print "Bye from developer", id # We go to work in the morning devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)] [dev.start() for dev in devs] # We leave for the evening [dev.join() for dev in devs] # We now get more done in the same time but our dev process got more # complex. As we grew we spend more time managing queues than doing dev # work. We even had occasional deadlocks when processes got extremely # complex. The fact is that we are still mostly pressing buttons and # waiting but now we also spend some time in meetings. developers_day(["Customer %d" % i for i in xrange(15)])
実行してください:
$ ./deferreds.py 2
------ Running example 2 ------ Goodmorning from developer 0Goodmorning from developer 1Start installation forGoodmorning from developer 2 Goodmorning from developer 3Customer 0 ... from developerCustomer 13 3Bye from developer 2 * Elapsed time: 9.02 seconds
今回は、5 つのワーカー スレッドを使用して並列実行されるコードです。 15 個のコンシューマーがそれぞれ 3 秒かかるとすると、合計 45 秒かかりますが、5 つのスレッドを使用して並列実行すると、合計で 9 秒しかかかりません。このコードは少し複雑で、コードの大部分は、アルゴリズムやビジネス ロジックに焦点を当てるのではなく、同時実行性の管理に使用されます。さらに、プログラムの出力は非常に複雑で読みにくいように見えます。単純なマルチスレッド コードでもうまく書くのは難しいため、Twisted を使用することにしました:
# For years we thought this was all there was... We kept hiring more # developers, more managers and buying servers. We were trying harder # optimising processes and fire-fighting while getting mediocre # performance in return. Till luckily one day our hosting # company decided to increase their fees and we decided to # switch to Twisted Ltd.! from twisted.internet import reactor from twisted.internet import defer from twisted.internet import task # Twisted has a slightly different approach def schedule_install(customer): # They are calling us back when a Wordpress installation completes. # They connected the caller recognition system with our CRM and # we know exactly what a call is about and what has to be done next. # # We now design processes of what has to happen on certain events. def schedule_install_wordpress(): def on_done(): print "Callback: Finished installation for", customer print "Scheduling: Installation for", customer return task.deferLater(reactor, 3, on_done) # def all_done(_): print "All done for", customer # # For each customer, we schedule these processes on the CRM # and that # is all our chief-Twisted developer has to do d = schedule_install_wordpress() d.addCallback(all_done) # return d # Yes, we don't need many developers anymore or any synchronization. # ~~ Super-powered Twisted developer ~~ def twisted_developer_day(customers): print "Goodmorning from Twisted developer" # # Here's what has to be done today work = [schedule_install(customer) for customer in customers] # Turn off the lights when done join = defer.DeferredList(work) join.addCallback(lambda _: reactor.stop()) # print "Bye from Twisted developer!" # Even his day is particularly short! twisted_developer_day(["Customer %d" % i for i in xrange(15)]) # Reactor, our secretary uses the CRM and follows-up on events! reactor.run()
実行結果:
------ Running example 3 ------ Goodmorning from Twisted developer Scheduling: Installation for Customer 0 .... Scheduling: Installation for Customer 14 Bye from Twisted developer! Callback: Finished installation for Customer 0 All done for Customer 0 Callback: Finished installation for Customer 1 All done for Customer 1 ... All done for Customer 14 * Elapsed time: 3.18 seconds
今回は、スレッドを使用せずに完全に実行されたコードと読み取り可能な出力が得られました。 15 個のコンシューマーを並行して処理しました。つまり、当初 45 秒かかった実行時間は 3 秒以内に完了しました。秘訣は、sleep() へのブロック呼び出しをすべて、Twisted の同等の task.deferLater() およびコールバック関数に置き換えることです。処理は別の場所で行われるため、15 人の消費者に同時に問題なくサービスを提供できます。
前述の処理操作は別の場所で行われます。ここで説明すると、算術演算は依然として CPU 内で発生しますが、CPU の処理速度はディスクやネットワークの演算に比べて非常に高速になっています。そのため、CPU へのデータの供給、または CPU からメモリまたは別の CPU へのデータの送信にほとんどの時間がかかります。この領域での時間を節約するために、非ブロッキング操作を使用します。たとえば、task.deferLater() は、データが転送されたときにアクティブ化されるコールバック関数を使用します。
もう 1 つの非常に重要な点は、出力内の Twisted 開発者からのおはようと、Twisted 開発者からのバイバイ情報です。これら 2 つの情報は、コードの実行開始時にすでに出力されています。コードがこの時点まで非常に早く実行される場合、アプリケーションはいつ実際に実行を開始するのでしょうか?答えは、Twisted アプリケーション (Scrapy を含む) の場合は、reactor.run() で実行されるということです。このメソッドを呼び出す前に、アプリケーションで使用できるすべての Deferred チェーンの準備ができている必要があります。その後、reactor.run() メソッドがコールバック関数を監視してアクティブ化します。
actor の主なルールの 1 つは、十分に高速でブロックされない限り、どのような操作も実行できることに注意してください。
さて、コードにはマルチスレッドの管理に使用される部分はありませんが、これらのコールバック関数はまだ少し乱雑に見えます。次のように変更できます:
# Twisted gave us utilities that make our code way more readable! @defer.inlineCallbacks def inline_install(customer): print "Scheduling: Installation for", customer yield task.deferLater(reactor, 3, lambda: None) print "Callback: Finished installation for", customer print "All done for", customer def twisted_developer_day(customers): ... same as previously but using inline_install() instead of schedule_install() twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run()
実行結果は前の例と同じです。このコードの機能は前の例と同じですが、より簡潔で明確になっています。 inlineCallbacks ジェネレーターは、いくつかの Python メカニズムを使用して、inline_install() 関数の実行を一時停止または再開できます。 inline_install() 関数は Deferred オブジェクトになり、各コンシューマーに対して並列実行されます。イールドが発生するたびに、オペレーションは、イールドの Deferred オブジェクトが完了するまで現在の inline_install() インスタンスで一時停止され、その後再開されます。
今の唯一の疑問は、消費者が 15 人ではなく、たとえば 10,000 人だったらどうなるかということです。このコードは、10,000 の同時実行シーケンス (HTTP リクエスト、データベース書き込みなど) を開始します。これを行うことに問題はないかもしれませんが、さまざまな失敗を引き起こす可能性もあります。 Scrapy など、大量の同時リクエストを伴うアプリケーションでは、多くの場合、同時実行数を許容レベルに制限する必要があります。次の例では、task.Cooperator() を使用してこのような関数を完了します。 Scrapy は、同じメカニズムを使用して、アイテム パイプラインの同時実行数 (つまり、CONCURRENT_ITEMS 設定) を制限します。プログラムの実行中にコンシューマーを処理するためのスロットが 5 つあるようです。スロットが解放されない限り、次のコンシューマ要求の処理は開始されません。この例では、処理時間はすべて 3 秒なので、5 回に分けて処理されているように見えます。最終的なパフォーマンスはスレッドを使用した場合と同じですが、今回はスレッドが 1 つだけなので、コードがよりシンプルになり、正しいコードを書きやすくなります。
追記: deferToThread は同期関数をノンブロッキングにします
wisted defer.Deferred (twisted.internet import defer から) は遅延オブジェクトを返すことができます
注: deferToThread はスレッドを使用して実装されており、過度の使用は推奨されません
twisted の deferToThread (twisted.internet.threads import deferToThread から) も遅延オブジェクトを返しますが、コールバック関数は別のスレッドで処理され、主にデータベース/ファイルの読み取りに使用されます。フェッチ操作
@defer.inlineCallbacks def inline_install(customer): ... same as above # The new "problem" is that we have to manage all this concurrency to # avoid causing problems to others, but this is a nice problem to have. def twisted_developer_day(customers): print "Goodmorning from Twisted developer" work = (inline_install(customer) for customer in customers) # # We use the Cooperator mechanism to make the secretary not # service more than 5 customers simultaneously. coop = task.Cooperator() join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)]) # join.addCallback(lambda _: reactor.stop()) print "Bye from Twisted developer!" twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run() # We are now more lean than ever, our customers happy, our hosting # bills ridiculously low and our performance stellar. # ~*~ THE END ~*~
ここにある完全な例は参考用です
# -*- coding: utf-8 -*- from twisted.internet import defer, reactor from twisted.internet.threads import deferToThread import functools import time # 耗时操作 这是一个同步阻塞函数 def mySleep(timeout): time.sleep(timeout) # 返回值相当于加进了callback里 return 3 def say(result): print "耗时操作结束了, 并把它返回的结果给我了", result # 用functools.partial包装一下, 传递参数进去 cb = functools.partial(mySleep, 3) d = deferToThread(cb) d.addCallback(say) print "你还没有结束我就执行了, 哈哈" reactor.run()
更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!