Python3.7 以降、contextvars
モジュールが公式ライブラリに登場しました。スレッド化 そして、asyncio エコシステムはコンテキスト関数を追加します。プログラムが複数のコルーチンを同時に実行している場合でも、プログラムのコンテキスト変数を呼び出すことができるため、ロジックを切り離すことができます。
コンテキストは私たちを代弁していると理解できます。チャットのプロセスでは、単語が特定のコンテキストから切り離されると意味が変化しますが、プログラムの実行でも同様です。スレッド内にもそのコンテキストはありますが、Python などではスタックと呼ばれます。は thread.local 変数に保存され、コルーチンにも独自のコンテキストがありますが、公開されていません。ただし、contextvars
モジュールを使用すると、contextvars
モジュールを使用して次のことができます。保存して読み取ります。
contextvars
を使用する利点は、「変数が世界中に拡散する」ことを防ぐだけでなく、TypeHint とうまく組み合わせることができることです。
ただし、contextvars
を使用した後は、さらにいくつかの隠れた呼び出しが発生するため、これらの隠れたコストが必要になります。
Web フレームワークを切り替えるsanic
starlette
#starlette,
fastapi
Flask フレームワークを使用したことがある場合は、
Flask について理解できるでしょう。 owns 独自の context 関数があり、contextvars はそれに非常に似ており、asyncio context のサポートも追加しています。
Flask のコンテキストは
threading.local に基づいて実装されています。
threading.local の分離効果は非常に優れていますが、それはスレッドのみに適用されます。スレッド間のデータ ステータスを分離し、
gevent での実行をサポートするために
werkzeug に、一般的に使用される
Flask# である Local
変数を実装しました。 # #コンテキスト変数の例 request
は次のとおりです: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:py;">from flask import Flask, request
app = Flask(__name__)
@app.route(&#39;/&#39;)
def root():
so1n_name = request.get(&#39;so1n_name&#39;)
return f&#39;Name is {so1n_name}&#39;</pre><div class="contentsignin">ログイン後にコピー</div></div>
別の古典的な Web フレームワーク
Djano
と比較すると、Context request
オブジェクトは表示のみを渡すことができます。 例は次のとおりです。
from django.http import HttpResponse
def root(request):
so1n_name = request.get('so1n_name')
return HttpResponse(f'Name is {so1n_name}')
Flask は request というグローバル変数をインポートし、それをビュー内で直接使用して分離の目的を達成します。
変数を渡すことの違いだと言う人もいるかもしれませんが、この変数を渡す手間を省くためには、コンテキスト変数を維持するのに多大な労力を費やす価値はありません。たとえば、レベルが多い場合、「1 日に 1 つのパラメータが渡されます」と表示されます (ただし、階層化が適切に行われているか、要件が欺瞞的でない場合、次のような状況は通常発生しません。優れたプログラマはコードを階層化できます)まあ、しかし、悪い要件もたくさんあるかもしれません。time)
# 伪代码,举个例子一个request传了3个函数 from django.http import HttpResponse def is_allow(request, uid): if request.ip == '127.0.0.1' and check_permissions(uid): return True else: return False def check_permissions(request, uid): pass def root(request): user_id = request.GET.get('uid') if is_allow(request, id): return HttpResponse('ok') else return HttpResponse('error')
1 日パラメータを渡す問題
を防ぐことに加えて、いくつかの分離を実行することもできます。たとえば、最も古典的な技術的なビジネス要件の 1 つは、ログ request_id を出力し、リンクのトラブルシューティングを容易にすることです。このとき、コンテキスト モジュールがある場合は、request_id の読み取りと書き込みを分離できます。次の例は、Flask フレームワークに基づいた request_id の読み取りと書き込みの例です:
import logging from typing import Any from flask import g # type: ignore from flask.logging import default_handler # 这是一个Python logging.Filter的对象, 日志在生成之前会经过Filter步骤, 这时候我们可以为他绑定request_id变量 class RequestIDLogFilter(logging.Filter): """ Log filter to inject the current request id of the request under `log_record.request_id` """ def filter(self, record: Any) -> Any: record.request_id = g.request_id or None return record # 配置日志的format格式, 这里多配了一个request_id变量 format_string: str = ( "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s" ) # 为flask的默认logger设置format和增加一个logging.Filter对象 default_handler.setFormatter(logging.Formatter(format_string)) default_handler.addFilter(RequestIDLogFilter()) # 该方法用于设置request_id def set_request_id() -> None: g.request_id = request.headers.get("X-Request-Id", str(uuid4())) # 初始化FLask对象, 并设置before_request app: Flask = Flask("demo") app.before_request(set_request_id)
2. contextvars モジュールの使用方法
が使用されていない場合に、asyncio Web フレームワークがどのように変数を渡すかを見てみましょう。まず、
contextvars
starlette のドキュメントによると、
contextvars が使用されていない場合、# には ##Redis
が渡されます。クライアント インスタンスを使用する方法は、Redis# を保存することです。 ## request.stat 変数を使用してクライアント インスタンスを作成します。コードを次のように書き換えます:
# demo/web_tools.py # 通过中间件把变量给存进去 class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: request.stat.redis = REDIS_POOL response = await call_next(request) return response # demo/server.py # 调用变量 @APP.route('/') async def homepage(request): # 伪代码,这里是执行redis命令 await request.stat.redis.execute() return JSONResponse({'hello': 'world'})
コードは非常に単純で、通常どおり実行できます。ただし、次回リファクタリングするとき、たとえば、変数名 redis を new_redis に変更すると、IDE が認識しないため、1 つずつ変更する必要があります。同時に、コードを作成するとき、IDE はこのメソッドによって呼び出される変数の型を知ることはなく、IDE がそれをインテリジェントにチェックすることはできません (たとえば、request.stat.redis. を入力するとき、IDE は変数の型をチェックしません)。実行しないとエラーが表示されます。IDE はプロンプトを表示しません)。これはプロジェクトのエンジニアリングにとって非常に不利であり、
contextvars と
TypeHints を使用すると、この問題を解決できます。 の意見が多かったので、
Redis クライアントを例として、asyncio エコシステムで
contextvars
TypeHints (を参照)詳細な説明はコードを参照してください)。
# demo/context.py # 该文件存放contextvars相关 import contextvars if TYPE_CHECKING: from demo.redis_dal import RDS # 这里是一个redis的封装实例 # 初始化一个redis相关的全局context redis_pool_context = contextvars.ContextVar('redis_pool') # 通过函数调用可以获取到当前协程运行时的context上下文 def get_redis() -> 'RDS': return redis_pool_context.get() # demo/web_tool.py # 该文件存放starlette相关模块 from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.middleware.base import RequestResponseEndpoint from starlette.responses import Response from demo.redis_dal import RDS # 初始化一个redis客户端变量,当前为空 REDIS_POOL = None # type: Optional[RDS] class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中 token = redis_pool_context.set(REDIS_POOL) try: response = await call_next(request) return response finally: # 调用完成,回收当前请求设置的redis客户端的上下文 redis_pool_context.reset(token) async def startup_event() -> None: global REDIS_POOL REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接 async def shutdown_event() -> None: if REDIS_POOL: await REDIS_POOL.close() # 关闭redis客户端 # demo/server.py # 该文件存放starlette main逻辑 from starlette.applications import Starlette from starlette.responses import JSONResponse from demo.web_tool import RequestContextMiddleware from demo.context import get_redis APP = Starlette() APP.add_middleware(RequestContextMiddleware) @APP.route('/') async def homepage(request): # 伪代码,这里是执行redis命令 # 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态 await get_redis().execute() return JSONResponse({'hello': 'world'})
从上面的示例代码来看, 使用contextvar
和TypeHint
确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.
自己在使用了contextvars
一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapi
和starlette
), 它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader负责托管Headers相关的参数, 调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.
以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:
import asyncio import uuid from contextvars import Context, copy_context from functools import partial from typing import Optional, Set import httpx from fastapi import FastAPI, Request, Response from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper app: FastAPI = FastAPI() check_set: Set[int] = set() class ContextModel(ContextBaseModel): """ 通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可. 属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints) """ # 用于把自己的实例(如上文所说的redis客户端)存放于contextvars中 http_client: httpx.AsyncClient # HeaderHepler用于把header的变量存放于contextvars中 request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4())) ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host) user_agent: str = HeaderHelper.i("User-Agent") async def before_request(self, request: Request) -> None: # 请求之前的钩子, 通过该钩子可以设置自己的变量 self.http_client = httpx.AsyncClient() check_set.add(id(self.http_client)) async def before_reset_context(self, request: Request, response: Optional[Response]) -> None: # 准备退出中间件的钩子, 这步奏后会清掉上下文 await self.http_client.aclose() context_model: ContextModel = ContextModel() app.add_middleware(ContextMiddleware, context_model=context_model) async def test_ensure_future() -> None: assert id(context_model.http_client) in check_set def test_run_in_executor() -> None: assert id(context_model.http_client) in check_set def test_call_soon() -> None: assert id(context_model.http_client) in check_set @app.get("/") async def root() -> dict: # 在使用asyncio.ensure_future开启另外一个子协程跑任务时, 也可以复用上下文 asyncio.ensure_future(test_ensure_future()) loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop() # 使用call_soon也能复用上下文 loop.call_soon(test_call_soon) # 使用run_in_executor也能复用上下文, 但必须使用上下文的run方法, copy_context表示复制当前的上下文 ctx: Context = copy_context() await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor)) # type: ignore return { "message": context_model.to_dict(is_safe_return=True), # not return CustomQuery "client_id": id(context_model.http_client), } if __name__ == "__main__": import uvicorn # type: ignore uvicorn.run(app)
可以从例子中看到, 通过封装的上下文调用会变得非常愉快, 只要通过一两步方法就能设置好自己的上下文属性, 同时不用考虑如何编写上下文的生命周期. 另外也能通过这个例子看出, 在asyncio生态中, contextvars能运用到包括子协程, 多线程等所有的场景中.
在第一次使用时,我就很好奇contextvars是如何去维护程序的上下文的,好在contextvars的作者出了一个向下兼容的contextvars库,虽然他不支持asyncio,但我们还是可以通过代码了解到他的基本原理.
代码仓中有ContextMeta
,ContextVarMeta
和TokenMeta
这几个对象, 它们的功能都是防止用户来继承Context
,ContextVar
和Token
,原理都是通过元类来判断类名是否是自己编写类的名称,如果不是则抛错.
class ContextMeta(type(collections.abc.Mapping)): # contextvars.Context is not subclassable. def __new__(mcls, names, bases, dct): cls = super().__new__(mcls, names, bases, dct) if cls.__module__ != 'contextvars' or cls.__name__ != 'Context': raise TypeError("type 'Context' is not an acceptable base type") return cls
上下文的本质是一个堆栈, 每次set一次对象就向堆栈增加一层数据, 每次reset就是pop掉最上层的数据, 而在Contextvars
中, 通过Token
对象来维护堆栈之间的交互.
class Token(metaclass=TokenMeta): MISSING = object() def __init__(self, context, var, old_value): # 分别存放上下文变量, 当前set的数据以及上次set的数据 self._context = context self._var = var self._old_value = old_value self._used = False @property def var(self): return self._var @property def old_value(self): return self._old_value def __repr__(self): r = '<Token ' if self._used: r += ' used' r += ' var={!r} at {:0x}>'.format(self._var, id(self)) return r
可以看到Token
的代码很少, 它只保存当前的context
变量, 本次调用set的数据和上一次被set的旧数据. 用户只有在调用contextvar.context
后才能得到Token
, 返回的Token
可以被用户在调用context后, 通过调用context.reset(token)来清空保存的上下文,方便本次context的变量能及时的被回收, 回到上上次的数据.
前面说过, Python中由threading.local()
负责每个线程的context, 协程属于线程的’子集’,所以contextvar直接基于threading.local()
生成自己的全局context. 从他的源代码可以看到, _state
就是threading.local()
的引用, 并通过设置和读取_state
的context
属性来写入和读取当前的上下文, copy_context
调用也很简单, 同样也是调用到threading.local()
API.
def copy_context(): return _get_context().copy() def _get_context(): ctx = getattr(_state, 'context', None) if ctx is None: ctx = Context() _state.context = ctx return ctx def _set_context(ctx): _state.context = ctx _state = threading.local()
关于threading.local()
,虽然不是本文重点,但由于contextvars
是基于threading.local()
进行封装的,所以还是要明白threading.local()
的原理,这里并不直接通过源码分析, 而是做一个简单的示例解释.
在一个线程里面使用线程的局部变量会比直接使用全局变量的性能好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁, 性能会变得很差, 比如下面全局变量的例子:
pet_dict = {} def get_pet(pet_name): return pet_dict[pet_name] def set_pet(pet_name): return pet_dict[pet_name]
这份代码就是模仿一个简单的全局变量调用, 如果是多线程调用的话, 那就需要加锁啦, 每次在读写之前都要等到持有锁的线程放弃了锁后再去竞争, 而且还可能污染到了别的线程存放的数据.
而线程的局部变量则是让每个线程有一个自己的pet_dict
, 假设每个线程调用get_pet
,set_pet
时,都会把自己的pid传入进来, 那么就可以避免多个线程去同时竞争资源, 同时也不会污染到别的线程的数据, 那么代码可以改为这样子:
pet_dict = {} def get_pet(pet_name, pid): return pet_dict[pid][pet_name] def set_pet(pet_name, pid): return pet_dict[pid][pet_name]
不过这样子使用起来非常方便, 同时示例例子没有对异常检查和初始化等处理, 如果值比较复杂, 我们还要维护异常状况, 这样太麻烦了.
这时候threading.local()
就应运而生了,他负责帮我们处理这些维护的工作,我们只要对他进行一些调用即可,调用起来跟单线程调用一样简单方便, 应用threading.local()
后的代码如下:
import threading thread_local=threading.local() def get_pet(pet_name): return thread_local[pet_name] def set_pet(pet_name): return thread_local[pet_name]
可以看到代码就像调用全局变量一样, 但是又不会产生竞争状态。
contextvars
自己封装的Context比较简单, 这里只展示他的两个核心方法(其他的魔术方法就像dict
的魔术方法一样):
class Context(collections.abc.Mapping, metaclass=ContextMeta): def __init__(self): self._data = immutables.Map() self._prev_context = None def run(self, callable, *args, **kwargs): if self._prev_context is not None: raise RuntimeError( 'cannot enter context: {} is already entered'.format(self)) self._prev_context = _get_context() try: _set_context(self) return callable(*args, **kwargs) finally: _set_context(self._prev_context) self._prev_context = None def copy(self): new = Context() new._data = self._data return new
首先, 在__init__
方法可以看到self._data,这里使用到了一个叫immutables.Map()的不可变对象,并对immutables.Map()进行一些封装,所以context可以看成一个不可变的dict。这样可以防止调用copy方法后得到的上下文的变动会影响到了原本的上下文变量。
查看immutables.Map()的示例代码可以看到,每次对原对象的修改时,原对象并不会发生改变,并会返回一个已经发生改变的新对象.
map2 = map.set('a', 10) print(map, map2) # will print: # <immutables.Map({'a': 1, 'b': 2})> # <immutables.Map({'a': 10, 'b': 2})> map3 = map2.delete('b') print(map, map2, map3) # will print: # <immutables.Map({'a': 1, 'b': 2})> # <immutables.Map({'a': 10, 'b': 2})> # <immutables.Map({'a': 10})>
此外,context还有一个叫run
的方法, 上面在执行loop.run_in_executor
时就用过run
方法, 目的就是可以产生一个新的上下文变量给另外一个线程使用, 同时这个新的上下文变量跟原来的上下文变量是一致的.
执行run的时候,可以看出会copy一个新的上下文来调用传入的函数, 由于immutables.Map
的存在, 函数中对上下文的修改并不会影响旧的上下文变量, 达到进程复制数据时的写时复制的目的. 在run
方法的最后, 函数执行完了会再次set旧的上下文, 从而完成一次上下文切换.
def run(self, callable, *args, **kwargs): # 已经存在旧的context,抛出异常,防止多线程循环调用 if self._prev_context is not None: raise RuntimeError( 'cannot enter context: {} is already entered'.format(self)) self._prev_context = _get_context() # 保存当前的context try: _set_context(self) # 设置新的context return callable(*args, **kwargs) # 执行函数 finally: _set_context(self._prev_context) # 设置为旧的context self._prev_context = None
我们一般在使用contextvars模块时,经常使用的就是ContextVar
这个类了,这个类很简单,主要提供了set–设置值,get–获取值,reset–重置值三个方法, 从Context
类中写入和获取值, 而set和reset的就是通过上面的token类进行交互的.
set – 为当前上下文设置变量
def set(self, value): ctx = _get_context() # 获取当前上下文对象`Context` data = ctx._data try: old_value = data[self] # 获取Context旧对象 except KeyError: old_value = Token.MISSING # 获取不到则填充一个object(全局唯一) updated_data = data.set(self, value) # 设置新的值 ctx._data = updated_data return Token(ctx, self, old_value) # 返回带有旧值的token
get – 从当前上下文获取变量
def get(self, default=_NO_DEFAULT): ctx = _get_context() # 获取当前上下文对象`Context` try: return ctx[self] # 返回获取的值 except KeyError: pass if default is not _NO_DEFAULT: return default # 返回调用get时设置的值 if self._default is not _NO_DEFAULT: return self._default # 返回初始化context时设置的默认值 raise LookupError # 都没有则会抛错
reset – 清理本次用到的上下文数据
def reset(self, token): if token._used: # 判断token是否已经被使用 raise RuntimeError("Token has already been used once") if token._var is not self: # 判断token是否是当前contextvar返回的 raise ValueError( "Token was created by a different ContextVar") if token._context is not _get_context(): # 判断token的上下文是否跟contextvar上下文一致 raise ValueError( "Token was created in a different Context") ctx = token._context if token._old_value is Token.MISSING: # 如果没有旧值则删除该值 ctx._data = ctx._data.delete(token._var) else: # 有旧值则当前contextvar变为旧值 ctx._data = ctx._data.set(token._var, token._old_value) token._used = True # 设置flag,标记token已经被使用了
则此,contextvar的原理了解完了,接下来再看看他是如何在asyncio运行的.
由于向下兼容的contextvars
并不支持asyncio, 所以这里通过aiotask-context的源码简要的了解如何在asyncio中如何获取和设置context。
相比起contextvars复杂的概念,在asyncio中,我们可以很简单的获取到当前协程的task, 然后通过task就可以很方便的获取到task的context了,由于Pyhon3.7对asyncio的高级API 重新设计,所以可以看到需要对获取当前task进行封装
PY37 = sys.version_info >= (3, 7) if PY37: def asyncio_current_task(loop=None): """Return the current task or None.""" try: return asyncio.current_task(loop) except RuntimeError: # simulate old behaviour return None else: asyncio_current_task = asyncio.Task.current_task
不同的版本有不同的获取task方法, 之后我们就可以通过调用asyncio_current_task().context
即可获取到当前的上下文了…
同样的,在得到上下文后, 我们这里也需要set, get, reset的操作,不过十分简单, 类似dict一样的操作即可, 它没有token的逻辑:
set
def set(key, value): """ Sets the given value inside Task.context[key]. If the key does not exist it creates it. :param key: identifier for accessing the context dict. :param value: value to store inside context[key]. :raises """ current_task = asyncio_current_task() if not current_task: raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key)) current_task.context[key] = value
get
def get(key, default=None): """ Retrieves the value stored in key from the Task.context dict. If key does not exist, or there is no event loop running, default will be returned :param key: identifier for accessing the context dict. :param default: None by default, returned in case key is not found. :return: Value stored inside the dict[key]. """ current_task = asyncio_current_task() if not current_task: raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key)) return current_task.context.get(key, default)
clear – 也就是contextvar.ContextVars
中的reset
def clear(): """ Clear the Task.context. :raises ValueError: if no current task. """ current_task = asyncio_current_task() if not current_task: raise ValueError("No event loop found") current_task.context.clear()
在Python的更高级版本中,已经支持设置context了,所以这两个方法可以不再使用了.他们最后都用到了task_factory
的方法.task_factory
简单说就是创建一个新的task,再通过工厂方法合成context,最后把context设置到task
def task_factory(loop, coro, copy_context=False, context_factory=None): """ By default returns a task factory that uses a simple dict as the task context, but allows context creation and inheritance to be customized via ``context_factory``. """ # 生成context工厂函数 context_factory = context_factory or partial( dict_context_factory, copy_context=copy_context) # 创建task, 跟asyncio.ensure_future一样 task = asyncio.tasks.Task(coro, loop=loop) if task._source_traceback: del [-1] # 获取task的context try: context = asyncio_current_task(loop=loop).context except AttributeError: context = None # 从context工厂中处理context并赋值在task task.context = context_factory(context) return task
aiotask-context
提供了两个对context处理的函数dict_context_factory
和chainmap_context_factory
.在aiotask-context
中,context是一个dict对象,dict_context_factory
可以选择赋值或者设置新的context
def dict_context_factory(parent_context=None, copy_context=False): """A traditional ``dict`` context to keep things simple""" if parent_context is None: # initial context return {} else: # inherit context new_context = parent_context if copy_context: new_context = deepcopy(new_context) return new_context
chainmap_context_factory
与dict_context_factory
的区别就是在合并context而不是直接继承.同时借用ChainMap
保证合并context后,还能同步context的改变
def chainmap_context_factory(parent_context=None): """ A ``ChainMap`` context, to avoid copying any data and yet preserve strict one-way inheritance (just like with dict copying) """ if parent_context is None: # initial context return ChainMap() else: # inherit context if not isinstance(parent_context, ChainMap): # if a dict context was previously used, then convert # (without modifying the original dict) parent_context = ChainMap(parent_context) return parent_context.new_child()
以上がPythonでcontextvarsモジュールを使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。