ホームページ バックエンド開発 Python チュートリアル Python での同時プログラミングの例

Python での同時プログラミングの例

Jun 16, 2016 am 08:43 AM
Python同時プログラミング

1. はじめに

実行中のプログラムをプロセスと呼びます。各プロセスには独自のシステム状態があり、これにはメモリの状態、開いているファイルのリスト、命令の実行を追跡するプログラム ポインタ、ローカル変数を保持するコール スタックが含まれます。通常、プロセスは、プロセスのメイン スレッドと呼ばれる単一の制御フロー シーケンスに従って順次実行されます。いかなる瞬間においても、プログラムは 1 つのことだけを実行します。

プログラムは、Python ライブラリ関数の os または subprocess モジュール (os.fork() や subprocess.Popen() など) を通じて新しいプロセスを作成できます。ただし、子プロセスと呼ばれるこれらのプロセスは独立して実行され、独自の独立したシステム状態とメイン スレッドを持ちます。プロセスは互いに独立しているため、元のプロセスと同時に実行されます。これは、元のプロセスが子プロセスの作成後に他の作業を実行できることを意味します。

プロセスは互いに独立していますが、プロセス間通信 (IPC) と呼ばれるメカニズムを通じて相互に通信できます。典型的なパターンは、単純に純粋なバイト バッファとして理解できるメッセージ パッシングに基づいており、send() または recv() オペレーション プリミティブは、パイプやネットワーク ソケットなどを介して渡すことができます。 O メッセージを送信または受信するチャネル。メモリ マッピング メカニズム (mmap モジュールなど) を通じて完了できる IPC モードもいくつかあり、メモリ マッピングを通じてプロセスはメモリ内に共有領域を作成でき、これらの領域への変更はすべてのプロセスに表示されます。

マルチプロセスは、タスクの異なる部分を異なるプロセスで担当し、複数のタスクを同時に実行する必要があるシナリオで使用できます。ただし、作業をタスクに分割する別の方法は、スレッドを使用することです。プロセスと同様に、スレッドにも独自の制御フローと実行スタックがありますが、スレッドはそれを作成したプロセス内で実行され、親プロセスのすべてのデータとシステム リソースを共有します。スレッドは、アプリケーションが同時タスクを完了する必要がある場合に便利ですが、大量のシステム状態をタスク間で共有する必要があるという潜在的な問題があります。

複数のプロセスまたはスレッドを使用する場合、オペレーティング システムがスケジューリングを担当します。これは、各プロセス (またはスレッド) に小さなタイム スライスを与え、すべてのアクティブなタスク間を迅速に循環させることによって実現されます。このプロセスは、CPU 時間を小さなフラグメントに分割し、各タスクに分散します。たとえば、システム上で 10 個のアクティブなプロセスが実行されている場合、オペレーティング システムは CPU 時間の 10 分の 1 を各プロセスに適切に割り当て、10 個のプロセス間を循環します。システムに複数の CPU コアがある場合、オペレーティング システムは、システムの負荷を均等に保ち、並列実行を実現するために、プロセスを異なる CPU コアにスケジュールできます。

同時実行メカニズムを使用して作成されたプログラムでは、いくつかの複雑な問題を考慮する必要があります。複雑さの主な原因は、データの同期と共有に関するものです。通常、複数のタスクが同じデータ構造を同時に更新しようとすると、ダーティ データや一貫性のないプログラム ステータスの問題 (正式にはリソース競合問題と呼ばれます) が発生します。この問題を解決するには、ミューテックスまたは他の同様の同期プリミティブを使用して、プログラムの重要な部分を識別して保護する必要があります。たとえば、複数の異なるスレッドが同時に同じファイルに書き込もうとしている場合、これらの書き込みを順番に実行するには、1 つのスレッドが書き込みを行っている間、他のスレッドは現在のスレッドが解放されるまで待機する必要があります。リソース。

Python での同時プログラミング

Python は、スレッド、サブプロセス、ジェネレーター関数を利用するその他の同時実行実装など、さまざまな同時プログラミング方法を長い間サポートしてきました。

