Dans les récents travaux de test d'Azkaban, il est nécessaire de simuler des scénarios de planification en ligne dans l'environnement de test pour les tests de stabilité. Par conséquent, je suis revenu à mon ancienne activité Python et j'ai écrit des scripts en Python pour construire des scénarios de planification en ligne similaires. Lors du processus d'écriture du script, j'ai rencontré une exigence : créer 10 000 flux de travaux dans l'environnement de test.
L'idée de départ était d'appeler l'interface de création de tâches 10 000 fois en boucle dans le cadre d'un projet azkaban (chaque Flow ne contient qu'une seule tâche). Étant donné qu'azkaban lui-même ne dispose pas d'interface pour ajouter/supprimer des flux de travaux, toutes les modifications, ajouts et suppressions de flux de travaux sont en fait implémentés en téléchargeant à nouveau le package zip du projet, en conséquence, à chaque création de l'interface de travail du frontal Mammoth. est ajusté, c'est en fait Mammoth qui réintègre le contenu du package zip puis télécharge à nouveau le package zip sur azkaban. L'ensemble du processus peut être décomposé en les processus suivants : décompresser le package zip pour obtenir le contenu du zip. package, modifiez le contenu du fichier dans le package zip et reconditionnez le package zip, téléchargé sur azkaban. Par conséquent, à mesure que le nombre de cycles augmente, le package zip contiendra plus de contenu et l'interface prendra plus de temps à s'exécuter une fois. La pratique a montré que le premier appel de l'interface est inférieur à 1 seconde, et lorsque le cycle est 1 000 fois, le temps nécessaire pour appeler l'interface une fois atteint près de 3 secondes. Par conséquent, si vous attendez une boucle 10 000 fois pour construire cette scène, cela prendra évidemment énormément de temps.
Dans ce contexte, il est naturel de penser à utiliser une approche multi-processus/multi-thread pour traiter ce problème.
Comme nous le savons tous, le système d'exploitation peut exécuter plusieurs tâches en même temps. Par exemple, vous écoutez de la musique, discutez sur IM, écrivez un blog, etc. La plupart des processeurs actuels sont multicœurs, mais même les processeurs monocœur prenaient autrefois en charge l'exécution parallèle de plusieurs tâches.
Le principe du multitâche sur un CPU monocœur : Le système d'exploitation exécute chaque tâche en alternance. Laissez d'abord la tâche 1 s'exécuter pendant 0,01 seconde, puis passez à la tâche 2 pour l'exécuter pendant 0,01 seconde, puis passez à la tâche 3 pour l'exécuter pendant 0,01 seconde... et ainsi de suite. La vitesse d'exécution du CPU étant très rapide, le sentiment subjectif de l'utilisateur est que ces tâches sont exécutées en parallèle.
Le principe du processeur multicœur effectuant plusieurs tâches : Puisque dans les applications réelles, le nombre de tâches dépasse souvent de loin le nombre de cœurs de processeur, de sorte que le système d'exploitation planifie réellement ces tâches multiples. -les tâches s'exécutent à tour de rôle sur chaque cœur.
Pour le système d'exploitation, une application est un processus. Par exemple, si vous ouvrez un navigateur, c'est un processus ; si vous ouvrez un Bloc-notes, c'est un processus. Chaque processus possède son propre numéro de processus unique. Ils partagent les ressources mémoire du système. Un processus est la plus petite unité permettant au système d'exploitation d'allouer des ressources .
Pour chaque processus, tel qu'un lecteur vidéo, qui doit lire la vidéo et l'audio en même temps, il doit exécuter au moins deux « sous-tâches » en même temps. Ces sous-tâches au sein du processus sont exécutées via. fils. Terminer. Un thread est la plus petite unité d'exécution. Un processus peut contenir plusieurs threads, indépendants les uns des autres et partageant les ressources détenues par le processus.
Le multitraitement est un module multi-processus multiplateforme fourni par Python, grâce auquel vous pouvez facilement écrire des multi-processus. les programmes de processus, peuvent être exécutés sur différentes plateformes (Unix/Linux, Windows).
Voici le code permettant d'utiliser le multitraitement pour écrire un programme multi-processus :
#!/usr/bin/python# -*- coding: utf-8 -*author = 'zni.feng'import sys reload (sys) sys.setdefaultencoding('utf-8')from multiprocessing import Processimport osimport time#子进程fundef child_projcess_fun(name): print 'Child process %s with processId %s starts.' % (name, os.getpid()) time.sleep(3) print 'Child process %s with processId %s ends.' % (name, os.getpid())if name == "main": print 'Parent processId is: %s.' % os.getpid() p = Process(target = child_projcess_fun, args=('zni',)) print 'Process starts' p.start() #开始进程 p.join() #等待子进程结束后再继续往下执行 print 'Process ends.'
Sortie du programme :
Parent processId is: 11076. Process starts Child process zni with processId 11077 starts. Child process zni with processId 11077 ends. Process ends. [Finished in 3.1s]
Certains Dans certains cas, nous souhaitons créer plusieurs sous-processus par lots, ou définir une limite supérieure sur le nombre de sous-processus pour éviter une consommation illimitée des ressources système. Ce travail peut être effectué via Pool (pool de processus). Voici le code d'utilisation de Pool :
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 8 from multiprocessing import Pool 9 import os, time10 11 def child_process_test(name, sleep_time):12 print 'Child process %s with processId %s starts.' % (name, os.getpid())13 time.sleep(sleep_time)14 print 'Child process %s with processId %s ends.' % (name, os.getpid())15 16 if name == "main":17 print 'Parent processId is: %s.' % os.getpid()18 p = Pool() #进程池默认大小是cpu的核数19 #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程20 for i in range(5):21 p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求22 23 print 'Child processes are running.'24 p.close()25 p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行26 print 'All Processes end.'
Sortie du programme :
Parent processId is: 5050. Child processes are running. Child process zni_0 with processId 5052 starts. Child process zni_1 with processId 5053 starts. Child process zni_2 with processId 5054 starts. Child process zni_3 with processId 5055 starts. Child process zni_0 with processId 5052 ends. Child process zni_4 with processId 5052 starts. Child process zni_1 with processId 5053 ends. Child process zni_2 with processId 5054 ends. Child process zni_3 with processId 5055 ends. Child process zni_4 with processId 5052 ends. All Processes end. [Finished in 6.2s]
méthode close() et The. différence entre les méthodes terminate() :
Close : Fermez le pool de processus afin qu'aucun nouveau processus ne puisse être ajouté. Les processus déjà exécutés attendront de poursuivre leur exécution jusqu'à leur fin.
Terminer : mettre fin de force au pool de threads et le processus d'exécution sera également terminé de force.
Le module multitraitement de Python fournit une variété de méthodes de communication inter-processus, telles que Queue, Pipe, etc.
3.1 Queue, Lock
Queue est un module fourni par le multitraitement. Sa structure de données est la file d'attente "FIFO-first in first out". Les méthodes couramment utilisées sont : put(object) Entrez le. queue; get() retire la file d'attente; empty() détermine si la file d'attente est vide.
Verrouillage : lorsque plusieurs sous-processus effectuent des opérations d'écriture sur la même file d'attente, afin d'éviter les conflits dans les opérations simultanées, un verrou peut être utilisé pour qu'un sous-processus ait la seule autorisation d'écriture dans la file d'attente, et les autres sous-processus doivent attendre que le verrou soit libéré avant de pouvoir recommencer les opérations d'écriture.
Voici le code d'utilisation de Queue pour la communication inter-processus : Créez deux processus enfants dans le processus parent pour implémenter respectivement les opérations de lecture et d'écriture sur la file d'attente
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #写数据进程10 def write(q, lock, name):11 print 'Child Process %s starts' % name12 #获得锁13 lock.acquire()14 for value in ['A' , 'B', 'C']:15 print 'Put %s to queue...' % value16 q.put(value)17 time.sleep(random.random())18 #释放锁19 lock.release()20 print 'Child Process %s ends' % name21 22 #读数据进程23 def read(q, lock, name):24 print 'Child Process %s starts' % name25 while True: #持续地读取q中的数据26 value =q.get()27 print 'Get %s from queue.' % value28 print 'Child Process %s ends' % name29 30 if name == "main":31 #父进程创建queue,并共享给各个子进程32 q= Queue()33 #创建锁34 lock = Lock()35 #创建第一个“写”子进程36 pw = Process(target = write , args=(q, lock, 'WRITE', ))37 #创建“读”进程38 pr = Process(target = read, args=(q,lock, 'READ',))39 #启动子进程pw,写入:40 pw.start()41 #启动子进程pr,读取:42 pr.start()43 #等待pw结束:44 pw.join()45 #pr是个死循环,通过terminate杀死:46 pr.terminate()47 print 'Test finish.'
La sortie de le programme est :
Child Process WRITE starts Put A to queue... Child Process READ starts Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. Child Process WRITE ends Test finish. [Finished in 2.0s]
3.2 Pipe
Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
下面就是使用Pipe通信的代码:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #发送数据进程11 def send(child_pipe, name):12 print 'Child Process %s starts' % name13 child_pipe.send('This is Mr.Ni')14 child_pipe.close()15 time.sleep(random.random())16 print 'Child Process %s ends' % name17 18 #接收数据进程19 def recv(parent_pipe, name):20 print 'Child Process %s starts' % name21 print parent_pipe.recv()22 time.sleep(random.random())23 print 'Child Process %s ends' % name24 25 if name == "main":26 #创建管道27 parent,child = Pipe()28 #创建send进程29 ps = Process(target=send, args=(child, 'SEND'))30 #创建recv进程31 pr = Process(target=recv, args=(parent, 'RECEIVE'))32 #启动send进程33 ps.start()34 #等待send进程结束35 ps.join()36 #启动recv进程37 pr.start()38 #等待recv进程结束39 pr.join()40 print 'Test finish.'
程序的输出结果如下:
Child Process SEND starts Child Process SEND ends Child Process RECEIVE starts This is Mr.Ni Child Process RECEIVE ends Test finish. [Finished in 1.8s]
【相关推荐】
2. Python中推荐使用多进程而不是多线程?分享推荐使用多进程的原因
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!