Heim > Backend-Entwicklung > Python-Tutorial > Warteschlange und Multiprozess in Python

Warteschlange und Multiprozess in Python

高洛峰
Freigeben: 2017-02-25 10:10:02
Original
1447 Leute haben es durchsucht

Ich bin kürzlich auf ein Projekt gestoßen, das die Ausführung von Aufgaben in mehreren virtuellen Maschinen erforderte. Ich habe auf den Code früherer Projekte anderer Leute verwiesen und Multiprozess verwendet, um es zu verarbeiten, also habe ich online nach Multiprozessen in Python gesucht

1. Lassen Sie uns zuerst über Queue (Warteschlangenobjekt) sprechen

Queue ist die Standardbibliothek in Python, die direkt importiert und zitiert werden kann Ich habe gehört, dass die berühmten Wörter „Zuerst essen, zuerst ziehen“ und „Zuerst essen, zuerst erbrechen“ tatsächlich die hier erwähnte Warteschlange sind. Beim Erstellen der Warteschlange können Sie deren Kapazität definieren. Essen Sie nicht zu viel. Wenn Sie zu viel essen , wird beim Erstellen kein Fehler gemeldet oder weniger als 1 geschrieben. Die Zahl bedeutet unendlich

Import Queue

q = Queue.Queue(10)

in die Warteschlange stellen Value(put)

q.put('yang')

q.put(4)

q.put(['yan','xing'])

Den Wert in der Warteschlange abrufen ()

Die Standardwarteschlange lautet „Wer zuerst reinkommt, mahlt zuerst“

>>> q.get()
'yang'
>>> ; q.get()
4
>>> q .get()
['yan', 'xing']

Wenn eine Warteschlange leer ist , wenn Sie get zum Abrufen verwenden, wird es blockiert, daher wird es im Allgemeinen beim Abrufen der Warteschlange

get_nowait()-Methode verwendet. Diese Methode löst eine leere Ausnahme aus, wenn ein Wert aus einer leeren Warteschlange abgerufen wird

Die gebräuchlichere Methode besteht also darin, zunächst festzustellen, ob eine Warteschlange leer ist. Wenn nicht, nehmen Sie den Wert, wenn sie leer ist.

Häufig verwendete Methode in Warteschlangen

Queue.qsize() gibt die Größe der Warteschlange zurück
Queue.empty() Wenn die Warteschlange leer ist, wird True zurückgegeben, andernfalls False
Queue.full() Wenn die Warteschlange voll ist, wird True zurückgegeben , andernfalls False
Queue.get([block[, timeout]]) Holen Sie sich die Warteschlange, Timeout-Wartezeit
Queue.get_nowait () Entspricht Queue.get(False)
Nicht blockierende Queue.put (item) In die Warteschlange schreiben, Timeout-Wartezeit
Queue.put_nowait(item) Äquivalent zu Queue.put(item, False)

2. Verwendung des Unterprozesskonzepts in der Mehrfachverarbeitung

aus Multiprocessing-Importprozess

Sie können einen Unterprozess durch Prozess erstellen

p = Prozess(target =fun,args=(args))

Dann verwenden Sie p.start(), um den untergeordneten Prozess zu starten

Dann verwenden Sie die p.join()-Methode, um die Ausführung des untergeordneten Prozesses zu beenden, bevor der übergeordnete Prozess ausgeführt wird

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'
Nach dem Login kopieren

Warteschlange und Multiprozess in Python

3. Verwendung von Pool

in der Mehrfachverarbeitung Wenn Sie mehrere untergeordnete Prozesse benötigen, können Sie darüber nachdenken Verwenden eines Prozesspools (Pool) zur Verwaltung

aus dem Multiprocessing-Importpool

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'
Nach dem Login kopieren

Die Methode des Pools Das Erstellen von Unterprozessen unterscheidet sich vom Prozess. Es wird durch

p.apply_async(func,args=(args)) implementiert Wenn mein Computer beispielsweise über 4 CPUs verfügt, können die Unterprozesse task0, task1, task2 und task3 gleichzeitig gestartet werden, und task4 wird gestartet, nachdem der vorherige Prozess beendet wurde

Warteschlange und Multiprozess in Python

Das Ergebnis nach dem Ausführen des obigen Programms wird tatsächlich separat gemäß 1, 2 und 3 im obigen Bild ausgeführt. Drucken Sie zuerst 1, dann 2 nach 3 Sekunden und dann 3 nach 3 Sekunden

Code p.close() dient dazu, den Prozesspool zu schließen und ihm keine Prozesse mehr hinzuzufügen. Der Aufruf der Methode „join()“ für das Poolobjekt wartet darauf, dass alle untergeordneten Prozesse ausgeführt werden. ) muss vor dem Aufruf von join() aufgerufen werden. Nach dem Aufruf von close() können Sie keine weiteren Prozesse hinzufügen.

Sie können auch die Anzahl der Prozesse für einen Instanzpool zu diesem Zeitpunkt definieren

Wenn im obigen Code p=Pool(5) ist, können alle untergeordneten Prozesse gleichzeitig verarbeitet werden

3. Kommunikation zwischen mehreren Unterprozessen

Die Kommunikation zwischen mehreren Unterprozessen erfordert beispielsweise die Verwendung der im ersten Schritt erwähnten Warteschlange Die folgende Anforderung lautet: Ein Unterprozess schreibt Daten in die Warteschlange und ein anderer Prozess entnimmt Daten aus der Warteschlange:

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'
Nach dem Login kopieren


4. Mehrere interessante Fragen zum obigen Code

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'
Nach dem Login kopieren

Wenn die Hauptfunktion wie im obigen Beispiel geschrieben ist, Was ich ursprünglich wollte, ist, dass ich eine Warteschlange bekomme, sie als Parameter an jeden untergeordneten Prozess im Prozesspool übergebe, aber

RuntimeError: Queue-Objekte sollten nur durch Vererbung zwischen Prozessen geteilt werden

< erhalten habe 🎜> Fehler, überprüft, die allgemeine Idee ist, dass das Warteschlangenobjekt nicht zwischen dem übergeordneten Prozess und dem untergeordneten Prozess kommunizieren kann. Wenn Sie die Warteschlange im Prozesspool verwenden möchten, müssen Sie die Manager-Klasse von Multiprozess verwenden

if __name__==&#39;__main__&#39;:
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print &#39;所有数据都写入并且读完&#39;
Nach dem Login kopieren

Auf diese Weise kann dieses Warteschlangenobjekt zwischen dem übergeordneten Prozess und dem untergeordneten Prozess kommunizieren. Wenn Sie keinen Pool verwenden, benötigen Sie keinen Manager . Sie können die Manager-Klasse in Zukunft im Multiprozess erweitern

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in [&#39;A&#39;, &#39;B&#39;, &#39;C&#39;]:
  print &#39;Put %s to queue...&#39; % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print &#39;Get %s from queue.&#39; % value
   time.sleep(random.random())
  else:
   break
 
if __name__==&#39;__main__&#39;:
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print &#39;所有数据都写入并且读完&#39;
Nach dem Login kopieren

更多Warteschlange und Multiprozess in Python相关文章请关注PHP中文网!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage