최근 Azkaban의 테스트 작업에서는 안정성 테스트를 위해 테스트 환경에서 온라인 일정 시나리오를 시뮬레이션하는 것이 필요합니다. 따라서 저는 예전의 Python 사업으로 돌아가 유사한 온라인 일정 계획 시나리오를 구성하기 위해 Python으로 스크립트를 작성했습니다. 스크립트 작성 과정에서 테스트 환경에서 10,000개의 작업 스트림을 생성해야 한다는 요구 사항이 발생했습니다.
원래 아이디어는 azkaban 프로젝트의 루프에서 작업 생성 인터페이스를 10,000번 호출하는 것이었습니다(각 흐름에는 하나의 작업만 포함됨). azkaban 자체에는 작업 스트림 추가/삭제를 위한 인터페이스가 없기 때문에 모든 작업 스트림 수정, 추가 및 삭제는 실제로 Mammoth 프런트엔드의 작업 인터페이스 생성 시마다 프로젝트 zip 패키지를 다시 업로드하여 구현됩니다. 조정되면 실제로 Mammoth는 zip 패키지의 콘텐츠를 다시 통합한 다음 zip 패키지를 azkaban에 다시 업로드합니다. 전체 프로세스는 다음 프로세스로 나눌 수 있습니다. zip 패키지의 압축을 풀어 zip의 콘텐츠를 얻습니다. 패키지, zip 패키지의 파일 내용을 변경하고 zip 패키지를 다시 패키지하여 azkaban에 업로드합니다. 따라서 주기 수가 늦어질수록 zip 패키지에 더 많은 콘텐츠가 포함되고 인터페이스를 한 번 실행하는 데 더 오랜 시간이 걸립니다. 실습에 따르면 인터페이스를 처음 호출하는 데 걸리는 시간은 1초 미만이며, 주기가 1,000회일 경우 인터페이스를 한 번 호출하는 데 걸리는 시간은 거의 3초에 이릅니다. 그러므로 이 장면을 구성하기 위해 10,000번의 루프를 예상한다면 분명히 엄청난 시간이 걸릴 것입니다.
이러한 맥락에서 이 문제를 해결하기 위해 다중 프로세스/다중 스레드 접근 방식을 사용하는 것을 생각하는 것은 당연합니다.
우리 모두 알고 있듯이 운영 체제는 동시에 여러 작업을 실행할 수 있습니다. 예를 들어 음악을 듣고, IM으로 채팅하고, 블로그에 글을 쓰는 등의 작업을 수행합니다. 오늘날 CPU의 대부분은 멀티 코어이지만 과거에는 단일 코어 CPU도 여러 작업의 병렬 실행을 지원했습니다.
멀티 태스킹을 수행하는 단일 코어 CPU의 원리: 운영 체제는 각 작업을 교대로 실행합니다. 먼저 작업 1이 0.01초 동안 실행되도록 하고, 그런 다음 작업 2로 0.01초 동안 전환하고, 작업 3으로 0.01초 동안 전환하고... 이런 식으로 계속됩니다. CPU의 실행 속도가 매우 빠르기 때문에 사용자의 주관적인 느낌은 이러한 작업이 병렬로 실행된다는 것입니다.
멀티 태스킹을 수행하는 멀티 코어 CPU의 원리: 실제 응용 프로그램에서는 작업 수가 CPU 코어 수를 훨씬 초과하는 경우가 많기 때문에 운영 체제는 실제로 이러한 멀티 작업이 차례로 실행되도록 예약합니다. 각 코어.
운영 체제에서 애플리케이션은 하나의 프로세스입니다. 예를 들어, 브라우저를 열면 프로세스이고, 메모장을 열면 프로세스입니다. 각 프로세스에는 고유한 프로세스 번호가 있습니다. 이들은 시스템의 메모리 리소스를 공유합니다. 프로세스는 운영 체제가 자원을 할당하는 가장 작은 단위 입니다.
동시에 비디오와 오디오를 재생해야 하는 비디오 플레이어와 같은 각 프로세스에 대해 프로세스 내의 이러한 하위 작업은 동시에 두 개 이상의 "하위 작업"을 실행해야 합니다. 스레드는 가장 작은 실행 단위입니다. 프로세스에는 서로 독립적이고 프로세스가 소유한 리소스를 공유하는 여러 스레드가 포함될 수 있습니다.
다중 처리는 Python에서 제공하는 크로스 플랫폼 다중 프로세스 모듈로, 이를 통해 다양한 플랫폼(Unix/Linux, Windows)를 실행할 수 있습니다.
다음은 다중 프로세스 프로그램을 작성하기 위해 다중 처리를 사용하는 코드입니다:
#!/usr/bin/python# -*- coding: utf-8 -*author = 'zni.feng'import sys reload (sys) sys.setdefaultencoding('utf-8')from multiprocessing import Processimport osimport time#子进程fundef child_projcess_fun(name): print 'Child process %s with processId %s starts.' % (name, os.getpid()) time.sleep(3) print 'Child process %s with processId %s ends.' % (name, os.getpid())if name == "main": print 'Parent processId is: %s.' % os.getpid() p = Process(target = child_projcess_fun, args=('zni',)) print 'Process starts' p.start() #开始进程 p.join() #等待子进程结束后再继续往下执行 print 'Process ends.'
프로그램의 출력:
Parent processId is: 11076. Process starts Child process zni with processId 11077 starts. Child process zni with processId 11077 ends. Process ends. [Finished in 3.1s]
어떤 경우에는 여러 하위 프로세스를 일괄적으로 생성하고 싶을 때가 있습니다. 또는 시스템 리소스를 무한히 소모하는 것을 방지하기 위해 하위 프로세스 수에 상한을 제공합니다. 이 작업은 Pool(프로세스 풀)을 통해 수행할 수 있습니다. 다음은 Pool을 사용하는 코드입니다.
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 8 from multiprocessing import Pool 9 import os, time10 11 def child_process_test(name, sleep_time):12 print 'Child process %s with processId %s starts.' % (name, os.getpid())13 time.sleep(sleep_time)14 print 'Child process %s with processId %s ends.' % (name, os.getpid())15 16 if name == "main":17 print 'Parent processId is: %s.' % os.getpid()18 p = Pool() #进程池默认大小是cpu的核数19 #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程20 for i in range(5):21 p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求22 23 print 'Child processes are running.'24 p.close()25 p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行26 print 'All Processes end.'
프로그램 출력:
Parent processId is: 5050. Child processes are running. Child process zni_0 with processId 5052 starts. Child process zni_1 with processId 5053 starts. Child process zni_2 with processId 5054 starts. Child process zni_3 with processId 5055 starts. Child process zni_0 with processId 5052 ends. Child process zni_4 with processId 5052 starts. Child process zni_1 with processId 5053 ends. Child process zni_2 with processId 5054 ends. Child process zni_3 with processId 5055 ends. Child process zni_4 with processId 5052 ends. All Processes end. [Finished in 6.2s]
close() 메서드와 Terminate() 메서드의 차이점:
close: 새 프로세스를 추가할 수 없도록 프로세스 풀을 닫습니다. 이미 실행된 프로세스는 종료될 때까지 계속 실행을 기다립니다.
terminate: 스레드 풀을 강제 종료하며, 실행 중인 프로세스도 강제 종료됩니다.
Python의 다중 처리 모듈은 Queue, Pipe 등 다양한 프로세스 간 통신 방법을 제공합니다.
3.1 대기열, 잠금
큐는 다중 처리에 의해 제공되는 모듈입니다. 해당 데이터 구조는 "FIFO-선입선출" 대기열입니다. 일반적으로 사용되는 방법은 다음과 같습니다. 대기열에 들어가려면 get(); ;empty()는 큐가 비어 있는지 여부를 결정합니다.
잠금: 여러 하위 프로세스가 동일한 대기열에서 쓰기 작업을 수행하는 경우 동시 작업의 충돌을 피하기 위해 하위 프로세스가 대기열에 대한 유일한 쓰기 권한을 갖도록 하위 프로세스를 잠글 수 있습니다. 하위 프로세스는 잠금이 해제될 때까지 기다려야 합니다. 그러면 쓰기 작업이 다시 시작될 수 있습니다.
다음은 프로세스 간 통신을 위해 Queue를 사용하는 코드입니다. 상위 프로세스에 두 개의 하위 프로세스를 생성하여 각각 큐에 대한 읽기 및 쓰기 작업을 구현합니다.
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #写数据进程10 def write(q, lock, name):11 print 'Child Process %s starts' % name12 #获得锁13 lock.acquire()14 for value in ['A' , 'B', 'C']:15 print 'Put %s to queue...' % value16 q.put(value)17 time.sleep(random.random())18 #释放锁19 lock.release()20 print 'Child Process %s ends' % name21 22 #读数据进程23 def read(q, lock, name):24 print 'Child Process %s starts' % name25 while True: #持续地读取q中的数据26 value =q.get()27 print 'Get %s from queue.' % value28 print 'Child Process %s ends' % name29 30 if name == "main":31 #父进程创建queue,并共享给各个子进程32 q= Queue()33 #创建锁34 lock = Lock()35 #创建第一个“写”子进程36 pw = Process(target = write , args=(q, lock, 'WRITE', ))37 #创建“读”进程38 pr = Process(target = read, args=(q,lock, 'READ',))39 #启动子进程pw,写入:40 pw.start()41 #启动子进程pr,读取:42 pr.start()43 #等待pw结束:44 pw.join()45 #pr是个死循环,通过terminate杀死:46 pr.terminate()47 print 'Test finish.'
프로그램의 출력 결과는 다음과 같습니다.
Child Process WRITE starts Put A to queue... Child Process READ starts Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. Child Process WRITE ends Test finish. [Finished in 2.0s]
3.2 파이프
Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
下面就是使用Pipe通信的代码:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #发送数据进程11 def send(child_pipe, name):12 print 'Child Process %s starts' % name13 child_pipe.send('This is Mr.Ni')14 child_pipe.close()15 time.sleep(random.random())16 print 'Child Process %s ends' % name17 18 #接收数据进程19 def recv(parent_pipe, name):20 print 'Child Process %s starts' % name21 print parent_pipe.recv()22 time.sleep(random.random())23 print 'Child Process %s ends' % name24 25 if name == "main":26 #创建管道27 parent,child = Pipe()28 #创建send进程29 ps = Process(target=send, args=(child, 'SEND'))30 #创建recv进程31 pr = Process(target=recv, args=(parent, 'RECEIVE'))32 #启动send进程33 ps.start()34 #等待send进程结束35 ps.join()36 #启动recv进程37 pr.start()38 #等待recv进程结束39 pr.join()40 print 'Test finish.'
程序的输出结果如下:
Child Process SEND starts Child Process SEND ends Child Process RECEIVE starts This is Mr.Ni Child Process RECEIVE ends Test finish. [Finished in 1.8s]
【相关推荐】
2. Python中推荐使用多进程而不是多线程?分享推荐使用多进程的原因
위 내용은 Python의 다중 프로세스 및 다중 스레딩 예(1)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!