Python は、ほとんどのシステムでメッセージ パッシングとスレッドベースの同時プログラミング メカニズムの両方をサポートしています。ほとんどのプログラマはスレッド インターフェイスに慣れていますが、Python のスレッド メカニズムには多くの制限があります。 Python は内部グローバル インタープリター ロック (GIL) を使用して、スレッドの安全性を確保します。GIL は一度に 1 つのスレッドのみを実行できます。これにより、マルチコア システムであっても、Python プログラムは単一のプロセッサ上でのみ実行できます。 Python コミュニティでは GIL について多くの議論が行われていますが、近い将来に GIL が削除される可能性はありません。

Python は、スレッドとプロセスに基づいて同時操作を管理するための非常に洗練されたツールをいくつか提供します。単純なプログラムでも、これらのツールを使用してタスクを同時に実行し、より高速に実行できます。サブプロセス モジュールは、サブプロセスの作成と通信のための API を提供します。これらの API は新しいプロセスの標準入出力チャネルを介したデータの受け渡しをサポートしているため、これはテキスト関連プログラムの実行に特に適しています。シグナル モジュールは、プロセス間でイベント情報を受け渡すために、UNIX システムのセマフォ メカニズムをユーザーに公開します。信号は非同期で処理され、通常、信号が到着するとプログラムの現在の作業が中断されます。シグナリング メカニズムにより、粗粒度のメッセージング システムが可能になりますが、より複雑なメッセージを配信できる、より信頼性の高いプロセス内通信テクノロジは他にもあります。スレッド モジュールは、同時操作のための一連の高レベルのオブジェクト指向 API を提供します。スレッド オブジェクトはプロセス内で同時に実行され、メモリ リソースを共有します。スレッドを使用すると、I/O 集中型のタスクをより適切にスケーリングできます。マルチプロセッシング モジュールはスレッディング モジュールに似ていますが、プロセスに対する操作を提供します。各プロセス クラスは実際のオペレーティング システム プロセスであり、共有メモリ リソースはありませんが、マルチプロセッシング モジュールはプロセス間でデータを共有し、メッセージを渡すためのメカニズムを提供します。通常、スレッドベースのプログラムをプロセスベースのプログラムに変更するのは非常に簡単で、一部の import ステートメントを変更するだけです。

スレッドモジュールの例

スレッディング モジュールを例として、セグメント化された並列処理を使用して大量の累算を完了する方法について考えてみましょう。

import threading
 
class SummingThread(threading.Thread):
  def __init__(self, low, high):
    super(SummingThread, self).__init__()
    self.low = low
    self.high = high
    self.total = 0
 
  def run(self):
    for i in range(self.low, self.high):
      self.total += i
 
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

ログイン後にコピー

カスタマイズされた Threading クラス ライブラリ

私はスレッドで使いやすく、いくつかの便利なクラスと関数を含む小さな Python ライブラリを作成しました。

主要パラメータ:

* do_threaded_work – この関数は、指定された一連のタスクを対応する処理関数に割り当てます (割り当て順序は決定されません)

* ThreadedWorker – このクラスは、同期された作業キューから作業タスクを取得し、処理結果を同期された結果キューに書き込むスレッドを作成します

* start_logging_with_thread_info – すべてのログ メッセージにスレッド ID を書き込みます。 (ログ環境により異なります)

* stop_logging_with_thread_info – すべてのログ メッセージからスレッド ID を削除するために使用されます。 (ログ環境により異なります)

