Python Asyncio의 스케줄링 원리는 무엇입니까?
1. 기본 소개
Python.Asyncio
는 세 가지 대기 가능 객체 외에도 코어 스케줄링과 관련된 다른 기능을 포함하는 크고 포괄적인 라이브러리입니다. runners.py
, base_event.py
및 event.py
세 파일에 있습니다. Python.Asyncio
是一个大而全的库,它包括很多功能,而跟核心调度相关的逻辑除了三种可等待对象外,还有其它一些功能,它们分别位于runners.py
,base_event.py
,event.py
三个文件中。
runners.py
文件有一个主要的类--Runner
,它的主要职责是做好进入协程模式的事件循环等到初始化工作,以及在退出协程模式时清理还在内存的协程,生成器等对象。
协程模式只是为了能方便理解,对于计算机而言,并没有这样区分
event.py
文件除了存放着EventLoop
对象的接口以及获取和设置EventLoop
的函数外,还有两个EventLoop
可调度的对象,分别为Handler
和TimerHandler
,它们可以认为是EvnetLoop
调用其它对象的容器,用于连接待调度对象和事件循环的关系,不过它们的实现非常简单,对于Handler
,它的源码如下:
# 已经移除了一些不想关的代码 class Handle: def __init__(self, callback, args, loop, context=None): # 初始化上下文,确保执行的时候能找到Handle所在的上下文 if context is None: context = contextvars.copy_context() self._context = context self._loop = loop self._callback = callback self._args = args self._cancelled = False def cancel(self): # 设置当前Handle为取消状态 if not self._cancelled: self._cancelled = True self._callback = None self._args = None def cancelled(self): return self._cancelled def _run(self): # 用于执行真正的函数,且通过context.run方法来确保在自己的上下文内执行。 try: # 保持在自己持有的上下文中执行对应的回调 self._context.run(self._callback, *self._args) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: cb = format_helpers._format_callback_source( self._callback, self._args) msg = f'Exception in callback {cb}' context = { 'message': msg, 'exception': exc, 'handle': self, } self._loop.call_exception_handler(context)
通过源码可以发现,Handle
功能十分简单,提供了可以被取消以及可以在自己所处的上下文执行的功能,而TimerHandle
继承于Handle
比Handle
多了一些和时间以及排序相关的参数,源码如下:
class TimerHandle(Handle): def __init__(self, when, callback, args, loop, context=None): super().__init__(callback, args, loop, context) self._when = when self._scheduled = False def __hash__(self): return hash(self._when) def __lt__(self, other): if isinstance(other, TimerHandle): return self._when < other._when return NotImplemented def __le__(self, other): if isinstance(other, TimerHandle): return self._when < other._when or self.__eq__(other) return NotImplemented def __gt__(self, other): if isinstance(other, TimerHandle): return self._when > other._when return NotImplemented def __ge__(self, other): if isinstance(other, TimerHandle): return self._when > other._when or self.__eq__(other) return NotImplemented def __eq__(self, other): if isinstance(other, TimerHandle): return (self._when == other._when and self._callback == other._callback and self._args == other._args and self._cancelled == other._cancelled) return NotImplemented def cancel(self): if not self._cancelled: # 用于通知事件循环当前Handle已经退出了 self._loop._timer_handle_cancelled(self) super().cancel() def when(self): return self._when
通过代码可以发现,这两个对象十分简单,而我们在使用Python.Asyncio
时并不会直接使用到这两个对象,而是通过loop.call_xxx
系列方法来把调用封装成Handle
对象,然后等待EventLoop
执行。 所以loop.call_xxx
系列方法可以认为是EventLoop
的注册操作,基本上所有非IO的异步操作都需要通过loop.call_xxx
方法来把自己的调用注册到EventLoop
中,比如Task
对象就在初始化后通过调用loop.call_soon
方法来注册到EventLoop
中,loop.call_sonn
的实现很简单,
它的源码如下:
class BaseEventLoop: ... def call_soon(self, callback, *args, context=None): # 检查是否事件循环是否关闭,如果是则直接抛出异常 self._check_closed() handle = self._call_soon(callback, args, context) return handle def _call_soon(self, callback, args, context): # 把调用封装成一个handle,这样方便被事件循环调用 handle = events.Handle(callback, args, self, context) # 添加一个handle到_ready,等待被调用 self._ready.append(handle) return handle
可以看到call_soon
真正相关的代码只有10几行,它负责把一个调用封装成一个Handle
,并添加到self._reday
中,从而实现把调用注册到事件循环之中。
loop.call_xxx
系列函数除了loop.call_soon
系列函数外,还有另外两个方法--loop.call_at
和loop.call_later
,它们类似于loop.call_soon
,不过多了一个时间参数,来告诉EventLoop
在什么时间后才可以调用,同时通过loop.call_at
和loop.call_later
注册的调用会通过Python
的堆排序模块headpq
注册到self._scheduled
变量中,
具体代码如下:
class BaseEventLoop: ... def call_later(self, delay, callback, *args, context=None): if delay is None: raise TypeError('delay must not be None') timer = self.call_at(self.time() + delay, callback, *args, context=context) return timer def call_at(self, when, callback, *args, context=None): if when is None: raise TypeError("when cannot be None") self._check_closed() # 创建一个timer handle,然后添加到事件循环的_scheduled中,等待被调用 timer = events.TimerHandle(when, callback, args, self, context) heapq.heappush(self._scheduled, timer) timer._scheduled = True return timer
2.EventLoop的调度实现
在文章《Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用》中已经分析到了runner
会通过loop.run_until_complete
来调用main
Task从而开启EventLoop
的调度,所以在分析EventLoop
的调度时,应该先从loop.run_until_complete
入手,
对应的源码如下:
class BaseEventLoop: def run_until_complete(self, future): ... new_task = not futures.isfuture(future) # 把coroutine转换成task,这样事件循环就可以调度了,事件循环的最小调度单位为task # 需要注意的是此时事件循环并没注册到全局变量中,所以需要显示的传进去, # 同时Task对象注册的时候,已经通过loop.call_soon把自己注册到事件循环中,等待调度 future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False # 当该task完成时,意味着当前事件循环失去了调度对象,无法继续调度,所以需要关闭当前事件循环,程序会由协程模式返回到线程模式 future.add_done_callback(_run_until_complete_cb) try: # 事件循环开始运行 self.run_forever() except: if new_task and future.done() and not future.cancelled(): # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() raise finally: future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def run_forever(self): # 进行一些初始化工作 self._check_closed() self._check_running() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() old_agen_hooks = sys.get_asyncgen_hooks() # 通过asyncgen钩子来自动关闭asyncgen函数,这样可以提醒用户生成器还未关闭 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try: # 设置当前在运行的事件循环到全局变量中,这样就可以在任一阶段获取到当前的事件循环了 events._set_running_loop(self) while True: # 正真执行任务的逻辑 self._run_once() if self._stopping: break finally: # 关闭循环, 并且清理一些资源 self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks)
这段源码并不复杂,它的主要逻辑是通过把Corotinue
转为一个Task
对象,然后通过Task
对象初始化时调用loop.call_sonn
方法把自己注册到EventLoop
中,最后再通过loop.run_forever
中的循环代码一直运行着,直到_stopping
被标记为True
:
while True: # 正真执行任务的逻辑 self._run_once() if self._stopping: break
可以看出,这段代码是确保事件循环能一直执行着,自动循环结束,而真正调度的核心是_run_once
runners.py
파일에는 Runner
라는 하나의 기본 클래스가 있습니다. 주요 역할은 코루틴 모드 이벤트 루프에 들어가서 초기화 작업을 기다린 후 코루틴에서 종료하는 것입니다. 모드에서는 아직 메모리에 있는 코루틴, 생성기 및 기타 개체를 정리합니다. 코루틴 모드는 단지 이해의 편의를 위한 것일 뿐입니다. 컴퓨터의 경우 EventLoop
를 저장하는 것 외에는
event.py
파일에 차이가 없습니다. code >객체의 인터페이스와 EventLoop
를 가져오고 설정하는 함수 외에도 Handler
라는 두 개의 EventLoop
예약 가능 객체가 있습니다. 및 TimerHandler
는 예약할 개체와 이벤트 루프 사이의 관계를 연결하는 데 사용되는 다른 개체를 호출하기 위한 EvnetLoop
의 컨테이너로 간주될 수 있지만 구현은 다음과 같습니다. 매우 간단합니다. Handler
의 경우 🎜 소스 코드는 다음과 같습니다. 🎜🎜class BaseEventLoop: ... def _run_once(self): # self._scheduled是一个列表,它只存放TimerHandle sched_count = len(self._scheduled) ############################### # 第一阶段,整理self._scheduled # ############################### if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # 当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑 # 把需要取消的任务移除 new_scheduled = [] for handle in self._scheduled: if handle._cancelled: # 设置handle的_cancelled为True,并且把handle从_scheduled中移除 handle._scheduled = False else: new_scheduled.append(handle) # 重新排列堆 heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # 需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆 while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False ################################# # 第二阶段,计算超时值以及等待事件IO # ################################# timeout = None # 当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度 if self._ready or self._stopping: timeout = 0 elif self._scheduled: # Compute the desired timeout. # 如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制 when = self._scheduled[0]._when timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) # 事件循环等待事件,直到有事件或者超时 event_list = self._selector.select(timeout) ################################################## # 第三阶段,把满足条件的TimeHandle放入到self._ready中 # ################################################## # 获取得到的事件的回调,然后装填到_ready self._process_events(event_list) # 把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。 # end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件 end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) ################################################################################ # 第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback # ################################################################################ ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() # 如果handle已经被取消,则不调用 if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: # 执行太久的回调,记录下来,这些需要开发者自己优化 logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() handle = None # Needed to break cycles when an exception occurs.
Handle
함수는 매우 간단하며 다음과 같은 기능을 제공합니다. 현재 있는 컨텍스트에서 취소 및 실행될 수 있으며 TimerHandle
은 Handle
에서 상속되며 Handle
보다 시간 및 정렬과 관련된 매개변수가 더 많습니다. 🎜import asyncio import threading def task(): print("task") def run_loop_inside_thread(loop): loop.run_forever() loop = asyncio.get_event_loop() threading.Thread(target=run_loop_inside_thread, args=(loop,)).start() loop.call_soon(task)
Python.Asyncio
를 사용할 때는 이 두 개체를 직접 사용하지 않습니다. 대신 loop.call_xxx
일련의 메소드를 통해 호출을 Handle
객체로 캡슐화한 다음 EventLoop
가 실행될 때까지 기다립니다. 실행. 따라서 loop.call_xxx
메서드 시리즈는 EventLoop
의 등록 작업으로 간주될 수 있습니다. 기본적으로 모든 비 IO 비동기 작업은 loop.call_xxx를 전달해야 합니다.
메소드를 사용하여 EventLoop
에 대한 호출을 등록하려면 loop.call_soon<을 호출하여 <code>Task
객체를 에 등록합니다. EventLoop
에서 loop.call_sonn
구현은 매우 간단합니다. 🎜🎜🎜 소스 코드는 다음과 같습니다. 🎜🎜event_list = self._selector.select(timeout)
call_soon
에 실제로 관련된 코드는 몇 줄로 호출을 Handle
에 캡슐화하고 이를 self._reday
에 추가하는 역할을 합니다. , 호출을 이벤트 루프에 등록합니다. 🎜🎜loop.call_soon
함수 시리즈 외에도 loop.call_at
및 loop.call_later
라는 두 가지 다른 메서드가 있습니다. loop.call_soon
과 유사하지만 EventLoop
에 호출 가능한 시간을 알려주고 동시에 에 의해 등록된 호출을 전달하는 추가 시간 매개변수가 있습니다. loop.call_at
및 loop.call_later
는 Python
의 힙 정렬 모듈 headpq
>self를 통해 에 등록됩니다. _scheduled
변수, 🎜🎜🎜구체적인 코드는 다음과 같습니다: 🎜🎜class BaseEventLoop: ... def call_soon_threadsafe(self, callback, *args, context=None): """Like call_soon(), but thread-safe.""" self._check_closed() handle = self._call_soon(callback, args, context) self._write_to_self() return handle
runner
는 loop.run_until_complete
를 통해 main
Task를 호출하여 EventLoop
의 스케줄링을 시작하는 것으로 분석되었습니다. 분석 EventLoop
를 예약할 때 loop.run_until_complete
로 시작해야 합니다. 🎜🎜🎜의 해당 소스 코드는 다음과 같습니다. 🎜🎜class BaseSelectorEventLoop(base_events.BaseEventLoop): ####### # 创建 # ####### def __init__(self, selector=None): super().__init__() if selector is None: # 获取最优的selector selector = selectors.DefaultSelector() self._selector = selector # 创建pipe self._make_self_pipe() self._transports = weakref.WeakValueDictionary() def _make_self_pipe(self): # 创建Pipe对应的sock self._ssock, self._csock = socket.socketpair() # 设置sock为非阻塞 self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 # 阻塞服务端sock读事件对应的回调 self._add_reader(self._ssock.fileno(), self._read_from_self) def _add_reader(self, fd, callback, *args): # 检查事件循环是否关闭 self._check_closed() # 封装回调为handle对象 handle = events.Handle(callback, args, self, None) try: key = self._selector.get_key(fd) except KeyError: # 如果没有注册到系统的事件循环,则注册 self._selector.register(fd, selectors.EVENT_READ, (handle, None)) else: # 如果已经注册过,则更新 mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) if reader is not None: reader.cancel() return handle def _read_from_self(self): # 负责消费sock数据 while True: try: data = self._ssock.recv(4096) if not data: break self._process_self_data(data) except InterruptedError: continue except BlockingIOError: break ####### # 删除 # ####### def _close_self_pipe(self): # 注销Pipe对应的描述符 self._remove_reader(self._ssock.fileno()) # 关闭sock self._ssock.close() self._ssock = None self._csock.close() self._csock = None self._internal_fds -= 1 def _remove_reader(self, fd): # 如果事件循环已经关闭了,就不用操作了 if self.is_closed(): return False try: # 查询文件描述符是否在selector中 key = self._selector.get_key(fd) except KeyError: # 不存在则返回 return False else: # 存在则进入移除的工作 mask, (reader, writer) = key.events, key.data # 通过事件掩码判断是否有其它事件 mask &= ~selectors.EVENT_READ if not mask: # 移除已经注册到selector的文件描述符 self._selector.unregister(fd) else: # 移除已经注册到selector的文件描述符,并注册新的事件 self._selector.modify(fd, mask, (None, writer)) # 如果reader不为空,则取消reader if reader is not None: reader.cancel() return True else: return False
Corotinue
를 Task
개체로 변환한 다음 Task가 실행될 때 <code>loop.call_sonn
메서드를 호출하여 수행됩니다. 객체가 초기화됩니다. EventLoop
에 자신을 등록하고 마지막으로 _stopping
이 다음과 같이 표시될 때까지 loop.run_forever
의 루프 코드를 실행합니다. True
:🎜event_list = self._selector.select(timeout)
_run_once
입니다. 함수, 🎜🎜🎜소스 코드는 다음과 같습니다.🎜🎜class BaseEventLoop: ... def _run_once(self): # self._scheduled是一个列表,它只存放TimerHandle sched_count = len(self._scheduled) ############################### # 第一阶段,整理self._scheduled # ############################### if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # 当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑 # 把需要取消的任务移除 new_scheduled = [] for handle in self._scheduled: if handle._cancelled: # 设置handle的_cancelled为True,并且把handle从_scheduled中移除 handle._scheduled = False else: new_scheduled.append(handle) # 重新排列堆 heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # 需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆 while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False ################################# # 第二阶段,计算超时值以及等待事件IO # ################################# timeout = None # 当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度 if self._ready or self._stopping: timeout = 0 elif self._scheduled: # Compute the desired timeout. # 如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制 when = self._scheduled[0]._when timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) # 事件循环等待事件,直到有事件或者超时 event_list = self._selector.select(timeout) ################################################## # 第三阶段,把满足条件的TimeHandle放入到self._ready中 # ################################################## # 获取得到的事件的回调,然后装填到_ready self._process_events(event_list) # 把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。 # end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件 end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) ################################################################################ # 第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback # ################################################################################ ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() # 如果handle已经被取消,则不调用 if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: # 执行太久的回调,记录下来,这些需要开发者自己优化 logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() handle = None # Needed to break cycles when an exception occurs.
通过源码分析,可以很明确的知道调度逻辑中第一步是先规整self._scheduled
,在规整的过程是使用堆排序来进行的,因为堆排序在调度的场景下效率是非常高的,不过这段规整代码分成两种,我猜测是当需要取消的数量过多时直接遍历的效率会更高。 在规整self._scheduled
后,就进入第二步,该步骤开始等待系统事件循环返回对应的事件,如果self._ready
中有数据,就不做等待了,需要马上到下一步骤,以便能赶紧安排调度。 在得到系统事件循环得到的事件后,就进入到了第三步,该步骤会通过self._process_events
方法处理对应的事件,并把事件对应的回调存放到了self._ready
中,最后再遍历self._ready
中的所有Handle
并逐一执行(执行时可以认为EventLoop
把控制权返回给对应的调用逻辑),至此一个完整的调度逻辑就结束了,并进入下一个调度逻辑。
3.网络IO事件的处理
注:由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:github.com/python/asyn…
在分析EventLoop
调度实现的时候忽略了self._process_events
的具体实现逻辑,因为_process_events
方法所在asyncio.base_event.py
文件中的BaseEventLoop
类并未有具体实现的,因为网络IO相关的需要系统的事件循环来帮忙处理,所以与系统事件循环相关的逻辑都在asyncio.selector_events.py
中的BaseSelectorEventLoop
类中。BaseSelectorEventLoop
类封装了selector
模块与系统事件循环交互,使调用者不需要去考虑sock的创建以及sock产生的文件描述符的监听与注销等操作,下面以BaseSelectorEventLoop
中自带的pipe为例子,分析BaseSelectorEventLoop
是如何进行网络IO事件处理的。
在分析之前,先看一个例子,代码如下:
import asyncio import threading def task(): print("task") def run_loop_inside_thread(loop): loop.run_forever() loop = asyncio.get_event_loop() threading.Thread(target=run_loop_inside_thread, args=(loop,)).start() loop.call_soon(task)
如果直接运行这个例子,它并不会输出task
(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_forever
后EventLoop
会一直卡在这段逻辑中:
event_list = self._selector.select(timeout)
所以调用loop.call_soon
并不会使EventLoop
马上安排调度,而如果把call_soon
换成call_soon_threadsafe
则可以正常输出,这是因为call_soon_threadsafe
中多了一个self._write_to_self
的调用,它的源码如下:
class BaseEventLoop: ... def call_soon_threadsafe(self, callback, *args, context=None): """Like call_soon(), but thread-safe.""" self._check_closed() handle = self._call_soon(callback, args, context) self._write_to_self() return handle
由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop
类查看,接下来以pipe相关的网络IO操作来分析EventLoop
是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),
对应的源码如下:
class BaseSelectorEventLoop(base_events.BaseEventLoop): ####### # 创建 # ####### def __init__(self, selector=None): super().__init__() if selector is None: # 获取最优的selector selector = selectors.DefaultSelector() self._selector = selector # 创建pipe self._make_self_pipe() self._transports = weakref.WeakValueDictionary() def _make_self_pipe(self): # 创建Pipe对应的sock self._ssock, self._csock = socket.socketpair() # 设置sock为非阻塞 self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 # 阻塞服务端sock读事件对应的回调 self._add_reader(self._ssock.fileno(), self._read_from_self) def _add_reader(self, fd, callback, *args): # 检查事件循环是否关闭 self._check_closed() # 封装回调为handle对象 handle = events.Handle(callback, args, self, None) try: key = self._selector.get_key(fd) except KeyError: # 如果没有注册到系统的事件循环,则注册 self._selector.register(fd, selectors.EVENT_READ, (handle, None)) else: # 如果已经注册过,则更新 mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) if reader is not None: reader.cancel() return handle def _read_from_self(self): # 负责消费sock数据 while True: try: data = self._ssock.recv(4096) if not data: break self._process_self_data(data) except InterruptedError: continue except BlockingIOError: break ####### # 删除 # ####### def _close_self_pipe(self): # 注销Pipe对应的描述符 self._remove_reader(self._ssock.fileno()) # 关闭sock self._ssock.close() self._ssock = None self._csock.close() self._csock = None self._internal_fds -= 1 def _remove_reader(self, fd): # 如果事件循环已经关闭了,就不用操作了 if self.is_closed(): return False try: # 查询文件描述符是否在selector中 key = self._selector.get_key(fd) except KeyError: # 不存在则返回 return False else: # 存在则进入移除的工作 mask, (reader, writer) = key.events, key.data # 通过事件掩码判断是否有其它事件 mask &= ~selectors.EVENT_READ if not mask: # 移除已经注册到selector的文件描述符 self._selector.unregister(fd) else: # 移除已经注册到selector的文件描述符,并注册新的事件 self._selector.modify(fd, mask, (None, writer)) # 如果reader不为空,则取消reader if reader is not None: reader.cancel() return True else: return False
通过源码中的创建部分可以看到,EventLoop
在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle
对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop
的执行逻辑会阻塞在下面的调用中,等待事件响应:
event_list = self._selector.select(timeout)
这时如果执行loop.call_soon_threadsafe
,那么会通过write_to_self
写入一点信息:
def _write_to_self(self): csock = self._csock if csock is None: return try: csock.send(b'\0') except OSError: if self._debug: logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)
由于csock
被写入了数据,那么它对应的ssock
就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop
就会获得到对应的数据,并交给process_events
方法进行处理,
它的相关代码如下:
class BaseSelectorEventLoop: def _process_events(self, event_list): for key, mask in event_list: # 从回调事件中获取到对应的数据,key.data在注册时是一个元祖,所以这里要对元祖进行解包 fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: # 得到reader handle,如果是被标记为取消,就移除对应的文件描述符 if reader._cancelled: self._remove_reader(fileobj) else: # 如果没被标记为取消,则安排到self._ready中 self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: # 对于写对象,也是同样的道理。 if writer._cancelled: self._remove_writer(fileobj) else: self._add_callback(writer) def _add_callback(self, handle): # 把回调的handle添加到_ready中 assert isinstance(handle, events.Handle), 'A Handle is required here' if handle._cancelled: return assert not isinstance(handle, events.TimerHandle) self._ready.append(handle) def _remove_reader(self, fd): # 如果事件循环已经关闭了,就不用操作了 if self.is_closed(): return False try: # 查询文件描述符是否在selector中 key = self._selector.get_key(fd) except KeyError: # 不存在则返回 return False else: # 存在则进入移除的工作 mask, (reader, writer) = key.events, key.data mask &= ~selectors.EVENT_READ if not mask: # 移除已经注册到selector的文件描述符 self._selector.unregister(fd) else: self._selector.modify(fd, mask, (None, writer)) if reader is not None: reader.cancel() return True else: return False
从代码中可以看出_process_events
会对事件对应的文件描述符进行处理,并从事件回调中获取到对应的Handle
对象添加到self._ready
中,由EventLoop
在接下来遍历self._ready
并执行。
可以看到网络IO事件的处理并不复杂,因为系统事件循环已经为我们做了很多工作了,但是用户所有与网络IO相关的操作都需要有一个类似的操作,这样是非常的繁琐的,幸好asyncio
库已经为我们做了封装,我们只要调用就可以了,方便了很多。
위 내용은 Python Asyncio의 스케줄링 원리는 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

