Python中多進程與多執行緒實例(一)
一、背景
最近在Azkaban的測試工作中,需要在測試環境下模擬線上的調度場景進行穩定性測試。故而重操python舊業,透過python編寫腳本來建構類似線上的調度場景。在腳本編寫過程中,碰到這樣一個需求:要在測試環境中建立10000個作業流。
最開始的想法是在一個azkaban project下循環呼叫10000次create job介面(每個Flow只包含一個job)。由於azkaban它本身沒有增加/刪除作業流的接口,所有的作業流修改、增加、刪除其實都是透過重新上傳項目zip包實現的,相應地每次調猛獁前端的create job接口,實際上是在猛獁端對zip包的內容進行了重新的整合後再重新上傳zip包到azkaban,整個過程可以拆解成如下過程:解壓zip包獲得zip包內容,變更zip包內的文件內容,重新打包zip包,上傳到azkaban。因此,隨著循環次數越往後,zip套件包含的內容會越多,介面執行一次的時間就越長。實作發現,第一次調該介面的時間大致不到1秒,到循環1000次的時候介面呼叫一次的時間就達到了將近3秒。因此,如果指望一個循環10000次來構造該場景,顯然要耗費巨大的時間。
在此背景下, 自然而然地就想到用多進程/多線程的方式來處理該問題。
二、「多任務」的作業系統基礎
大家都知道,作業系統可以同時執行多個任務。例如你一邊聽音樂,一邊聊IM,一邊寫部落格等。現在的cpu大都是多核心的,但即使是過去的單核心cpu也是支援多任務並行執行。
單核心cpu執行多任務的原理:作業系統交替輪流地執行各個任務。先讓任務1執行0.01秒,然後切換到任務2執行0.01秒,再切換到任務3執行0.01秒...這樣往復地執行下去。由於cpu的執行速度非常快,所以使用者的主觀感受就是這些任務在並行地執行。
多核心cpu執行多任務的原理:由於實際應用中,任務的數量往往遠超過cpu的核數,所以作業系統實際上是把這些多任務輪流地調度到每個核心上執行。
對作業系統來說,一個應用程式就是一個行程。例如打開一個瀏覽器,它是一個進程;打開一個記事本,它就是一個進程。每個進程都有它特定的進程號。他們共享系統的記憶體資源。 進程是作業系統分配資源的最小單位。
而對於每一個進程而言,比如一個視頻播放器,它必須同時播放視頻和音頻,就至少需要同時運行兩個“子任務”,進程內的這些子任務就是通過線程來完成。 執行緒是最小的執行單元。一個進程它可以包含多個線程,這些線程相互獨立,同時又共享進程所擁有的資源。
三、Python多進程程式設計
1. multiprocessing
multiprocessing是Python提供的一個跨平台的多進程模組,透過它可以很方便地編寫多進程程序,在不同的平台(Unix/Linux, Windows)都可以執行。
以下就是使用multiprocessing編寫多進程程式的程式碼:
#!/usr/bin/python# -*- coding: utf-8 -*author = 'zni.feng'import sys reload (sys) sys.setdefaultencoding('utf-8')from multiprocessing import Processimport osimport time#子进程fundef child_projcess_fun(name): print 'Child process %s with processId %s starts.' % (name, os.getpid()) time.sleep(3) print 'Child process %s with processId %s ends.' % (name, os.getpid())if name == "main": print 'Parent processId is: %s.' % os.getpid() p = Process(target = child_projcess_fun, args=('zni',)) print 'Process starts' p.start() #开始进程 p.join() #等待子进程结束后再继续往下执行 print 'Process ends.'
程式的輸出:
Parent processId is: 11076. Process starts Child process zni with processId 11077 starts. Child process zni with processId 11077 ends. Process ends. [Finished in 3.1s]
2. Pool
某些情況下,我們希望批量創建多個子進程,或給定子進程數的上限,避免無限地消耗系統的資源。透過Pool(進程池)的方式,就可以完成這項工作,以下是使用Pool的程式碼:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 8 from multiprocessing import Pool 9 import os, time10 11 def child_process_test(name, sleep_time):12 print 'Child process %s with processId %s starts.' % (name, os.getpid())13 time.sleep(sleep_time)14 print 'Child process %s with processId %s ends.' % (name, os.getpid())15 16 if name == "main":17 print 'Parent processId is: %s.' % os.getpid()18 p = Pool() #进程池默认大小是cpu的核数19 #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程20 for i in range(5):21 p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求22 23 print 'Child processes are running.'24 p.close()25 p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行26 print 'All Processes end.'
程式的輸出:
Parent processId is: 5050. Child processes are running. Child process zni_0 with processId 5052 starts. Child process zni_1 with processId 5053 starts. Child process zni_2 with processId 5054 starts. Child process zni_3 with processId 5055 starts. Child process zni_0 with processId 5052 ends. Child process zni_4 with processId 5052 starts. Child process zni_1 with processId 5053 ends. Child process zni_2 with processId 5054 ends. Child process zni_3 with processId 5055 ends. Child process zni_4 with processId 5052 ends. All Processes end. [Finished in 6.2s]
close()方法和terminate()方法的區別:
close:關閉進程池,使之不能再新增新的進程。已經執行的進程會等待繼續執行直到結束。
terminate:強制終止執行緒池,正在執行的程序也會被強制終止。
3. 進程間通訊
Python的multiprocessing模組提供了多種進程間通訊的方式,如Queue、Pipe等。
3.1 Queue、Lock
Queue是multiprocessing提供的模組,它的資料結構就是"FIFO-first in first out"的佇列,常用的方法有:put(object)入隊;get()出隊;empty()判斷隊列是否為空。
Lock:當多個子程序對同一個queue執行寫入操作時,為了避免並發操作產生衝突,可以透過加鎖的方式使得某個子程序對queue擁有唯一的寫入權限,其他子程序必須等待該鎖定釋放後才能再開始執行寫入操作。
以下就是使用Queue進行進程間通訊的程式碼:在父進程裡建立兩個子進程,分別實作queue的讀取和寫入操作
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #写数据进程10 def write(q, lock, name):11 print 'Child Process %s starts' % name12 #获得锁13 lock.acquire()14 for value in ['A' , 'B', 'C']:15 print 'Put %s to queue...' % value16 q.put(value)17 time.sleep(random.random())18 #释放锁19 lock.release()20 print 'Child Process %s ends' % name21 22 #读数据进程23 def read(q, lock, name):24 print 'Child Process %s starts' % name25 while True: #持续地读取q中的数据26 value =q.get()27 print 'Get %s from queue.' % value28 print 'Child Process %s ends' % name29 30 if name == "main":31 #父进程创建queue,并共享给各个子进程32 q= Queue()33 #创建锁34 lock = Lock()35 #创建第一个“写”子进程36 pw = Process(target = write , args=(q, lock, 'WRITE', ))37 #创建“读”进程38 pr = Process(target = read, args=(q,lock, 'READ',))39 #启动子进程pw,写入:40 pw.start()41 #启动子进程pr,读取:42 pr.start()43 #等待pw结束:44 pw.join()45 #pr是个死循环,通过terminate杀死:46 pr.terminate()47 print 'Test finish.'
程式的輸出結果為:
Child Process WRITE starts Put A to queue... Child Process READ starts Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. Child Process WRITE ends Test finish. [Finished in 2.0s]
3.2 Pipe
# Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
下面就是使用Pipe通信的代码:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #发送数据进程11 def send(child_pipe, name):12 print 'Child Process %s starts' % name13 child_pipe.send('This is Mr.Ni')14 child_pipe.close()15 time.sleep(random.random())16 print 'Child Process %s ends' % name17 18 #接收数据进程19 def recv(parent_pipe, name):20 print 'Child Process %s starts' % name21 print parent_pipe.recv()22 time.sleep(random.random())23 print 'Child Process %s ends' % name24 25 if name == "main":26 #创建管道27 parent,child = Pipe()28 #创建send进程29 ps = Process(target=send, args=(child, 'SEND'))30 #创建recv进程31 pr = Process(target=recv, args=(parent, 'RECEIVE'))32 #启动send进程33 ps.start()34 #等待send进程结束35 ps.join()36 #启动recv进程37 pr.start()38 #等待recv进程结束39 pr.join()40 print 'Test finish.'
程序的输出结果如下:
Child Process SEND starts Child Process SEND ends Child Process RECEIVE starts This is Mr.Ni Child Process RECEIVE ends Test finish. [Finished in 1.8s]
【相关推荐】
2. Python中推荐使用多进程而不是多线程?分享推荐使用多进程的原因
以上是Python中多進程與多執行緒實例(一)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