import threading
import logging
 
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
  """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
 
    Parameters:
    - num_threads        Default: len(work_items) --- Number of threads to use process items in work_items.
    - per_sync_timeout     Default: 1        --- Each synchronized operation can optionally timeout.
    - preserve_result_ordering Default: True       --- Reorders result_item to match original work_items ordering.
 
    Return: 
    --- list of results from applying work_func to each work_item. Order is optionally preserved.
 
    Example:
 
    def process_url(url):
      # TODO: Do some work with the url
      return url
 
    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
    # process urls in parallel
    result_items = do_threaded_work(urls_to_process, process_url)
 
    # print(results)
    print(repr(result_items))
  """
  global wrapped_work_func
  if not num_threads:
    num_threads = len(work_items)
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  index = 0
  for work_item in work_items:
    if preserve_result_ordering:
      work_queue.put((index, work_item))
    else:
      work_queue.put(work_item)
    index += 1
 
  if preserve_result_ordering:
    wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
 
  start_logging_with_thread_info()
 
  #spawn a pool of threads, and pass them queue instance 
  for _ in range(num_threads):
    if preserve_result_ordering:
      t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
    else:
      t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
    t.setDaemon(True)
    t.start()
 
  work_queue.join()
  stop_logging_with_thread_info()
 
  logging.info('work_queue joined')
 
  result_items = []
  while not result_queue.empty():
    result = result_queue.get(timeout=per_sync_timeout)
    logging.info('found result[:500]: ' + repr(result)[:500])
    if result:
      result_items.append(result)
 
  if preserve_result_ordering:
    result_items = [work_item for index, work_item in result_items]
 
  return result_items
 
class ThreadedWorker(threading.Thread):
  """ Generic Threaded Worker
    Input to work_func: item from work_queue
 
  Example usage:
 
  import Queue
 
  urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  def process_url(url):
    # TODO: Do some work with the url
    return url
 
  def main():
    # spawn a pool of threads, and pass them queue instance 
    for i in range(3):
      t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
      t.setDaemon(True)
      t.start()
 
    # populate queue with data  
    for url in urls_to_process:
      work_queue.put(url)
 
    # wait on the queue until everything has been processed   
    work_queue.join()
 
    # print results
    print repr(result_queue)
 
  main()
  """
 
  def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
    threading.Thread.__init__(self)
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.work_func = work_func
    self.stop_when_work_queue_empty = stop_when_work_queue_empty
    self.queue_timeout = queue_timeout
 
  def should_continue_running(self):
    if self.stop_when_work_queue_empty:
      return not self.work_queue.empty()
    else:
      return True
 
  def run(self):
    while self.should_continue_running():
      try:
        # grabs item from work_queue
        work_item = self.work_queue.get(timeout=self.queue_timeout)
 
        # works on item
        work_result = self.work_func(work_item)
 
        #place work_result into result_queue
        self.result_queue.put(work_result, timeout=self.queue_timeout)
 
      except Queue.Empty:
        logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
 
      except Queue.Full:
        logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
 
      except:
        logging.exception('Error in ThreadedWorker')
 
      finally:
        #signals to work_queue that item is done
        self.work_queue.task_done()
 
def start_logging_with_thread_info():
  try:
    formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to start logging with thread info')
 
def stop_logging_with_thread_info():
  try:
    formatter = logging.Formatter('%(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to stop logging with thread info')

ログイン後にコピー

使用例

from test import ThreadedWorker
from queue import Queue
 
urls_to_process = ["http://facebook.com", "http://pypix.com"]
 
work_queue = Queue()
result_queue = Queue()
 
def process_url(url):
  # TODO: Do some work with the url
  return url
 
def main():
  # spawn a pool of threads, and pass them queue instance 
  for i in range(5):
    t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
    t.setDaemon(True)
    t.start()
 
  # populate queue with data  
  for url in urls_to_process:
    work_queue.put(url)
 
  # wait on the queue until everything has been processed   
  work_queue.join()
 
  # print results
  print(repr(result_queue))
 
main()
ログイン後にコピー

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Pythonを使用してテキストファイルのZIPF配布を見つける方法 Pythonを使用してテキストファイルのZIPF配布を見つける方法 Mar 05, 2025 am 09:58 AM

このチュートリアルでは、Pythonを使用してZIPFの法則の統計的概念を処理する方法を示し、法律の処理時にPythonの読み取りおよび並べ替えの効率性を示します。 ZIPF分布という用語が何を意味するのか疑問に思うかもしれません。この用語を理解するには、まずZIPFの法律を定義する必要があります。心配しないでください、私は指示を簡素化しようとします。 ZIPFの法則 ZIPFの法則は単に意味します。大きな自然言語のコーパスでは、最も頻繁に発生する単語は、2番目の頻繁な単語のほぼ2倍の頻度で表示されます。 例を見てみましょう。アメリカ英語の茶色のコーパスを見ると、最も頻繁な言葉は「thであることに気付くでしょう。

