기본적으로 Python은 선형 언어이지만 더 많은 처리 능력이 필요할 때 스레딩 모듈이 유용합니다. Python의 스레드는 병렬 CPU 컴퓨팅에 사용할 수 없지만 프로세서가 유휴 상태에서 데이터를 기다리고 있기 때문에 웹 스크래핑과 같은 I/O 작업에 매우 적합합니다.
많은 네트워크/데이터 I/O 관련 스크립트가 원격 소스의 데이터를 기다리는 데 대부분의 시간을 소비하므로 스레드가 게임의 판도를 바꾸고 있습니다. 다운로드가 연결 해제될 수 있으므로(예: 별도의 웹사이트 크롤링) 프로세서는 여러 데이터 소스에서 병렬로 다운로드하고 마지막에 결과를 병합할 수 있습니다. CPU 집약적인 프로세스의 경우 스레드 모듈을 사용해도 이점이 거의 없습니다.
다행히 스레딩은 표준 라이브러리에 포함되어 있습니다.
import threading from queue import Queue import time
을 호출 가능한 개체로 사용하고 args
를 사용하여 함수에 인수를 전달할 수 있습니다. start
는 스레드를 시작합니다. target
def testThread(num): print num if __name__ == '__main__': for i in range(5): t = threading.Thread(target=testThread, arg=(i,)) t.start()
如果你以前从未见过if __name__ == '__main__':
同一操作系统进程的线程将计算工作负载分布到多个内核中,如C++和Java等编程语言所示。通常,python只使用一个进程,从该进程生成一个主线程来执行运行时。由于一种称为全局解释器锁(global interpreter lock)的锁定机制,它保持在单个核上,而不管计算机有多少核,也不管产生了多少新线程,这种机制是为了防止所谓的竞争条件。
提到竞争,我想到了想到 NASCAR 和一级方程式赛车。让我们用这个类比,想象所有一级方程式赛车手都试图同时在一辆赛车上比赛。听起来很荒谬,对吧?,这只有在每个司机都可以使用自己的车的情况下才有可能,或者最好还是一次跑一圈,每次把车交给下一个司机。
>>> a = 8
通过让内存中的某个任意位置暂时保持值 8 来消耗很少的内存 (RAM)。
import time import threading from threading import Thread a = 8 def threaded_add(x, y): # simulation of a more complex task by asking # python to sleep, since adding happens so quick! for i in range(2): global a print("computing task in a different thread!") time.sleep(1) #this is not okay! but python will force sync, more on that later! a = 10 print(a) # the current thread will be a subset fork! if __name__ != "__main__": current_thread = threading.current_thread() # here we tell python from the main # thread of execution make others if __name__ == "__main__": thread = Thread(target = threaded_add, args = (1, 2)) thread.start() thread.join() print(a) print("main thread finished...exiting")
>>> computing task in a different thread! >>> 10 >>> computing task in a different thread! >>> 10 >>> 10 >>> main thread finished...exiting
告诉你一件事,thread two
a = 8 # spawns two different threads 1 and 2 # thread_one updates the value of a to 10 if (a == 10): # a check #thread_two updates the value of a to 15 a = 15 b = a * 2 # if thread_one finished first the result will be 20 # if thread_two finished first the result will be 30 # who is right?
if __name__ == '__main__':
이전에 본 적이 없다면 이는 기본적으로 Method 내에 중첩되어 있는지 확인하는 코드입니다. 스크립트를 직접 실행할 때만 실행됩니다(가져오지 않음).
동일한 운영 체제 프로세스의 스레드는 프로그래밍 언어 등 여러 코어에 컴퓨팅 작업 부하를 분산시킵니다. C++, Java 등이 표시됩니다. 일반적으로 Python은 런타임을 실행하기 위해 메인 스레드가 생성되는 하나의 프로세스만 사용합니다. 전역 인터프리터 잠금이라는 잠금 메커니즘으로 인해 컴퓨터의 코어 수나 생성된 새 스레드 수에 관계없이 단일 코어에 유지됩니다. 이는 소위 경쟁 조건을 방지하기 위한 것입니다.
경쟁이라고 하면 NASCAR와 Formula One이 떠오릅니다. 이 비유를 사용하여 동시에 한 대의 자동차를 타고 경주하려고 하는 모든 Formula 1 드라이버를 상상해 봅시다. 말도 안되는 소리 같죠? 이는 각 운전자가 자신의 차에 접근할 수 있는 경우에만 가능합니다. 또는 더 나은 방법은 한 번에 한 바퀴씩 달리고 매번 다음 운전자에게 차를 넘겨주는 것입니다.이는 스레드에서 발생하는 것과 매우 유사합니다. 스레드는 "기본" 스레드에서 "분기"되고, 각 후속 스레드는 이전 스레드의 복사본입니다. 이러한 스레드는 모두 동일한 프로세스 "컨텍스트"(이벤트 또는 경합)에 존재하므로 프로세스에 할당된 모든 리소스(예: 메모리)가 공유됩니다. 예를 들어 일반적인 Python 인터프리터 세션에서는
import sys import gc hello = "world" #reference to 'world' is 2 print (sys.getrefcount(hello)) bye = "world" other_bye = bye print(sys.getrefcount(bye)) print(gc.get_referrers(other_bye))
여기에서 a
는 메모리의 임의 위치에 값 8을 임시로 유지하여 이를 수행합니다. 메모리(RAM)가 거의 없습니다. 🎜
지금까지는 훌륭했습니다. 일부 스레드를 시작하고 두 숫자 x
y code>를 추가할 때의 동작을 관찰해 보겠습니다. 🎜<div class="code" style="position:relative; padding:0px; margin:0px;"><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">>>> 4
>>> 6
>>> [['sys', 'gc', 'hello', 'world', 'print', 'sys', 'getrefcount', 'hello', 'bye', 'world', 'other_bye', 'bye', 'print', 'sys', 'getrefcount', 'bye', 'print', 'gc', 'get_referrers', 'other_bye'], (0, None, 'world'), {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.sourcefileloader>, '__spec__': None, '__annotations__': {}, '__builtins__': <module>, '__file__': 'test.py', '__cached__': None, 'sys': <module>, 'gc': <module>, 'hello': 'world', 'bye': 'world', 'other_bye': 'world'}]</module></module></module></_frozen_importlib_external.sourcefileloader></pre><div class="contentsignin">로그인 후 복사</div></div><div class="contentsignin">로그인 후 복사</div></div>rrree현재 두 개의 스레드가 실행 중입니다. <code>thread_one
과 thread_two
라고 부르겠습니다. thread_one
이 a
를 값 10으로 수정하려고 하고 thread_two
가 동시에 동일한 변수를 업데이트하려고 하면 문제가 있는 것입니다! 데이터 경합이라는 조건이 발생하고 결과적으로 a
값이 일치하지 않게 됩니다. 🎜
보지도 않았지만 친구 두 명에게서 상반된 결과를 들었던 레이싱 이벤트! thread_one
은 당신에게 무언가를 말하고, thread two
는 그것을 반박합니다! 다음은 설명하기 위한 의사 코드 조각입니다. 🎜
import time, os from threading import Thread, current_thread from multiprocessing import current_process COUNT = 200000000 SLEEP = 10 def io_bound(sec): pid = os.getpid() threadName = current_thread().name processName = current_process().name print(f"{pid} * {processName} * {threadName} \ ---> Start sleeping...") time.sleep(sec) print(f"{pid} * {processName} * {threadName} \ ---> Finished sleeping...") def cpu_bound(n): pid = os.getpid() threadName = current_thread().name processName = current_process().name print(f"{pid} * {processName} * {threadName} \ ---> Start counting...") while n>0: n -= 1 print(f"{pid} * {processName} * {threadName} \ ---> Finished counting...") def timeit(function,args,threaded=False): start = time.time() if threaded: t1 = Thread(target = function, args =(args, )) t2 = Thread(target = function, args =(args, )) t1.start() t2.start() t1.join() t2.join() else: function(args) end = time.time() print('Time taken in seconds for running {} on Argument {} is {}s -{}'.format(function,args,end - start,"Threaded" if threaded else "None Threaded")) if __name__=="__main__": #Running io_bound task print("IO BOUND TASK NON THREADED") timeit(io_bound,SLEEP) print("IO BOUND TASK THREADED") #Running io_bound task in Thread timeit(io_bound,SLEEP,threaded=True) print("CPU BOUND TASK NON THREADED") #Running cpu_bound task timeit(cpu_bound,COUNT) print("CPU BOUND TASK THREADED") #Running cpu_bound task in Thread timeit(cpu_bound,COUNT,threaded=True)
import sys import gc hello = "world" #reference to 'world' is 2 print (sys.getrefcount(hello)) bye = "world" other_bye = bye print(sys.getrefcount(bye)) print(gc.get_referrers(other_bye))
>>> 4 >>> 6 >>> [['sys', 'gc', 'hello', 'world', 'print', 'sys', 'getrefcount', 'hello', 'bye', 'world', 'other_bye', 'bye', 'print', 'sys', 'getrefcount', 'bye', 'print', 'gc', 'get_referrers', 'other_bye'], (0, None, 'world'), {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.sourcefileloader>, '__spec__': None, '__annotations__': {}, '__builtins__': <module>, '__file__': 'test.py', '__cached__': None, 'sys': <module>, 'gc': <module>, 'hello': 'world', 'bye': 'world', 'other_bye': 'world'}]</module></module></module></_frozen_importlib_external.sourcefileloader>
CPython 的 GIL 通过一次允许一个线程控制解释器来控制 Python 解释器。它为单线程程序提供了性能提升,因为只需要管理一个锁,但代价是它阻止了多线程 CPython 程序在某些情况下充分利用多处理器系统。
import time, os from threading import Thread, current_thread from multiprocessing import current_process COUNT = 200000000 SLEEP = 10 def io_bound(sec): pid = os.getpid() threadName = current_thread().name processName = current_process().name print(f"{pid} * {processName} * {threadName} \ ---> Start sleeping...") time.sleep(sec) print(f"{pid} * {processName} * {threadName} \ ---> Finished sleeping...") def cpu_bound(n): pid = os.getpid() threadName = current_thread().name processName = current_process().name print(f"{pid} * {processName} * {threadName} \ ---> Start counting...") while n>0: n -= 1 print(f"{pid} * {processName} * {threadName} \ ---> Finished counting...") def timeit(function,args,threaded=False): start = time.time() if threaded: t1 = Thread(target = function, args =(args, )) t2 = Thread(target = function, args =(args, )) t1.start() t2.start() t1.join() t2.join() else: function(args) end = time.time() print('Time taken in seconds for running {} on Argument {} is {}s -{}'.format(function,args,end - start,"Threaded" if threaded else "None Threaded")) if __name__=="__main__": #Running io_bound task print("IO BOUND TASK NON THREADED") timeit(io_bound,SLEEP) print("IO BOUND TASK THREADED") #Running io_bound task in Thread timeit(io_bound,SLEEP,threaded=True) print("CPU BOUND TASK NON THREADED") #Running cpu_bound task timeit(cpu_bound,COUNT) print("CPU BOUND TASK THREADED") #Running cpu_bound task in Thread timeit(cpu_bound,COUNT,threaded=True)
>>> IO BOUND TASK NON THREADED >>> 17244 * MainProcess * MainThread ---> Start sleeping... >>> 17244 * MainProcess * MainThread ---> Finished sleeping... >>> 17244 * MainProcess * MainThread ---> Start sleeping... >>> 17244 * MainProcess * MainThread ---> Finished sleeping... >>> Time taken in seconds for running <function> on Argument 10 is 20.036664724349976s -None Threaded >>> IO BOUND TASK THREADED >>> 10180 * MainProcess * Thread-1 ---> Start sleeping... >>> 10180 * MainProcess * Thread-2 ---> Start sleeping... >>> 10180 * MainProcess * Thread-1 ---> Finished sleeping... >>> 10180 * MainProcess * Thread-2 ---> Finished sleeping... >>> Time taken in seconds for running <function> on Argument 10 is 10.01464056968689s -Threaded >>> CPU BOUND TASK NON THREADED >>> 14172 * MainProcess * MainThread ---> Start counting... >>> 14172 * MainProcess * MainThread ---> Finished counting... >>> 14172 * MainProcess * MainThread ---> Start counting... >>> 14172 * MainProcess * MainThread ---> Finished counting... >>> Time taken in seconds for running <function> on Argument 200000000 is 44.90199875831604s -None Threaded >>> CPU BOUND TASK THEADED >>> 15616 * MainProcess * Thread-1 ---> Start counting... >>> 15616 * MainProcess * Thread-2 ---> Start counting... >>> 15616 * MainProcess * Thread-1 ---> Finished counting... >>> 15616 * MainProcess * Thread-2 ---> Finished counting... >>> Time taken in seconds for running <function> on Argument 200000000 is 106.09711360931396s -Threaded</function></function></function></function>
import os import time from multiprocessing import Process, current_process SLEEP = 10 COUNT = 200000000 def count_down(cnt): pid = os.getpid() processName = current_process().name print(f"{pid} * {processName} \ ---> Start counting...") while cnt > 0: cnt -= 1 def io_bound(sec): pid = os.getpid() threadName = current_thread().name processName = current_process().name print(f"{pid} * {processName} * {threadName} \ ---> Start sleeping...") time.sleep(sec) print(f"{pid} * {processName} * {threadName} \ ---> Finished sleeping...") if __name__ == '__main__': # creating processes start = time.time() #CPU BOUND p1 = Process(target=count_down, args=(COUNT, )) p2 = Process(target=count_down, args=(COUNT, )) #IO BOUND #p1 = Process(target=, args=(SLEEP, )) #p2 = Process(target=count_down, args=(SLEEP, )) # starting process_thread p1.start() p2.start() # wait until finished p1.join() p2.join() stop = time.time() elapsed = stop - start print ("The time taken in seconds is :", elapsed)
>>> 1660 * Process-2 ---> Start counting... >>> 10184 * Process-1 ---> Start counting... >>> The time taken in seconds is : 12.815475225448608
import time import asyncio COUNT = 200000000 # asynchronous function defination async def func_name(cnt): while cnt > 0: cnt -= 1 #asynchronous main function defination async def main (): # Creating 2 tasks.....You could create as many tasks (n tasks) task1 = loop.create_task(func_name(COUNT)) task2 = loop.create_task(func_name(COUNT)) # await each task to execute before handing control back to the program await asyncio.wait([task1, task2]) if __name__ =='__main__': # get the event loop start_time = time.time() loop = asyncio.get_event_loop() # run all tasks in the event loop until completion loop.run_until_complete(main()) loop.close() print("--- %s seconds ---" % (time.time() - start_time))
>>> --- 41.74118399620056 seconds ---
数据在进程之间混洗会产生 I/O 开销