PHP는 주로 절차 적 프로그래밍이지만 객체 지향 프로그래밍 (OOP)도 지원합니다. Python은 OOP, 기능 및 절차 프로그래밍을 포함한 다양한 패러다임을 지원합니다. PHP는 웹 개발에 적합하며 Python은 데이터 분석 및 기계 학습과 같은 다양한 응용 프로그램에 적합합니다.

PHP는 웹 개발 및 빠른 프로토 타이핑에 적합하며 Python은 데이터 과학 및 기계 학습에 적합합니다. 1.PHP는 간단한 구문과 함께 동적 웹 개발에 사용되며 빠른 개발에 적합합니다. 2. Python은 간결한 구문을 가지고 있으며 여러 분야에 적합하며 강력한 라이브러리 생태계가 있습니다.

PHP는 1994 년에 시작되었으며 Rasmuslerdorf에 의해 개발되었습니다. 원래 웹 사이트 방문자를 추적하는 데 사용되었으며 점차 서버 측 스크립팅 언어로 진화했으며 웹 개발에 널리 사용되었습니다. Python은 1980 년대 후반 Guidovan Rossum에 의해 개발되었으며 1991 년에 처음 출시되었습니다. 코드 가독성과 단순성을 강조하며 과학 컴퓨팅, 데이터 분석 및 기타 분야에 적합합니다.

