> 백엔드 개발 > 파이썬 튜토리얼 > 다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

王林
풀어 주다: 2023-05-08 21:31:06
앞으로
2192명이 탐색했습니다.

    1. 프로세스 간 통신을 마스터해야 하는 이유

    Python의 멀티 스레드 코드 효율성은 GIL에 의해 제한되며 멀티 코어 CPU로 가속화할 수 없습니다. 그러나 멀티 프로세스 방법은 GIL을 우회하고 이점을 활용할 수 있습니다. 다중 CPU 가속으로 인해 프로그램 성능이 크게 향상되지만

    프로세스 간 통신은 고려해야 할 문제입니다. 프로세스는 스레드와 다릅니다. 프로세스는 자체 독립 메모리 공간을 가지며 전역 변수를 사용하여 프로세스 간에 데이터를 전송할 수 없습니다.

    다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

    실제 프로젝트 요구 사항에는 집약적인 컴퓨팅이나 실시간 작업이 있는 경우가 많습니다. 때로는 사진, 대형 개체 등과 같은 대량의 데이터를 프로세스 간에 전송해야 하는 경우도 있습니다. 파일 직렬화 또는 네트워크 인터페이스를 통해 실시간 요구 사항을 충족하기 어렵습니다. redis 또는 kaffka, RabbitMQ의 타사 메시지 대기열 패키지는 시스템을 다시 복잡하게 만듭니다.

    Python 다중 처리 모듈 자체는 메시지 메커니즘, 동기화 메커니즘, 공유 메모리

    등 매우 효율적인 프로세스 간 통신 방법을 다양하게 제공합니다. Python 프로세스 간 통신의 다양한 방법과 보안 메커니즘의 사용을 이해하고 익히면 프로그램 실행 성능을 크게 향상시키는 데 도움이 될 수 있습니다.

    2. 프로세스 간 다양한 통신 방법 소개

    프로세스 간 통신의 주요 방법을 요약하면

    다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

    프로세스 간 통신의 메모리 안전성에 대해

    메모리 안전성이란 여러 프로세스가 서로 경쟁할 수 있음을 의미합니다. 같은 이유로 공유 변수 예외는 우발적인 파괴 및 기타 이유로 발생합니다. Multiprocessing 모듈에서 제공하는 큐, 파이프, 잠금 및 이벤트 개체는 모두 프로세스 간 통신 보안 메커니즘을 구현했습니다.
    공유 메모리를 사용하여 통신하는 경우 이러한 공유 메모리 변수를 코드에서 직접 추적하고 파기해야 합니다. 그렇지 않으면 정상적으로 파기되거나 스크램블되지 않을 수 있습니다. 시스템 이상을 유발합니다. 개발자가 공유 메모리의 사용 특성을 명확히 알지 않는 한, 이 공유 메모리를 직접 사용하는 것이 아니라 Manager 관리자를 통해 공유 메모리를 사용하는 것을 권장합니다.

    Memory Manager Manager

    Multiprocessing은 프로세스 통신의 메모리 보안 문제를 균일하게 해결할 수 있는 메모리 관리자 Manager 클래스를 제공합니다. 목록, dict, Queue, Lock, Event, Shared를 포함한 다양한 공유 데이터를 관리자에 추가할 수 있습니다. 기억 등은 일률적으로 추적되어 파괴됩니다.
    3. 메시지 메커니즘 통신

    1) 파이프 통신 방법

    은 1의 단순 소켓 채널과 유사하며 양쪽 끝에서 메시지를 보내고 받을 수 있습니다.

    파이프 객체 생성 방법:

    parent_conn, child_conn = Pipe(duplex=True/False)
    로그인 후 복사

    매개변수 설명

      duplex=True, 파이프라인은 양방향 통신입니다
    • duplex=False, 파이프라인은 단방향 통신입니다. child_conn만 메시지를 보낼 수 있고 parent_conn은 메시지를 보낼 수 있습니다. 받기만 합니다.
    • 샘플 코드:
    from multiprocessing import Process, Pipe
       def myfunction(conn):
          conn.send(['hi!! I am Python'])
          conn.close()
    
    if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=myfunction, args=(child_conn,))
          p.start()
      	print (parent_conn.recv() )
    	p.join()
    로그인 후 복사

    2) 메시지 큐 큐 통신 방법

    Multiprocessing의 Queue 클래스는 Python queue 3.0 버전에서 수정되어 생산자와 메시지 전달자 간의 데이터 전송을 쉽게 구현할 수 있으며 Multiprocessing Queue 모듈은 잠금 보안 메커니즘.

    다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

    Queue 모듈은 총 3가지 유형의 대기열을 제공합니다.

    (1) FIFO 큐, 선입선출,

    class queue.Queue(maxsize=0)
    로그인 후 복사

    (2) LIFO 큐, 후입선출, 실제로는 스택

    class queue.LifoQueue(maxsize=0)
    로그인 후 복사

    (3) 우선순위 큐를 사용하면 우선순위가 가장 낮은 항목 값이 큐에 들어갑니다. 먼저

    class queue.PriorityQueue(maxsize=0)
    로그인 후 복사

    Multiprocessing.Queue 클래스의 주요 메서드:

    methodqueue.qsize()queue.full()queue.empty()queue.put(item)queue .get()queue.put_nowait(item), queue.get_nowait()

    说明:

    • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。

    • Multiprocessing 的Queue类没有提供Task_done, join方法

    Queue模块的其它队列类:
    (1) SimpleQueue
    简洁版的FIFO队列, 适事简单场景使用

    (2) JoinableQueue子类
    Python 3.5 后新增的 Queue的子类,拥有 task_done(), join() 方法

    • task_done()表示,最近读出的1个任务已经完成。

    • join()阻塞队列,直到queue中的所有任务都已完成。

    producer – consumer 场景,使用Queue的示例

    import multiprocessing
    
    def producer(numbers, q):
        for x in numbers:
            if x % 2 == 0:
                if q.full():
                    print("queue is full")
                    break
                q.put(x)
                print(f"put {x} in queue by producer")
        return None
    
    def consumer(q):
        while not q.empty():
            print(f"take data {q.get()} from queue by consumer")
        return None
    
    if __name__ == "__main__":
        # 设置1个queue对象,最大长度为5
        qu = multiprocessing.Queue(maxsize=5,) 
    
        # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写
        p5 = multiprocessing.Process(
            name="producer-1",
            target=producer,
            args=([random.randint(1, 100) for i in range(0, 10)], qu)
        )
        p5.start()
        p5.join()
        #创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读
        p6 = multiprocessing.Process(
            name="consumer-1",
            target=consumer,
            args=(qu,)
        )
        p6.start()
        p6.join()
    
        print(qu.qsize())
    로그인 후 복사

    4、同步机制通信

    (1) 进程间同步锁 – Lock

    Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。

    例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。

    添加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。

    如下面的示例,同时只有1个进程可以执行打印操作。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    로그인 후 복사

    (2) 子进程间协调机制 – Event

    Event 机制的工作原理:

    1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
    这样由1个进程通过event flag 就可以控制、协调各子进程运行。

    Event object的使用方法:
    1)主函数: 创建1个event 对象, flag = multiprocessing.Event() , 做为参数传给各子进程
    2) 子进程A: 不受event影响,通过event 控制其它进程的运行
    o 先clear(),将event 置为False, 占用运行权.
    o 完成工作后,用set()把flag置为True。
    3) 子进程B, C: 受event 影响
    o 设置 wait() 状态,暂停运行
    o 直到flag重新变为True,恢复运行

    主要方法:

    • set(), clear()设置 True/False,

    • wait() 使进程等待,直到flag被改为true.

    • is_set(), Return True if and only if the internal flag is true.

    验证进程间通信 – Event

    import multiprocessing
    import time
    import random
    
    def joo_a(q, ev):
        print("subprocess joo_a start")
        if not ev.is_set():
            ev.wait()
        q.put(random.randint(1, 100))
        print("subprocess joo_a ended")
    
    def joo_b(q, ev):
        print("subprocess joo_b start")
        ev.clear()
        time.sleep(2)
        q.put(random.randint(200, 300))
        ev.set()
        print("subprocess joo_b ended")
    
    def main_event():
        qu = multiprocessing.Queue()
        ev = multiprocessing.Event()
        sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
        sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
        sub_a.start()
        sub_b.start()
        # ev.set()
        sub_a.join()
        sub_b.join()
        while not qu.empty():
            print(qu.get())
    
    if __name__ == "__main__":
        main_event()
    로그인 후 복사

    5、共享内存方式通信

    (1) 共享变量

    子进程之间共存内存变量,要用 multiprocessing.Value(), Array() 来定义变量。 实际上是ctypes 类型,由multiprocessing.sharedctypes模块提供相关功能

    注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。

    def  func(num):
        num.value=10.78   #子进程改变数值的值,主进程跟着改变
     
    if  __name__=="__main__":
    num = multiprocessing.Value("d", 10.0) 
    # d表示数值,主进程与子进程可共享这个变量。
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start()
        p.join()
     
        print(num.value)
    로그인 후 복사

    进程之间共享数据(数组型):

    import multiprocessing
     
    def  func(num):
        num[2]=9999   #子进程改变数组,主进程跟着改变
     
    if  __name__=="__main__":
        num=multiprocessing.Array("i",[1,2,3,4,5])   
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start() 
        p.join()
     
        print(num[:])
    로그인 후 복사

    (2) 共享内存 Shared_memory

    如果进程间需要共享对象数据,或共享内容,数据较大,multiprocessing 提供了SharedMemory类来实现进程间实时通信,不需要通过发消息,读写磁盘文件来实现,速度更快。
    注意:直接使用SharedMemory 存在着同抢、泄露隐患,应通过SharedMemory Manager 管程类来使用, 以确保内存安全。

    创建共享内存区:

    multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)
    로그인 후 복사

    方法:
    父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
    相关属性:
    获取内存区内容: shm.buf
    获取内存区名称: shm.name
    获取内存区字节数: shm.size

    示例:

    >>> from multiprocessing import shared_memory
    >>> shm_a = shared_memory.SharedMemory(create=True, size=10)
    >>> type(shm_a.buf)
    <class &#39;memoryview&#39;>
    >>> buffer = shm_a.buf
    >>> len(buffer)
    10
    >>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
    >>> buffer[4] = 100                           # Modify single byte at a time
    >>> # Attach to an existing shared memory block
    >>> shm_b = shared_memory.SharedMemory(shm_a.name)
    >>> import array
    >>> array.array(&#39;b&#39;, shm_b.buf[:5])  # Copy the data into a new array.array
    array(&#39;b&#39;, [22, 33, 44, 55, 100])
    >>> shm_b.buf[:5] = b&#39;howdy&#39;  # Modify via shm_b using bytes
    >>> bytes(shm_a.buf[:5])      # Access via shm_a
    b&#39;howdy&#39;
    >>> shm_b.close()   # Close each SharedMemory instance
    >>> shm_a.close()
    >>> shm_a.unlink()  # Call unlink only once to release the shared memory
    로그인 후 복사

    3) ShareableList 共享列表

    sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
    构建方法:
    multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

    from multiprocessing import shared_memory
    >>> a = shared_memory.ShareableList([&#39;howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, None, True, 42])
    >>> [ type(entry) for entry in a ]
    [<class &#39;str&#39;>, <class &#39;bytes&#39;>, <class &#39;float&#39;>, <class &#39;int&#39;>, <class &#39;NoneType&#39;>, <class &#39;bool&#39;>, <class &#39;int&#39;>]
    >>> a[2]
    -273.154
    >>> a[2] = -78.5
    >>> a[2]
    -78.5
    >>> a[2] = &#39;dry ice&#39;  # Changing data types is supported as well
    >>> a[2]
    &#39;dry ice&#39;
    >>> a[2] = &#39;larger than previously allocated storage space&#39;
    Traceback (most recent call last):
      ...
    ValueError: exceeds available storage for existing str
    >>> a[2]
    &#39;dry ice&#39;
    >>> len(a)
    7
    >>> a.index(42)
    6
    >>> a.count(b&#39;howdy&#39;)
    0
    >>> a.count(b&#39;HoWdY&#39;)
    1
    >>> a.shm.close()
    >>> a.shm.unlink()
    >>> del a  # Use of a ShareableList after call to unlink() is unsupported
    
    
    b = shared_memory.ShareableList(range(5))         # In a first process
    >>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
    >>> c
    ShareableList([0, 1, 2, 3, 4], name=&#39;...&#39;)
    >>> c[-1] = -999
    >>> b[-1]
    -999
    >>> b.shm.close()
    >>> c.shm.close()
    >>> c.shm.unlink()
    로그인 후 복사

    6、共享内存管理器Manager

    Multiprocessing 提供了 Manager 内存管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。

    其原理如下:

    다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?

    Manager管理器相当于提供了1个共享内存的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各进程之间的通信。

    1) Manager的主要数据结构

    相关类:multiprocessing.Manager
    子类有:

    • multiprocessing.managers.SharedMemoryManager

    • multiprocessing.managers.BaseManager

    支持共享变量类型:

    • python基本类型 int, str, list, tuple, list

    • 进程通信对象: Queue, Lock, Event,

    • Condition, Semaphore, Barrier ctypes类型: Value, Array

    2) 使用步骤

    1)创建管理器对象

    snm = Manager()
    snm = SharedMemoryManager()
    로그인 후 복사

    2)创建共享内存变量
    新建list, dict

    sl = snm.list(), snm.dict()
    로그인 후 복사

    新建1块bytes共享内存变量,需要指定大小

    sx = snm.SharedMemory(size)
    로그인 후 복사

    新建1个共享列表变量,可用列表来初始化

    sl = snm.ShareableList(sequence) 如
    sl = smm.ShareableList([‘howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, True])
    로그인 후 복사

    新建1个queue, 使用multiprocessing 的Queue类型

    snm = Manager()
    q = snm.Queue()
    로그인 후 복사

    示例 :

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = &#39;1&#39;
        d[&#39;2&#39;] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == &#39;__main__&#39;:
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)
    로그인 후 복사

    将打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    3) 销毁共享内存变量

    方法一:
    调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
    方法二
    使用with语句,结束后会自动释放所有manager变量

    >>> with SharedMemoryManager() as smm:
    ...     sl = smm.ShareableList(range(2000))
    ...     # Divide the work among two processes, storing partial results in sl
    ...     p1 = Process(target=do_work, args=(sl, 0, 1000))
    ...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
    ...     p1.start()
    ...     p2.start()  # A multiprocessing.Pool might be more efficient
    ...     p1.join()
    ...     p2.join()   # Wait for all work to complete in both processes
    ...     total_result = sum(sl)  # Consolidate the partial results now in sl
    로그인 후 복사

    4) 向管理器注册自定义类型

    managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。

    from multiprocessing.managers import BaseManager
    
    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register(&#39;Maths&#39;, MathsClass)
    
    if __name__ == &#39;__main__&#39;:
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))
    로그인 후 복사

    위 내용은 다중 처리를 사용하여 Python에서 프로세스 간 통신을 구현하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    관련 라벨:
    원천:yisu.com
    본 웹사이트의 성명
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
    인기 튜토리얼
    더>
    최신 다운로드
    더>
    웹 효과
    웹사이트 소스 코드
    웹사이트 자료
    프론트엔드 템플릿
    Description
    큐 길이를 반환합니다
    queue 가득 차면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
    대기열이 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환
    에 데이터를 씁니다. queue
    큐에서 데이터를 던집니다.
    쓰거나 던질 때까지 기다리는 시간이 없습니다