Python中的并发编程实例
一、简介
我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程。在任何给定的时刻,一个程序只做一件事情。
一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen())。然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线程。因为进程之间是相互独立的,因此它们同原有的进程并发执行。这是指原进程可以在创建子进程后去执行其它工作。
虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递,可以将其简单地理解为一个纯字节的缓冲区,而send()或recv()操作原语可以通过诸如管道(pipe)或是网络套接字(network socket)等I/O通道传输或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块),通过内存映射,进程可以在内存中创建共享区域,而对这些区域的修改对所有的进程可见。
多进程能够被用于需要同时执行多个任务的场景,由不同的进程负责任务的不同部分。然而,另一种将工作细分到任务的方法是使用线程。同进程类似,线程也有其自己的控制流以及执行栈,但线程在创建它的进程之内运行,分享其父进程的所有数据和系统资源。当应用需要完成并发任务的时候线程是很有用的,但是潜在的问题是任务间必须分享大量的系统状态。
当使用多进程或多线程时,操作系统负责调度。这是通过给每个进程(或线程)一个很小的时间片并且在所有活动任务之间快速循环切换来实现的,这个过程将CPU时间分割为小片段分给各个任务。例如,如果你的系统中有10个活跃的进程正在执行,操作系统将会适当的将十分之一的CPU时间分配给每个进程并且循环地在十个进程之间切换。当系统不止有一个CPU核时,操作系统能够将进程调度到不同的CPU核上,保持系统负载平均以实现并行执行。
利用并发执行机制写的程序需要考虑一些复杂的问题。复杂性的主要来源是关于同步和共享数据的问题。通常情况下,多个任务同时试图更新同一个数据结构会造成脏数据和程序状态不一致的问题(正式的说法是资源竞争的问题)。为了解决这个问题,需要使用互斥锁或是其他相似的同步原语来标识并保护程序中的关键部分。举个例子,如果多个不同的线程正在试图同时向同一个文件写入数据,那么你需要一个互斥锁使这些写操作依次执行,当一个线程在写入时,其他线程必须等待直到当前线程释放这个资源。
Python中的并发编程
Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器(generator function)的并发实现。
Python在大部分系统上同时支持消息传递和基于线程的并发编程机制。虽然大部分程序员对线程接口更为熟悉,但是Python的线程机制却有着诸多的限制。Python使用了内部全局解释器锁(GIL)来保证线程安全,GIL同时只允许一个线程执行。这使得Python程序就算在多核系统上也只能在单个处理器上运行。Python界关于GIL的争论尽管很多,但在可预见的未来却没有将其移除的可能。
Python提供了一些很精巧的工具用于管理基于线程和进程的并发操作。即使是简单地程序也能够使用这些工具使得任务并发进行从而加快运行速度。subprocess模块为子进程的创建和通信提供了API。这特别适合运行与文本相关的程序,因为这些API支持通过新进程的标准输入输出通道传送数据。signal模块将UNIX系统的信号量机制暴露给用户,用以在进程之间传递事件信息。信号是异步处理的,通常有信号到来时会中断程序当前的工作。信号机制能够实现粗粒度的消息传递系统,但是有其他更可靠的进程内通讯技术能够传递更复杂的消息。threading模块为并发操作提供了一系列高级的,面向对象的API。Thread对象们在一个进程内并发地运行,分享内存资源。使用线程能够更好地扩展I/O密集型的任务。multiprocessing模块同threading模块类似,不过它提供了对于进程的操作。每个进程类是真实的操作系统进程,并且没有共享内存资源,但multiprocessing模块提供了进程间共享数据以及传递消息的机制。通常情况下,将基于线程的程序改为基于进程的很简单,只需要修改一些import声明即可。
Threading模块示例
以threading模块为例,思考这样一个简单的问题:如何使用分段并行的方式完成一个大数的累加。
import threading class SummingThread(threading.Thread): def __init__(self, low, high): super(SummingThread, self).__init__() self.low = low self.high = high self.total = 0 def run(self): for i in range(self.low, self.high): self.total += i thread1 = SummingThread(0, 500000) thread2 = SummingThread(500000, 1000000) thread1.start() # This actually causes the thread to run thread2.start() thread1.join() # This waits until the thread has completed thread2.join() # At this point, both threads have completed result = thread1.total + thread2.total print(result)
自定义Threading类库
我写了一个易于使用threads的小型Python类库,包含了一些有用的类和函数。
关键参数:
* do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)
* ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列
* start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)
* stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)
import threading import logging def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True): """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally). Parameters: - num_threads Default: len(work_items) --- Number of threads to use process items in work_items. - per_sync_timeout Default: 1 --- Each synchronized operation can optionally timeout. - preserve_result_ordering Default: True --- Reorders result_item to match original work_items ordering. Return: --- list of results from applying work_func to each work_item. Order is optionally preserved. Example: def process_url(url): # TODO: Do some work with the url return url urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"] # process urls in parallel result_items = do_threaded_work(urls_to_process, process_url) # print(results) print(repr(result_items)) """ global wrapped_work_func if not num_threads: num_threads = len(work_items) work_queue = Queue.Queue() result_queue = Queue.Queue() index = 0 for work_item in work_items: if preserve_result_ordering: work_queue.put((index, work_item)) else: work_queue.put(work_item) index += 1 if preserve_result_ordering: wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1])) start_logging_with_thread_info() #spawn a pool of threads, and pass them queue instance for _ in range(num_threads): if preserve_result_ordering: t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout) else: t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout) t.setDaemon(True) t.start() work_queue.join() stop_logging_with_thread_info() logging.info('work_queue joined') result_items = [] while not result_queue.empty(): result = result_queue.get(timeout=per_sync_timeout) logging.info('found result[:500]: ' + repr(result)[:500]) if result: result_items.append(result) if preserve_result_ordering: result_items = [work_item for index, work_item in result_items] return result_items class ThreadedWorker(threading.Thread): """ Generic Threaded Worker Input to work_func: item from work_queue Example usage: import Queue urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"] work_queue = Queue.Queue() result_queue = Queue.Queue() def process_url(url): # TODO: Do some work with the url return url def main(): # spawn a pool of threads, and pass them queue instance for i in range(3): t = ThreadedWorker(work_queue, result_queue, work_func=process_url) t.setDaemon(True) t.start() # populate queue with data for url in urls_to_process: work_queue.put(url) # wait on the queue until everything has been processed work_queue.join() # print results print repr(result_queue) main() """ def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1): threading.Thread.__init__(self) self.work_queue = work_queue self.result_queue = result_queue self.work_func = work_func self.stop_when_work_queue_empty = stop_when_work_queue_empty self.queue_timeout = queue_timeout def should_continue_running(self): if self.stop_when_work_queue_empty: return not self.work_queue.empty() else: return True def run(self): while self.should_continue_running(): try: # grabs item from work_queue work_item = self.work_queue.get(timeout=self.queue_timeout) # works on item work_result = self.work_func(work_item) #place work_result into result_queue self.result_queue.put(work_result, timeout=self.queue_timeout) except Queue.Empty: logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out') except Queue.Full: logging.warning('ThreadedWorker Queue was full or Queue.put() timed out') except: logging.exception('Error in ThreadedWorker') finally: #signals to work_queue that item is done self.work_queue.task_done() def start_logging_with_thread_info(): try: formatter = logging.Formatter('[thread %(thread)-3s] %(message)s') logging.getLogger().handlers[0].setFormatter(formatter) except: logging.exception('Failed to start logging with thread info') def stop_logging_with_thread_info(): try: formatter = logging.Formatter('%(message)s') logging.getLogger().handlers[0].setFormatter(formatter) except: logging.exception('Failed to stop logging with thread info')
使用示例
from test import ThreadedWorker from queue import Queue urls_to_process = ["http://facebook.com", "http://pypix.com"] work_queue = Queue() result_queue = Queue() def process_url(url): # TODO: Do some work with the url return url def main(): # spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadedWorker(work_queue, result_queue, work_func=process_url) t.setDaemon(True) t.start() # populate queue with data for url in urls_to_process: work_queue.put(url) # wait on the queue until everything has been processed work_queue.join() # print results print(repr(result_queue)) main()

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