Python은 부드러운 학습 곡선과 간결한 구문으로 초보자에게 더 적합합니다. JavaScript는 가파른 학습 곡선과 유연한 구문으로 프론트 엔드 개발에 적합합니다. 1. Python Syntax는 직관적이며 데이터 과학 및 백엔드 개발에 적합합니다. 2. JavaScript는 유연하며 프론트 엔드 및 서버 측 프로그래밍에서 널리 사용됩니다.

Sublime 텍스트로 Python 코드를 실행하려면 먼저 Python 플러그인을 설치 한 다음 .py 파일을 작성하고 코드를 작성한 다음 CTRL B를 눌러 코드를 실행하면 콘솔에 출력이 표시됩니다.

Visual Studio Code (VSCODE)에서 코드를 작성하는 것은 간단하고 사용하기 쉽습니다. vscode를 설치하고, 프로젝트를 만들고, 언어를 선택하고, 파일을 만들고, 코드를 작성하고, 저장하고 실행합니다. VSCODE의 장점에는 크로스 플랫폼, 무료 및 오픈 소스, 강력한 기능, 풍부한 확장 및 경량 및 빠른가 포함됩니다.

VS 코드는 파이썬을 작성하는 데 사용될 수 있으며 파이썬 애플리케이션을 개발하기에 이상적인 도구가되는 많은 기능을 제공합니다. 사용자는 다음을 수행 할 수 있습니다. Python 확장 기능을 설치하여 코드 완료, 구문 강조 및 디버깅과 같은 기능을 얻습니다. 디버거를 사용하여 코드를 단계별로 추적하고 오류를 찾아 수정하십시오. 버전 제어를 위해 git을 통합합니다. 코드 서식 도구를 사용하여 코드 일관성을 유지하십시오. 라인 도구를 사용하여 잠재적 인 문제를 미리 발견하십시오.

메모장에서 Python 코드를 실행하려면 Python 실행 파일 및 NPPEXEC 플러그인을 설치해야합니다. Python을 설치하고 경로를 추가 한 후 nppexec 플러그인의 명령 "Python"및 매개 변수 "{current_directory} {file_name}"을 구성하여 Notepad의 단축키 "F6"을 통해 Python 코드를 실행하십시오.