PHP主要是過程式編程,但也支持面向對象編程(OOP);Python支持多種範式,包括OOP、函數式和過程式編程。 PHP適合web開發,Python適用於多種應用,如數據分析和機器學習。

PHP適合網頁開發和快速原型開發,Python適用於數據科學和機器學習。 1.PHP用於動態網頁開發,語法簡單,適合快速開發。 2.Python語法簡潔,適用於多領域,庫生態系統強大。

Python更適合初學者,學習曲線平緩,語法簡潔;JavaScript適合前端開發,學習曲線較陡,語法靈活。 1.Python語法直觀,適用於數據科學和後端開發。 2.JavaScript靈活,廣泛用於前端和服務器端編程。

PHP起源於1994年,由RasmusLerdorf開發,最初用於跟踪網站訪問者,逐漸演變為服務器端腳本語言,廣泛應用於網頁開發。 Python由GuidovanRossum於1980年代末開發,1991年首次發布,強調代碼可讀性和簡潔性,適用於科學計算、數據分析等領域。

VS Code可以在Windows 8上運行,但體驗可能不佳。首先確保系統已更新到最新補丁,然後下載與系統架構匹配的VS Code安裝包,按照提示安裝。安裝後,注意某些擴展程序可能與Windows 8不兼容,需要尋找替代擴展或在虛擬機中使用更新的Windows系統。安裝必要的擴展,檢查是否正常工作。儘管VS Code在Windows 8上可行,但建議升級到更新的Windows系統以獲得更好的開發體驗和安全保障。

VS Code 可用於編寫 Python,並提供許多功能,使其成為開發 Python 應用程序的理想工具。它允許用戶:安裝 Python 擴展,以獲得代碼補全、語法高亮和調試等功能。使用調試器逐步跟踪代碼,查找和修復錯誤。集成 Git,進行版本控制。使用代碼格式化工具,保持代碼一致性。使用 Linting 工具,提前發現潛在問題。

在 Notepad 中運行 Python 代碼需要安裝 Python 可執行文件和 NppExec 插件。安裝 Python 並為其添加 PATH 後,在 NppExec 插件中配置命令為“python”、參數為“{CURRENT_DIRECTORY}{FILE_NAME}”,即可在 Notepad 中通過快捷鍵“F6”運行 Python 代碼。

VS Code 擴展存在惡意風險,例如隱藏惡意代碼、利用漏洞、偽裝成合法擴展。識別惡意擴展的方法包括:檢查發布者、閱讀評論、檢查代碼、謹慎安裝。安全措施還包括:安全意識、良好習慣、定期更新和殺毒軟件。