이 튜토리얼은 Python을 사용하여 Zipf의 법칙의 통계 개념을 처리하는 방법을 보여주고 법을 처리 할 때 Python의 읽기 및 대형 텍스트 파일을 정렬하는 효율성을 보여줍니다. ZIPF 분포라는 용어가 무엇을 의미하는지 궁금 할 것입니다. 이 용어를 이해하려면 먼저 Zipf의 법칙을 정의해야합니다. 걱정하지 마세요. 지침을 단순화하려고 노력할 것입니다. Zipf의 법칙 Zipf의 법칙은 단순히 : 큰 자연어 코퍼스에서 가장 자주 발생하는 단어는 두 번째 빈번한 단어, 세 번째 빈번한 단어보다 세 번, 네 번째 빈번한 단어 등 4 배나 자주 발생합니다. 예를 살펴 보겠습니다. 미국 영어로 브라운 코퍼스를 보면 가장 빈번한 단어는 "TH입니다.

이 기사에서는 HTML을 구문 분석하기 위해 파이썬 라이브러리 인 아름다운 수프를 사용하는 방법을 설명합니다. 데이터 추출, 다양한 HTML 구조 및 오류 처리 및 대안 (SEL과 같은 Find (), find_all (), select () 및 get_text ()와 같은 일반적인 방법을 자세히 설명합니다.

시끄러운 이미지를 다루는 것은 특히 휴대폰 또는 저해상도 카메라 사진에서 일반적인 문제입니다. 이 튜토리얼은 OpenCV를 사용 하여이 문제를 해결하기 위해 Python의 이미지 필터링 기술을 탐구합니다. 이미지 필터링 : 강력한 도구 이미지 필터

데이터 과학 및 처리가 가장 좋아하는 Python은 고성능 컴퓨팅을위한 풍부한 생태계를 제공합니다. 그러나 Python의 병렬 프로그래밍은 독특한 과제를 제시합니다. 이 튜토리얼은 이러한 과제를 탐구하며 전 세계 해석에 중점을 둡니다.

이 기사는 딥 러닝을 위해 텐서 플로와 Pytorch를 비교합니다. 데이터 준비, 모델 구축, 교육, 평가 및 배포와 관련된 단계에 대해 자세히 설명합니다. 프레임 워크, 특히 계산 포도와 관련하여 주요 차이점

이 튜토리얼은 Python 3에서 사용자 정의 파이프 라인 데이터 구조를 작성하여 클래스 및 작업자 과부하를 활용하여 향상된 기능을 보여줍니다. 파이프 라인의 유연성은 일련의 기능을 데이터 세트, GE에 적용하는 능력에 있습니다.

파이썬 객체의 직렬화 및 사막화는 사소한 프로그램의 주요 측면입니다. 무언가를 Python 파일에 저장하면 구성 파일을 읽거나 HTTP 요청에 응답하는 경우 객체 직렬화 및 사태화를 수행합니다. 어떤 의미에서, 직렬화와 사제화는 세계에서 가장 지루한 것들입니다. 이 모든 형식과 프로토콜에 대해 누가 걱정합니까? 일부 파이썬 객체를 지속하거나 스트리밍하여 나중에 완전히 검색하려고합니다. 이것은 세상을 개념적 차원에서 볼 수있는 좋은 방법입니다. 그러나 실제 수준에서 선택한 직렬화 체계, 형식 또는 프로토콜은 속도, 보안, 유지 보수 상태 및 프로그램의 기타 측면을 결정할 수 있습니다.

Python의 통계 모듈은 강력한 데이터 통계 분석 기능을 제공하여 생물 통계 및 비즈니스 분석과 같은 데이터의 전반적인 특성을 빠르게 이해할 수 있도록 도와줍니다. 데이터 포인트를 하나씩 보는 대신 평균 또는 분산과 같은 통계를보고 무시할 수있는 원래 데이터에서 트렌드와 기능을 발견하고 대형 데이터 세트를보다 쉽고 효과적으로 비교하십시오. 이 튜토리얼은 평균을 계산하고 데이터 세트의 분산 정도를 측정하는 방법을 설명합니다. 달리 명시되지 않는 한,이 모듈의 모든 함수는 단순히 평균을 합산하는 대신 평균 () 함수의 계산을 지원합니다. 부동 소수점 번호도 사용할 수 있습니다. 무작위로 가져옵니다 수입 통계 Fracti에서
