Python での同時プログラミングの例

WBOY
リリース: 2016-06-16 08:43:21
オリジナル
990 人が閲覧しました

1. はじめに

実行中のプログラムをプロセスと呼びます。各プロセスには独自のシステム状態があり、これにはメモリの状態、開いているファイルのリスト、命令の実行を追跡するプログラム ポインタ、ローカル変数を保持するコール スタックが含まれます。通常、プロセスは、プロセスのメイン スレッドと呼ばれる単一の制御フロー シーケンスに従って順次実行されます。いかなる瞬間においても、プログラムは 1 つのことだけを実行します。

プログラムは、Python ライブラリ関数の os または subprocess モジュール (os.fork() や subprocess.Popen() など) を通じて新しいプロセスを作成できます。ただし、子プロセスと呼ばれるこれらのプロセスは独立して実行され、独自の独立したシステム状態とメイン スレッドを持ちます。プロセスは互いに独立しているため、元のプロセスと同時に実行されます。これは、元のプロセスが子プロセスの作成後に他の作業を実行できることを意味します。

プロセスは互いに独立していますが、プロセス間通信 (IPC) と呼ばれるメカニズムを通じて相互に通信できます。典型的なパターンは、単純に純粋なバイト バッファとして理解できるメッセージ パッシングに基づいており、send() または recv() オペレーション プリミティブは、パイプやネットワーク ソケットなどを介して渡すことができます。 O メッセージを送信または受信するチャネル。メモリ マッピング メカニズム (mmap モジュールなど) を通じて完了できる IPC モードもいくつかあり、メモリ マッピングを通じてプロセスはメモリ内に共有領域を作成でき、これらの領域への変更はすべてのプロセスに表示されます。

マルチプロセスは、タスクの異なる部分を異なるプロセスで担当し、複数のタスクを同時に実行する必要があるシナリオで使用できます。ただし、作業をタスクに分割する別の方法は、スレッドを使用することです。プロセスと同様に、スレッドにも独自の制御フローと実行スタックがありますが、スレッドはそれを作成したプロセス内で実行され、親プロセスのすべてのデータとシステム リソースを共有します。スレッドは、アプリケーションが同時タスクを完了する必要がある場合に便利ですが、大量のシステム状態をタスク間で共有する必要があるという潜在的な問題があります。

複数のプロセスまたはスレッドを使用する場合、オペレーティング システムがスケジューリングを担当します。これは、各プロセス (またはスレッド) に小さなタイム スライスを与え、すべてのアクティブなタスク間を迅速に循環させることによって実現されます。このプロセスは、CPU 時間を小さなフラグメントに分割し、各タスクに分散します。たとえば、システム上で 10 個のアクティブなプロセスが実行されている場合、オペレーティング システムは CPU 時間の 10 分の 1 を各プロセスに適切に割り当て、10 個のプロセス間を循環します。システムに複数の CPU コアがある場合、オペレーティング システムは、システムの負荷を均等に保ち、並列実行を実現するために、プロセスを異なる CPU コアにスケジュールできます。

同時実行メカニズムを使用して作成されたプログラムでは、いくつかの複雑な問題を考慮する必要があります。複雑さの主な原因は、データの同期と共有に関するものです。通常、複数のタスクが同じデータ構造を同時に更新しようとすると、ダーティ データや一貫性のないプログラム ステータスの問題 (正式にはリソース競合問題と呼ばれます) が発生します。この問題を解決するには、ミューテックスまたは他の同様の同期プリミティブを使用して、プログラムの重要な部分を識別して保護する必要があります。たとえば、複数の異なるスレッドが同時に同じファイルに書き込もうとしている場合、これらの書き込みを順番に実行するには、1 つのスレッドが書き込みを行っている間、他のスレッドは現在のスレッドが解放されるまで待機する必要があります。リソース。

Python での同時プログラミング

Python は、スレッド、サブプロセス、ジェネレーター関数を利用するその他の同時実行実装など、さまざまな同時プログラミング方法を長い間サポートしてきました。

Python は、ほとんどのシステムでメッセージ パッシングとスレッドベースの同時プログラミング メカニズムの両方をサポートしています。ほとんどのプログラマはスレッド インターフェイスに慣れていますが、Python のスレッド メカニズムには多くの制限があります。 Python は内部グローバル インタープリター ロック (GIL) を使用して、スレッドの安全性を確保します。GIL は一度に 1 つのスレッドのみを実行できます。これにより、マルチコア システムであっても、Python プログラムは単一のプロセッサ上でのみ実行できます。 Python コミュニティでは GIL について多くの議論が行われていますが、近い将来に GIL が削除される可能性はありません。

Python は、スレッドとプロセスに基づいて同時操作を管理するための非常に洗練されたツールをいくつか提供します。単純なプログラムでも、これらのツールを使用してタスクを同時に実行し、より高速に実行できます。サブプロセス モジュールは、サブプロセスの作成と通信のための API を提供します。これらの API は新しいプロセスの標準入出力チャネルを介したデータの受け渡しをサポートしているため、これはテキスト関連プログラムの実行に特に適しています。シグナル モジュールは、プロセス間でイベント情報を受け渡すために、UNIX システムのセマフォ メカニズムをユーザーに公開します。信号は非同期で処理され、通常、信号が到着するとプログラムの現在の作業が中断されます。シグナリング メカニズムにより、粗粒度のメッセージング システムが可能になりますが、より複雑なメッセージを配信できる、より信頼性の高いプロセス内通信テクノロジは他にもあります。スレッド モジュールは、同時操作のための一連の高レベルのオブジェクト指向 API を提供します。スレッド オブジェクトはプロセス内で同時に実行され、メモリ リソースを共有します。スレッドを使用すると、I/O 集中型のタスクをより適切にスケーリングできます。マルチプロセッシング モジュールはスレッディング モジュールに似ていますが、プロセスに対する操作を提供します。各プロセス クラスは実際のオペレーティング システム プロセスであり、共有メモリ リソースはありませんが、マルチプロセッシング モジュールはプロセス間でデータを共有し、メッセージを渡すためのメカニズムを提供します。通常、スレッドベースのプログラムをプロセスベースのプログラムに変更するのは非常に簡単で、一部の import ステートメントを変更するだけです。

