共有リソースへの同期アクセス
スレッドを使用する場合、非常に重要な問題は、同じ変数または他のリソース上の複数のスレッド間のアクセス競合を回避することです。注意しないと、複数のスレッドでのアクセスや変更 (共有リソース) の重複などの操作により、さまざまな問題がさらに深刻になります。これらの問題は、通常、より極端な状況 (同時実行性が高い、実稼働サーバー、さらにはより優れたサーバーなど) でのみ発生します。ハードウェアを実行します)。
たとえば、イベントが処理された回数を追跡する必要がある状況があります
counter = 0 def process_item(item): global counter ... do something with item ... counter += 1
この関数を複数のスレッドで同時に呼び出すと、counter の値がそれほど正確ではないことがわかります。ほとんどの場合、それは正しいですが、場合によっては、実際よりも少し低い場合もあります。
この問題が発生する理由は、カウント増加操作が実際には 3 つのステップで実行されるためです。
次の状況を考えてみましょう。現在のスレッドがカウンタ値を取得した後、別のスレッドが CPU を捕捉し、さらにカウンタ値を取得し、さらにカウンタ値を再計算してライトバックを完了し、その後、タイム スライスが再度ローテーションされます。 (ここでは識別のみを目的としており、実際の電流ではありません) この時点で、現在のスレッドによって取得されたカウンタ値は、後続の 2 つのステップが完了した後でも、実際には 1 だけ増加します。
もう 1 つの一般的な状況は、不完全または一貫性のない状態にアクセスすることです。このタイプの状況は主に、あるスレッドがデータの初期化または更新を行っている間に、別のプロセスが変化するデータを読み取ろうとしているときに発生します。
アトミック操作
共有変数または他のリソースへの同期アクセスを実現する最も簡単な方法は、インタープリターのアトミック操作に依存することです。アトミック操作は 1 つのステップで実行される操作であり、その間、他のスレッドは共有リソースを取得できません。
通常、この同期方法は、文字列変数、数値、リスト、辞書などの単一のコア データ タイプのみで構成される共有リソースに対してのみ有効です。以下にスレッドセーフな操作をいくつか示します:
上で述べたように、変数またはプロパティを読み取って変更し、最後に書き戻すことはスレッドセーフではないことに注意してください。このスレッドが読み取りを完了する前に、別のスレッドがこの共有変数/属性を変更するためですが、変更または書き戻しは行われません。
ロック
ロックは、Python のスレッド モジュールによって提供される最も基本的な同期メカニズムです。いつでも、ロック オブジェクトは 1 つのスレッドによって取得されることも、どのスレッドによっても取得されないこともあります。あるスレッドが別のスレッドによって取得されたロック オブジェクトを取得しようとした場合、ロック オブジェクトを取得したいスレッドは、ロック オブジェクトが別のスレッドによって解放されるまで一時的に実行を終了することしかできません。
ロックは、共有リソースへの同期アクセスを実現するためによく使用されます。共有リソースごとに Lock オブジェクトを作成します。リソースにアクセスする必要がある場合は、acquire メソッドを呼び出してロック オブジェクトを取得します (他のスレッドがすでにロックを取得している場合、現在のスレッドはロックが解放されるまで待つ必要があります)。リソースにアクセスし、次に release メソッドを呼び出してロックを解放します。
lock = Lock() lock.acquire() #: will block if lock is already held ... access shared resource lock.release()
共有リソースへのアクセス中にエラーが発生した場合でも、ロックは解放される必要があることに注意してください。この目的を達成するには、try-finally を使用できます。
lock.acquire() try: ... access shared resource finally: lock.release() #: release lock, no matter what
Python 2.5 以降のバージョンでは、with ステートメントを使用できます。ロックを使用する場合、with ステートメントはステートメント ブロックに入る前にロック オブジェクトを自動的に取得し、ステートメント ブロックの実行後にロックを自動的に解放します。
from __future__ import with_statement #: 2.5 only with lock: ... access shared resource
if not lock.acquire(False): ... 锁资源失败 else: try: ... access shared resource finally: lock.release()
if not lock.locked(): #: 其它线程可能在下一条语句执行之前占有了该锁 lock.acquire() #: 可能会阻塞
単純ロックの欠点
標準のロック オブジェクトは、現在どのスレッドがロックを保持しているかを気にしません。ロックがすでに保持されている場合は、ロックを保持しているスレッドであっても、ロックを取得しようとする他のスレッドはブロックされます。次の例を考えてみましょう:
lock = threading.Lock() def get_first_part(): lock.acquire() try: ... 从共享对象中获取第一部分数据 finally: lock.release() return data def get_second_part(): lock.acquire() try: ... 从共享对象中获取第二部分数据 finally: lock.release() return data
示例中,我们有一个共享资源,有两个分别取这个共享资源第一部分和第二部分的函数。两个访问函数都使用了锁来确保在获取数据时没有其它线程修改对应的共享数据。
现在,如果我们想添加第三个函数来获取两个部分的数据,我们将会陷入泥潭。一个简单的方法是依次调用这两个函数,然后返回结合的结果:
def get_both_parts(): first = get_first_part() seconde = get_second_part() return first, second
这里的问题是,如有某个线程在两个函数调用之间修改了共享资源,那么我们最终会得到不一致的数据。最明显的解决方法是在这个函数中也使用lock:
def get_both_parts(): lock.acquire() try: first = get_first_part() seconde = get_second_part() finally: lock.release() return first, second
然而,这是不可行的。里面的两个访问函数将会阻塞,因为外层语句已经占有了该锁。为了解决这个问题,你可以通过使用标记在访问函数中让外层语句释放锁,但这样容易失去控制并导致出错。幸运的是,threading模块包含了一个更加实用的锁实现:re-entrant锁。
Re-Entrant Locks (RLock)
RLock类是简单锁的另一个版本,它的特点在于,同一个锁对象只有在被其它的线程占有时尝试获取才会发生阻塞;而简单锁在同一个线程中同时只能被占有一次。如果当前线程已经占有了某个RLock锁对象,那么当前线程仍能再次获取到该RLock锁对象。
lock = threading.Lock() lock.acquire() lock.acquire() #: 这里将会阻塞 lock = threading.RLock() lock.acquire() lock.acquire() #: 这里不会发生阻塞
RLock的主要作用是解决嵌套访问共享资源的问题,就像前面描述的示例。要想解决前面示例中的问题,我们只需要将Lock换为RLock对象,这样嵌套调用也会OK.
lock = threading.RLock() def get_first_part(): ... see above def get_second_part(): ... see above def get_both_parts(): ... see above
这样既可以单独访问两部分数据也可以一次访问两部分数据而不会被锁阻塞或者获得不一致的数据。
注意RLock会追踪递归层级,因此记得在acquire后进行release操作。
Semaphores
信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。
semaphore = threading.BoundedSemaphore() semaphore.acquire() #: counter减小
... 访问共享资源
semaphore.release() #: counter增大
当信号量被获取的时候,计数器减小;当信号量被释放的时候,计数器增大。当获取信号量的时候,如果计数器值为0,则该进程将阻塞。当某一信号量被释放,counter值增加为1时,被阻塞的线程(如果有的话)中会有一个得以继续运行。
信号量通常被用来限制对容量有限的资源的访问,比如一个网络连接或者数据库服务器。在这类场景中,只需要将计数器初始化为最大值,信号量的实现将为你完成剩下的事情。
max_connections = 10 semaphore = threading.BoundedSemaphore(max_connections)
如果你不传任何初始化参数,计数器的值会被初始化为1.
Python的threading模块提供了两种信号量实现。Semaphore类提供了一个无限大小的信号量,你可以调用release任意次来增大计数器的值。为了避免错误出现,最好使用BoundedSemaphore类,这样当你调用release的次数大于acquire次数时程序会出错提醒。
线程同步
锁可以用在线程间的同步上。threading模块包含了一些用于线程间同步的类。
Events
一个事件是一个简单的同步对象,事件表示为一个内部标识(internal flag),线程等待这个标识被其它线程设定,或者自己设定、清除这个标识。
event = threading.Event() #: 一个客户端线程等待flag被设定 event.wait() #: 服务端线程设置或者清除flag event.set() event.clear()
一旦标识被设定,wait方法就不做任何处理(不会阻塞),当标识被清除时,wait将被阻塞直至其被重新设定。任意数量的线程可能会等待同一个事件。
Conditions
条件是事件对象的高级版本。条件表现为程序中的某种状态改变,线程可以等待给定条件或者条件发生的信号。
下面是一个简单的生产者/消费者实例。首先你需要创建一个条件对象:
#: 表示一个资源的附属项 condition = threading.Condition() 生产者线程在通知消费者线程有新生成资源之前需要获得条件: #: 生产者线程 ... 生产资源项 condition.acquire() ... 将资源项添加到资源中 condition.notify() #: 发出有可用资源的信号 condition.release() 消费者必须获取条件(以及相关联的锁),然后尝试从资源中获取资源项: #: 消费者线程 condition.acquire() while True: ...从资源中获取资源项 if item: break condition.wait() #: 休眠,直至有新的资源 condition.release() ... 处理资源
wait方法释放了锁,然后将当前线程阻塞,直到有其它线程调用了同一条件对象的notify或者notifyAll方法,然后又重新拿到锁。如果同时有多个线程在等待,那么notify方法只会唤醒其中的一个线程,而notifyAll则会唤醒全部线程。
为了避免在wait方法处阻塞,你可以传入一个超时参数,一个以秒为单位的浮点数。如果设置了超时参数,wait将会在指定时间返回,即使notify没被调用。一旦使用了超时,你必须检查资源来确定发生了什么。
注意,条件对象关联着一个锁,你必须在访问条件之前获取这个锁;同样的,你必须在完成对条件的访问时释放这个锁。在生产代码中,你应该使用try-finally或者with.
可以通过将锁对象作为条件构造函数的参数来让条件关联一个已经存在的锁,这可以实现多个条件公用一个资源:
lock = threading.RLock() condition_1 = threading.Condition(lock) condition_2 = threading.Condition(lock)
互斥锁同步
我们先来看一个例子:
#!/usr/bin/env python # -*- coding: utf-8 -*- import time, threading # 假定这是你的银行存款: balance = 0 muxlock = threading.Lock() def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): # 循环次数一旦多起来,最后的数字就变成非0 for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t3 = threading.Thread(target=run_thread, args=(9,)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print balance
结果 :
[/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 61 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 24
上面的例子引出了多线程编程的最常见问题:数据共享。当多个线程都修改某一个共享数据的时候,需要进行同步控制。
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
threading模块中定义了Lock类,可以方便的处理锁定:
#创建锁mutex = threading.Lock() #锁定mutex.acquire([timeout]) #释放mutex.release()
其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。
使用互斥锁实现上面的例子的代码如下:
balance = 0 muxlock = threading.Lock() def change_it(n): # 获取锁,确保只有一个线程操作这个数 muxlock.acquire() global balance balance = balance + n balance = balance - n # 释放锁,给其他被阻塞的线程继续操作 muxlock.release() def run_thread(n): for i in range(10000): change_it(n)
加锁后的结果,就能确保数据正确:
[/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0