HTMLを解析するために美しいスープを使用するにはどうすればよいですか? HTMLを解析するために美しいスープを使用するにはどうすればよいですか? Mar 10, 2025 pm 06:54 PM

この記事では、Pythonライブラリである美しいスープを使用してHTMLを解析する方法について説明します。 find()、find_all()、select()、およびget_text()などの一般的な方法は、データ抽出、多様なHTML構造とエラーの処理、および代替案(SEL

Pythonでの画像フィルタリング Pythonでの画像フィルタリング Mar 03, 2025 am 09:44 AM

ノイズの多い画像を扱うことは、特に携帯電話や低解像度のカメラの写真でよくある問題です。 このチュートリアルでは、OpenCVを使用してPythonの画像フィルタリング手法を調査して、この問題に取り組みます。 画像フィルタリング:強力なツール 画像フィルター

Pythonの並列および同時プログラミングの紹介 Pythonの並列および同時プログラミングの紹介 Mar 03, 2025 am 10:32 AM

データサイエンスと処理のお気に入りであるPythonは、高性能コンピューティングのための豊富なエコシステムを提供します。ただし、Pythonの並列プログラミングは、独自の課題を提示します。このチュートリアルでは、これらの課題を調査し、グローバルな承認に焦点を当てています

TensorflowまたはPytorchで深い学習を実行する方法は? TensorflowまたはPytorchで深い学習を実行する方法は? Mar 10, 2025 pm 06:52 PM

この記事では、深い学習のためにTensorflowとPytorchを比較しています。 関連する手順、データの準備、モデルの構築、トレーニング、評価、展開について詳しく説明しています。 特に計算グラップに関して、フレームワーク間の重要な違い

Pythonで独自のデータ構造を実装する方法 Pythonで独自のデータ構造を実装する方法 Mar 03, 2025 am 09:28 AM

このチュートリアルでは、Python 3にカスタムパイプラインデータ構造を作成し、機能を強化するためにクラスとオペレーターのオーバーロードを活用していることを示しています。 パイプラインの柔軟性は、一連の機能をデータセットに適用する能力にあります。

Pythonオブジェクトのシリアル化と脱介入:パート1 Pythonオブジェクトのシリアル化と脱介入:パート1 Mar 08, 2025 am 09:39 AM

Pythonオブジェクトのシリアル化と脱介入は、非自明のプログラムの重要な側面です。 Pythonファイルに何かを保存すると、構成ファイルを読み取る場合、またはHTTPリクエストに応答する場合、オブジェクトシリアル化と脱滑り化を行います。 ある意味では、シリアル化と脱派化は、世界で最も退屈なものです。これらすべての形式とプロトコルを気にするのは誰ですか? Pythonオブジェクトを維持またはストリーミングし、後で完全に取得したいと考えています。 これは、概念レベルで世界を見るのに最適な方法です。ただし、実用的なレベルでは、選択したシリアル化スキーム、形式、またはプロトコルは、プログラムの速度、セキュリティ、メンテナンスの自由、およびその他の側面を決定する場合があります。

Pythonの数学モジュール:統計 Pythonの数学モジュール:統計 Mar 09, 2025 am 11:40 AM

Pythonの統計モジュールは、強力なデータ統計分析機能を提供して、生物統計やビジネス分析などのデータの全体的な特性を迅速に理解できるようにします。データポイントを1つずつ見る代わりに、平均や分散などの統計を見て、無視される可能性のある元のデータの傾向と機能を発見し、大きなデータセットをより簡単かつ効果的に比較してください。 このチュートリアルでは、平均を計算し、データセットの分散の程度を測定する方法を説明します。特に明記しない限り、このモジュールのすべての関数は、単に平均を合計するのではなく、平均()関数の計算をサポートします。 浮動小数点数も使用できます。 ランダムをインポートします インポート統計 fractiから

See all articles