여기 커뮤니케이션 메커니즘의 예가 있습니다. 우리는 모두 커뮤니케이션이라는 단어에 익숙합니다. 예를 들어, 사람이 여자친구에게 전화하고 싶어합니다. 호출이 설정되면 암시적 대기열(이 용어에 유의)이 형성됩니다. 이때 이 사람은 계속해서 여자친구에게 대화를 통해 정보를 전할 예정이고, 이 사람의 여자친구도 듣고 있다. 아마 대부분의 경우 그 반대일 것이라고 생각합니다.
둘은 두 가지 프로세스로 비유할 수 있습니다. "이 사람"의 프로세스는 "여자 친구"의 프로세스에 정보를 보내야 하므로 대기열의 도움이 필요합니다. 여자친구는 항상 대기열에 있는 정보를 받아야 하기 때문에 동시에 다른 작업을 수행할 수 있습니다. 이는 두 프로세스 간의 통신이 주로 대기열에 의존한다는 것을 의미합니다.
이 대기열은 메시지 보내기 및 받기를 지원할 수 있습니다. "이 사람"은 메시지 보내기를 담당하고 "여자 친구"는 메시지 받기를 담당합니다.
큐가 중심이므로 큐를 생성하는 방법을 살펴보겠습니다.
는 여전히 다중 처리 모듈을 사용하고 이 모듈의 Queue 함수를 호출하여 대기열을 생성합니다.
함수 이름 | 소개 | Parameters | 반환 값 |
---|---|---|---|
Queue | 큐 생성 | mac_count | 큐 객체 |
Queue 함수 기능 소개: Queue를 호출하면 대기열을 생성할 수 있습니다. ; 대기열에 생성될 수 있는 최대 메시지 수를 나타내는 매개변수 mac_count가 있습니다. 전달되지 않으면 기본 길이는 무제한입니다. 큐 객체를 인스턴스화한 후에는 큐 객체를 조작하여 데이터를 넣고 빼야 합니다.
함수 이름 | 소개 | Parameters | 반환 값 |
---|---|---|---|
put | 메시지를 대기열에 넣기 | message | 없음 |
get | 대기열 메시지 받기 | None | str |
Put 기능 소개: 데이터를 전달합니다. 여기에는 문자열 유형인 하나의 매개변수 메시지가 있습니다.
get 함수 소개: 대기열에 있는 데이터를 수신하는 데 사용됩니다. (실제로 이것은 일반적인 json 시나리오입니다. 많은 데이터 전송이 문자열로 이루어집니다. 대기열의 삽입과 검색은 문자열을 사용하므로 json은 이 시나리오에 매우 적합합니다.)
다음으로 대기열 사용법을 연습해 보겠습니다.
코드 예제는 다음과 같습니다.
# coding:utf-8 import json import multiprocessing class Work(object): # 定义一个 Work 类 def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue self.queue = queue def send(self, message): # 定义一个 send(发送) 函数,传入 message # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错] if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去 def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环 while 1: result = self.queue.get() # 获取 '队列对象' --> queue 传入的message # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获 try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res res = json.loads(result) except: res = result print('接收到的信息为:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},)) receive = multiprocessing.Process(target=work.receive) send.start() receive.start()
프로세스 간 통신을 설정하기 위해 큐를 사용할 때 예외가 발생했습니다.
그러나 여기에는 아래와 같이 오류가 표시됩니다.
예 오류 스크린샷은 다음과 같습니다.
여기서 오류 메시지는 파일을 찾을 수 없다는 의미입니다. 실제로 큐를 사용하여 put() 및 get()을 수행하면 보이지 않는 잠금이 추가되는데, 이는 위 그림의 원 안에 있는 .SemLock입니다. 이 오류의 구체적인 원인에 대해 신경 쓸 필요는 없습니다. 이 문제를 해결하는 것은 실제로 매우 간단합니다.
FileNotFoundError: [Errno 2] 해당 파일 또는 디렉터리 예외 솔루션이 없습니다
프로세스를 차단해야 하는 것은 전송 또는 수신 하위 프로세스 중 하나만 차단하는 것입니다. 이는 이론적인 상황입니다. 그러나 우리의 수신 하위 프로세스는 항상 실행되는 while 루프이므로 전송 하위 프로세스에 조인만 추가하면 됩니다.
해결 다이어그램은 다음과 같습니다.
PS: 오류 문제는 해결되었으나 프로그램이 정상적으로 종료되지 않았습니다.
사실 우리의 수신 프로세스는 while 루프이기 때문에 언제 처리될지 알 수 없고 즉시 종료할 방법도 없습니다. 따라서 수신측을 종료하려면 수신 프로세스에서 quit() 함수를 사용해야 합니다.
실행 결과는 다음과 같습니다.
새 함수를 만들고 for 루프에 작성하여 일괄적으로 보낼 메시지 추가를 시뮬레이션합니다.
그런 다음 스레드를 추가합니다. 일괄적으로 데이터 전송을 시뮬레이션하는 이 함수에 추가됩니다.
샘플 코드는 다음과 같습니다.
# coding:utf-8 import json import time import multiprocessing class Work(object): # 定义一个 Work 类 def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue self.queue = queue def send(self, message): # 定义一个 send(发送) 函数,传入 message # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错] if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去 def send_all(self): # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message for i in range(20): self.queue.put('第 {} 次循环,发送的消息为:{}'.format(i, i)) time.sleep(1) def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环 while 1: result = self.queue.get() # 获取 '队列对象' --> queue 传入的message # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获 try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res res = json.loads(result) except: res = result print('接收到的信息为:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},)) receive = multiprocessing.Process(target=work.receive) send_all = multiprocessing.Process(target=work.send_all,) send_all.start() # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的 send.start() receive.start() # send.join() # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。 send_all.join() # 所以我们只需要阻塞最长使用率的进程就可以了 receive.terminate()
실행 결과는 다음과 같습니다.
위 그림에서 send 및 send_all 프로세스 모두 대기열에 의해 인스턴스화된 Queue 객체를 통해 메시지를 보낼 수 있다는 것을 알 수 있습니다. 동일한 수신 기능 두 프로세스에 의해 전달된 메시지도 인쇄됩니다.
이 장에서는 대기열을 성공적으로 사용하여 프로세스 간 통신을 달성하고 대기열 작업 기술도 마스터했습니다. 대기열에서 한쪽 끝(여기서는 send 끝을 보여줍니다)은 put 메서드를 통해 관련 정보를 추가하고, 다른 쪽 끝은 get 메서드를 사용하여 관련 정보를 얻습니다. 두 프로세스는 서로 협력하여 하나의 프로세스 효과를 얻습니다. 의사소통.
큐 외에도 프로세스는 파이프, 세마포어 및 공유 메모리를 사용하여 통신할 수도 있습니다. 관심이 있는 경우 이러한 방법에 대해 알아볼 수 있습니다. 직접 확장할 수 있습니다.
Python은 신호, 파이프, 메시지 큐, 세마포어, 공유 메모리, 소켓 등을 포함한 다양한 프로세스 통신 방법을 제공합니다.
주된 두 가지 방법은 큐와 파이프입니다. . 큐 여러 프로세스 간의 통신을 구현하는 데 사용되는 파이프는 두 프로세스 간의 통신입니다.
1. 파이프: 익명 파이프와 명명된 파이프로 구분
익명 파이프: 커널에 고정 크기 버퍼를 적용합니다. 일반적으로 fock 함수는 간의 통신을 구현하는 데 사용됩니다. 부모 및 자식 프로세스
명명된 파이프: 메모리에 고정 크기 버퍼를 적용합니다. 이 프로그램은 혈연 관계가 없는 프로세스도 프로세스 간에 통신할 수 있습니다.
특징: 바이트 스트림을 지향합니다. 수명 주기는 커널을 따릅니다. 동기식 상호 배제 메커니즘, 반이중, 단방향 통신, 두 개의 파이프가 양방향 통신을 실현합니다.
재작성하는 한 가지 방법은 운영 체제 커널에 대기열을 설정하는 것입니다. 여러 데이터그램 요소와 여러 프로세스가 특정 핸들을 통해 대기열에 액세스할 수 있습니다. 메시지 큐를 사용하여 한 프로세스에서 다른 프로세스로 데이터를 보낼 수 있습니다. 각 데이터 블록은 유형이 있는 것으로 간주되며, 수신자 프로세스에서 수신한 데이터 블록은 서로 다른 유형을 가질 수 있습니다. 메시지 대기열에도 파이프와 동일한 단점이 있습니다. 즉, 각 메시지의 최대 길이에 상한이 있고, 각 메시지 대기열의 총 바이트 수에 상한이 있으며, 메시지 대기열에도 상한이 있습니다. 시스템의 총 메시지 대기열 수
기능: 메시지 대기열 전역 연결 목록으로 간주할 수 있습니다. 데이터그램의 유형과 내용은 메시지 대기열의 식별자로 표시된 연결 목록 노드에 저장됩니다. ; 메시지 큐는 하나 이상의 프로세스가 메시지를 쓰거나 읽을 수 있도록 합니다. 이 주기는 커널을 따르며 양방향 통신을 달성할 수 있습니다.
3. 세마포어: 커널에서 세마포어 컬렉션(기본적으로 배열)을 만듭니다. 배열의 요소(세마포어)는 모두 1입니다. -1을 수행하려면 P 연산을 사용하고 +1
P에는 V 연산을 사용합니다. (sv) : sv 값이 0보다 크면 1만큼 감소시키고, 값이 0이면 프로그램 실행을 일시 중지합니다.
V(sv): sv를 기다리며 다른 프로세스가 일시 중지되면 다시 시작합니다. sv 대기로 인해 프로세스가 중단되지 않으면 1을 추가하세요.
PV 작업은 동일한 프로세스에 사용되어 상호 배제를 달성합니다. PV 작업은 동기화를 달성하기 위해 다른 프로세스에 사용됩니다.
기능: 중요한 리소스를 보호합니다
4. 공유 메모리: 동일한 물리적 메모리 조각을 서로 다른 프로세스의 가상 주소 공간에 매핑하여 서로 다른 프로세스 간에 동일한 리소스를 공유합니다. 프로세스 간 통신 방식으로 볼 때, 공유 메모리는 IPC 중 가장 유용하고 빠른 형태라고 할 수 있습니다
특징: 사용자 모드에서 커널 모드로 데이터를 자주 전환하고 복사하는 것과 달리, 메모리에서 직접 읽기만 하면 됩니다. ; 메모리 공유는 중요한 리소스이므로 작업은 원자적이어야 합니다. 세마포어나 뮤텍스를 사용할 수 있습니다.
위 내용은 Python 프로세스 간의 통신 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!