スレッドモジュールの例

スレッディング モジュールを例として、セグメント化された並列処理を使用して大量の累算を完了する方法について考えてみましょう。

import threading
 
class SummingThread(threading.Thread):
  def __init__(self, low, high):
    super(SummingThread, self).__init__()
    self.low = low
    self.high = high
    self.total = 0
 
  def run(self):
    for i in range(self.low, self.high):
      self.total += i
 
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

ログイン後にコピー

カスタマイズされた Threading クラス ライブラリ

私はスレッドで使いやすく、いくつかの便利なクラスと関数を含む小さな Python ライブラリを作成しました。

主要パラメータ:

* do_threaded_work – この関数は、指定された一連のタスクを対応する処理関数に割り当てます (割り当て順序は決定されません)

* ThreadedWorker – このクラスは、同期された作業キューから作業タスクを取得し、処理結果を同期された結果キューに書き込むスレッドを作成します

* start_logging_with_thread_info – すべてのログ メッセージにスレッド ID を書き込みます。 (ログ環境により異なります)

* stop_logging_with_thread_info – すべてのログ メッセージからスレッド ID を削除するために使用されます。 (ログ環境により異なります)

import threading
import logging
 
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
  """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
 
    Parameters:
    - num_threads        Default: len(work_items) --- Number of threads to use process items in work_items.
    - per_sync_timeout     Default: 1        --- Each synchronized operation can optionally timeout.
    - preserve_result_ordering Default: True       --- Reorders result_item to match original work_items ordering.
 
    Return: 
    --- list of results from applying work_func to each work_item. Order is optionally preserved.
 
    Example:
 
    def process_url(url):
      # TODO: Do some work with the url
      return url
 
    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
    # process urls in parallel
    result_items = do_threaded_work(urls_to_process, process_url)
 
    # print(results)
    print(repr(result_items))
  """
  global wrapped_work_func
  if not num_threads:
    num_threads = len(work_items)
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  index = 0
  for work_item in work_items:
    if preserve_result_ordering:
      work_queue.put((index, work_item))
    else:
      work_queue.put(work_item)
    index += 1
 
  if preserve_result_ordering:
    wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
 
  start_logging_with_thread_info()
 
  #spawn a pool of threads, and pass them queue instance 
  for _ in range(num_threads):
    if preserve_result_ordering:
      t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
    else:
      t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
    t.setDaemon(True)
    t.start()
 
  work_queue.join()
  stop_logging_with_thread_info()
 
  logging.info('work_queue joined')
 
  result_items = []
  while not result_queue.empty():
    result = result_queue.get(timeout=per_sync_timeout)
    logging.info('found result[:500]: ' + repr(result)[:500])
    if result:
      result_items.append(result)
 
  if preserve_result_ordering:
    result_items = [work_item for index, work_item in result_items]
 
  return result_items
 
class ThreadedWorker(threading.Thread):
  """ Generic Threaded Worker
    Input to work_func: item from work_queue
 
  Example usage:
 
  import Queue
 
  urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  def process_url(url):
    # TODO: Do some work with the url
    return url
 
  def main():
    # spawn a pool of threads, and pass them queue instance 
    for i in range(3):
      t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
      t.setDaemon(True)
      t.start()
 
    # populate queue with data  
    for url in urls_to_process:
      work_queue.put(url)
 
    # wait on the queue until everything has been processed   
    work_queue.join()
 
    # print results
    print repr(result_queue)
 
  main()
  """
 
  def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
    threading.Thread.__init__(self)
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.work_func = work_func
    self.stop_when_work_queue_empty = stop_when_work_queue_empty
    self.queue_timeout = queue_timeout
 
  def should_continue_running(self):
    if self.stop_when_work_queue_empty:
      return not self.work_queue.empty()
    else:
      return True
 
  def run(self):
    while self.should_continue_running():
      try:
        # grabs item from work_queue
        work_item = self.work_queue.get(timeout=self.queue_timeout)
 
        # works on item
        work_result = self.work_func(work_item)
 
        #place work_result into result_queue
        self.result_queue.put(work_result, timeout=self.queue_timeout)
 
      except Queue.Empty:
        logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
 
      except Queue.Full:
        logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
 
      except:
        logging.exception('Error in ThreadedWorker')
 
      finally:
        #signals to work_queue that item is done
        self.work_queue.task_done()
 
def start_logging_with_thread_info():
  try:
    formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to start logging with thread info')
 
def stop_logging_with_thread_info():
  try:
    formatter = logging.Formatter('%(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to stop logging with thread info')

ログイン後にコピー

使用例

from test import ThreadedWorker
from queue import Queue
 
urls_to_process = ["http://facebook.com", "http://pypix.com"]
 
work_queue = Queue()
result_queue = Queue()
 
def process_url(url):
  # TODO: Do some work with the url
  return url
 
def main():
  # spawn a pool of threads, and pass them queue instance 
  for i in range(5):
    t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
    t.setDaemon(True)
    t.start()
 
  # populate queue with data  
  for url in urls_to_process:
    work_queue.put(url)
 
  # wait on the queue until everything has been processed   
  work_queue.join()
 
  # print results
  print(repr(result_queue))
 
main()
ログイン後にコピー

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!