Cara menggunakan modul contextvars dalam python

王林
Lepaskan: 2023-05-13 12:04:05
ke hadapan
1384 orang telah melayarinya

    Prakata

    Selepas Python3.7, modul contextvars muncul dalam pustaka rasminya ialah untuk menambah ekologi berbilang benang dan asyncio . Fungsi konteks boleh memanggil pembolehubah konteks program walaupun berbilang coroutine berjalan serentak, dengan itu menyahgandingkan logik kami

    Konteks boleh difahami sebagai konteks di mana kita bercakap, beberapa perkataan mempunyai makna yang berbeza apabila ia dipisahkan daripada konteks tertentu, dan perkara yang sama berlaku untuk menjalankan program Terdapat juga konteksnya dalam benang, tetapi ia hanya dipanggil tindanan Sebagai contoh, dalam Python, ia adalah disimpan dalam pembolehubah thread.local, dan coroutine juga mempunyai konteksnya sendiri, tetapi ia tidak didedahkan, dengan modul contextvars, kita boleh menyimpan dan membaca melalui contextvars penggunaan ialah ia bukan sahaja boleh menghalang "pembolehubah daripada merebak ke seluruh dunia", tetapi ia juga boleh digabungkan dengan baik dengan TypeHint, supaya kod anda boleh disemak oleh mypy dan IDE, menjadikan kod anda lebih sesuai untuk kejuruteraan.

    Namun, selepas menggunakan

    , akan ada beberapa lagi panggilan tersembunyi, dan kos tersembunyi ini perlu diselesaikan contextvars
    Arahan kemas kinicontextvars

    <.>Tukar rangka kerja web

    Tambahkan perihalan konteks yang ditulis sendiri untuk
    • sanicstarlette yang boleh digunakan untuk

      ,
    • starlettefastapi Kemas kini fast_tools Contoh terkini .context dan pengubahsuaian ringkas pada teks.

    • 1 Perbezaan antara melepasi pembolehubah dengan atau tanpa konteks

    • Jika anda telah menggunakan rangka kerja
    , anda akan tahu bahawa

    mempunyai konteksnya sendiri fungsi, dan contextvars adalah Ia sangat serupa, tetapi juga menambah sokongan untuk konteks asyncio. Konteks

    dilaksanakan berdasarkan Flask Kesan pengasingan Flask adalah sangat baik, tetapi ia hanya untuk urutan dan hanya mengasingkan status data antara urutan untuk menyokong
    Berjalan dalam 🎜>, saya sendiri melaksanakan pembolehubah Flask Contoh pembolehubah konteks threading.local yang biasa digunakan adalah seperti berikut: threading.local

    from flask import Flask, request
    app = Flask(__name__)
    @app.route(&#39;/&#39;)
    def root():
        so1n_name = request.get(&#39;so1n_name&#39;)
        return f&#39;Name is {so1n_name}&#39;
    Salin selepas log masuk
    werkzeug Berbanding dengannya, ia adalah satu lagi rangka kerja web klasik <. 🎜> gevent, ia tidak mempunyai sokongan konteks, jadi ia hanya boleh melepasi objek Local secara eksplisit Contoh Flask adalah seperti berikut: request

    from django.http import HttpResponse
    def root(request):
        so1n_name = request.get(&#39;so1n_name&#39;)
        return HttpResponse(f&#39;Name is {so1n_name}&#39;)
    Salin selepas log masuk

    Dengan membandingkan dua di atas, kita dapati bahawa dalam Python , kita perlu lulus pembolehubah yang dipanggil permintaan secara eksplisit, dan Djano mengimport pembolehubah global yang dipanggil permintaan dan menggunakannya secara langsung dalam paparan untuk mencapai tujuan penyahgandingan request Beberapa orang mungkin berkata, iaitu, perbezaan antara melepasi pembolehubah Untuk menjimatkan keperluan untuk melepasi pembolehubah ini, ia tidak berbaloi untuk menghabiskan banyak usaha untuk mengekalkan pembolehubah konteks Kemudian anda boleh melihat contoh berikut . Jika terdapat banyak peringkat, "melepasi satu parameter untuk satu hari" akan berlaku (Namun, jika pelapisan dilakukan dengan baik atau keperluan tidak menipu, keadaan di bawah biasanya tidak akan berlaku. baik, tetapi mungkin juga ada kalanya sekumpulan keperluan buruk muncul)

    # 伪代码,举个例子一个request传了3个函数
    from django.http import HttpResponse
    def is_allow(request, uid):
        if request.ip == &#39;127.0.0.1&#39; and check_permissions(uid):
            return True
        else:
            return False
    def check_permissions(request, uid):
        pass
    
    def root(request):
        user_id = request.GET.get(&#39;uid&#39;)
        if is_allow(request, id):
        	return HttpResponse(&#39;ok&#39;)
        else
            return HttpResponse(&#39;error&#39;)
    Salin selepas log masuk

    Selain itu, selain daripada mencegah

    masalah ini, beberapa penyahgandingan boleh dilakukan melalui konteks Sebagai contoh, salah satu daripada keperluan perniagaan teknikal yang paling klasik ialah mencetak request_id dalam log untuk memudahkan penyelesaian masalah pautan Jika terdapat konteks pada modul masa ini, anda boleh memisahkan bacaan dan penulisan request_id, seperti contoh membaca dan menulis request_id berdasarkan pada. Rangka kerja Django: Flask

    import logging
    from typing import Any
    from flask import g  # type: ignore
    from flask.logging import default_handler
    # 这是一个Python logging.Filter的对象, 日志在生成之前会经过Filter步骤, 这时候我们可以为他绑定request_id变量
    class RequestIDLogFilter(logging.Filter):
        """
        Log filter to inject the current request id of the request under `log_record.request_id`
        """
        def filter(self, record: Any) -> Any:
            record.request_id = g.request_id or None
            return record
    # 配置日志的format格式, 这里多配了一个request_id变量
    format_string: str = (
        "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
    )
    # 为flask的默认logger设置format和增加一个logging.Filter对象
    default_handler.setFormatter(logging.Formatter(format_string))
    default_handler.addFilter(RequestIDLogFilter())
    # 该方法用于设置request_id
    def set_request_id() -> None:
        g.request_id = request.headers.get("X-Request-Id", str(uuid4()))
    # 初始化FLask对象, 并设置before_request
    app: Flask = Flask("demo")
    app.before_request(set_request_id)
    Salin selepas log masuk

    2 Cara menggunakan modul contextvars

    di sini Contoh diberikan, tetapi contoh ini juga mempunyai penyelesaian lain cara menggunakan modul contextvar 一个参数传一天Flask

    Mula-mula, mari kita lihat cara rangka kerja web asyncio berfungsi apabila

    tidak digunakan untuk meneruskan pembolehubah, mengikut dokumentasi

    , apabila
    tidak digunakan, cara untuk menghantar

    instance klien adalah dengan menyimpan

    instance klien melalui pembolehubah request.stat Tulis semula kod seperti berikut:
    # demo/web_tools.py
    # 通过中间件把变量给存进去
    class RequestContextMiddleware(BaseHTTPMiddleware):
        async def dispatch(
                self, request: Request, call_next: RequestResponseEndpoint
        ) -> Response:
            request.stat.redis = REDIS_POOL
            response = await call_next(request)
            return response
    # demo/server.py
    # 调用变量
    @APP.route(&#39;/&#39;)
    async def homepage(request):
        # 伪代码,这里是执行redis命令
        await request.stat.redis.execute()
        return JSONResponse({&#39;hello&#39;: &#39;world&#39;})
    Salin selepas log masuk

    Kodnya sangat mudah dan boleh berjalan seperti biasa, tetapi apabila anda memfaktorkan semula pada masa akan datang, sebagai contoh, jika anda hanya menukar nama pembolehubah redis kepada new_redis, IDE tidak akan mengenalinya dan anda perlu mengubahnya satu demi satu. Pada masa yang sama, apabila menulis kod, IDE tidak akan tahu jenis pembolehubah yang dipanggil oleh kaedah ini, dan IDE tidak boleh menyemaknya secara bijak untuk anda (contohnya, apabila anda memasukkan request.stat.redis., IDE akan tidak dilaksanakan atau ralat akan muncul , IDE tidak akan meminta ini sangat merugikan kejuruteraan projek, dan melalui contextvars dan starlette, masalah ini dapat diselesaikan berkata begitu banyak, berikut ialah contextvars klien ialah contoh untuk menunjukkan cara menggunakan Redis dalam ekosistem asyncio dan memperkenalkan Redis (lihat kod untuk penjelasan terperinci

    ).
    # demo/context.py
    # 该文件存放contextvars相关
    import contextvars
    if TYPE_CHECKING:
        from demo.redis_dal import RDS  # 这里是一个redis的封装实例
    # 初始化一个redis相关的全局context
    redis_pool_context = contextvars.ContextVar(&#39;redis_pool&#39;)
    # 通过函数调用可以获取到当前协程运行时的context上下文
    def get_redis() -> &#39;RDS&#39;:
        return redis_pool_context.get()
    # demo/web_tool.py
    # 该文件存放starlette相关模块
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.requests import Request
    from starlette.middleware.base import RequestResponseEndpoint
    from starlette.responses import Response
    from demo.redis_dal import RDS
    # 初始化一个redis客户端变量,当前为空
    REDIS_POOL = None  # type: Optional[RDS]
    class RequestContextMiddleware(BaseHTTPMiddleware):
        async def dispatch(
                self, request: Request, call_next: RequestResponseEndpoint
        ) -> Response:
            # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中
            token = redis_pool_context.set(REDIS_POOL)
            try:
            	response = await call_next(request)
                return response
            finally:
            	# 调用完成,回收当前请求设置的redis客户端的上下文
                redis_pool_context.reset(token)
    async def startup_event() -> None:
        global REDIS_POOL
        REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接
    async def shutdown_event() -> None:
        if REDIS_POOL:
            await REDIS_POOL.close() # 关闭redis客户端
    # demo/server.py
    # 该文件存放starlette main逻辑
    from starlette.applications import Starlette
    from starlette.responses import JSONResponse
    from demo.web_tool import RequestContextMiddleware
    from demo.context import get_redis
    APP = Starlette()
    APP.add_middleware(RequestContextMiddleware)
    @APP.route(&#39;/&#39;)
    async def homepage(request):
        # 伪代码,这里是执行redis命令
        # 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态
        await get_redis().execute()
        return JSONResponse({&#39;hello&#39;: &#39;world&#39;})
    Salin selepas log masuk

    3.如何优雅的使用contextvars

    从上面的示例代码来看, 使用contextvarTypeHint确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.

    自己在使用了contextvars一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapistarlette), 它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader负责托管Headers相关的参数, 调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.

    以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:

    import asyncio
    import uuid
    from contextvars import Context, copy_context
    from functools import partial
    from typing import Optional, Set
    import httpx
    from fastapi import FastAPI, Request, Response
    from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
    app: FastAPI = FastAPI()
    check_set: Set[int] = set()
    class ContextModel(ContextBaseModel):
        """
    	通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可.
    	属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints)
        """
        # 用于把自己的实例(如上文所说的redis客户端)存放于contextvars中
        http_client: httpx.AsyncClient
        # HeaderHepler用于把header的变量存放于contextvars中
        request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
        ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
        user_agent: str = HeaderHelper.i("User-Agent")
    
        async def before_request(self, request: Request) -> None:
            # 请求之前的钩子, 通过该钩子可以设置自己的变量
            self.http_client = httpx.AsyncClient()
            check_set.add(id(self.http_client))
    
        async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
            # 准备退出中间件的钩子, 这步奏后会清掉上下文
            await self.http_client.aclose()
    context_model: ContextModel = ContextModel()
    app.add_middleware(ContextMiddleware, context_model=context_model)
    async def test_ensure_future() -> None:
        assert id(context_model.http_client) in check_set
    def test_run_in_executor() -> None:
        assert id(context_model.http_client) in check_set
    def test_call_soon() -> None:
        assert id(context_model.http_client) in check_set
    @app.get("/")
    async def root() -> dict:
        # 在使用asyncio.ensure_future开启另外一个子协程跑任务时, 也可以复用上下文
        asyncio.ensure_future(test_ensure_future())
        loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
        # 使用call_soon也能复用上下文
        loop.call_soon(test_call_soon)
        # 使用run_in_executor也能复用上下文, 但必须使用上下文的run方法, copy_context表示复制当前的上下文
        ctx: Context = copy_context()
        await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
        return {
            "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
            "client_id": id(context_model.http_client),
        }
    if __name__ == "__main__":
        import uvicorn  # type: ignore
        uvicorn.run(app)
    Salin selepas log masuk

    可以从例子中看到, 通过封装的上下文调用会变得非常愉快, 只要通过一两步方法就能设置好自己的上下文属性, 同时不用考虑如何编写上下文的生命周期. 另外也能通过这个例子看出, 在asyncio生态中, contextvars能运用到包括子协程, 多线程等所有的场景中.

    4.contextvars的原理

    在第一次使用时,我就很好奇contextvars是如何去维护程序的上下文的,好在contextvars的作者出了一个向下兼容的contextvars库,虽然他不支持asyncio,但我们还是可以通过代码了解到他的基本原理.

    4.1 ContextMeta,ContextVarMeta和TokenMeta

    代码仓中有ContextMeta,ContextVarMetaTokenMeta这几个对象, 它们的功能都是防止用户来继承Context,ContextVarToken,原理都是通过元类来判断类名是否是自己编写类的名称,如果不是则抛错.

    class ContextMeta(type(collections.abc.Mapping)):
        # contextvars.Context is not subclassable.
        def __new__(mcls, names, bases, dct):
            cls = super().__new__(mcls, names, bases, dct)
            if cls.__module__ != &#39;contextvars&#39; or cls.__name__ != &#39;Context&#39;:
                raise TypeError("type &#39;Context&#39; is not an acceptable base type")
            return cls
    Salin selepas log masuk

    4.2 Token

    上下文的本质是一个堆栈, 每次set一次对象就向堆栈增加一层数据, 每次reset就是pop掉最上层的数据, 而在Contextvars中, 通过Token对象来维护堆栈之间的交互.

    class Token(metaclass=TokenMeta):
        MISSING = object()
        def __init__(self, context, var, old_value):
            # 分别存放上下文变量, 当前set的数据以及上次set的数据
            self._context = context
            self._var = var
            self._old_value = old_value
            self._used = False
        @property
        def var(self):
            return self._var
        @property
        def old_value(self):
            return self._old_value
        def __repr__(self):
            r = &#39;<Token &#39;
            if self._used:
                r += &#39; used&#39;
            r += &#39; var={!r} at {:0x}>&#39;.format(self._var, id(self))
            return r
    Salin selepas log masuk

    可以看到Token的代码很少, 它只保存当前的context变量, 本次调用set的数据和上一次被set的旧数据. 用户只有在调用contextvar.context后才能得到Token, 返回的Token可以被用户在调用context后, 通过调用context.reset(token)来清空保存的上下文,方便本次context的变量能及时的被回收, 回到上上次的数据.

    4.3 全局唯一context

    前面说过, Python中由threading.local()负责每个线程的context, 协程属于线程的’子集’,所以contextvar直接基于threading.local()生成自己的全局context. 从他的源代码可以看到, _state就是threading.local()的引用, 并通过设置和读取_statecontext属性来写入和读取当前的上下文, copy_context调用也很简单, 同样也是调用到threading.local()API.

    def copy_context():
        return _get_context().copy()
    def _get_context():
        ctx = getattr(_state, &#39;context&#39;, None)
        if ctx is None:
            ctx = Context()
            _state.context = ctx
        return ctx
    def _set_context(ctx):
        _state.context = ctx
    _state = threading.local()
    Salin selepas log masuk

    关于threading.local(),虽然不是本文重点,但由于contextvars是基于threading.local()进行封装的,所以还是要明白threading.local()的原理,这里并不直接通过源码分析, 而是做一个简单的示例解释.

    在一个线程里面使用线程的局部变量会比直接使用全局变量的性能好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁, 性能会变得很差, 比如下面全局变量的例子:

    pet_dict = {}
    def get_pet(pet_name):
        return pet_dict[pet_name]
    def set_pet(pet_name):
        return pet_dict[pet_name]
    Salin selepas log masuk

    这份代码就是模仿一个简单的全局变量调用, 如果是多线程调用的话, 那就需要加锁啦, 每次在读写之前都要等到持有锁的线程放弃了锁后再去竞争, 而且还可能污染到了别的线程存放的数据.

    而线程的局部变量则是让每个线程有一个自己的pet_dict, 假设每个线程调用get_pet,set_pet时,都会把自己的pid传入进来, 那么就可以避免多个线程去同时竞争资源, 同时也不会污染到别的线程的数据, 那么代码可以改为这样子:

    pet_dict = {}
    def get_pet(pet_name, pid):
        return pet_dict[pid][pet_name]
    def set_pet(pet_name, pid):
        return pet_dict[pid][pet_name]
    Salin selepas log masuk

    不过这样子使用起来非常方便, 同时示例例子没有对异常检查和初始化等处理, 如果值比较复杂, 我们还要维护异常状况, 这样太麻烦了.

    这时候threading.local()就应运而生了,他负责帮我们处理这些维护的工作,我们只要对他进行一些调用即可,调用起来跟单线程调用一样简单方便, 应用threading.local()后的代码如下:

    import threading
    thread_local=threading.local()
    def get_pet(pet_name):
        return thread_local[pet_name]
    def set_pet(pet_name):
        return thread_local[pet_name]
    Salin selepas log masuk

    可以看到代码就像调用全局变量一样, 但是又不会产生竞争状态。

    4.4contextvar自己封装的Context

    contextvars自己封装的Context比较简单, 这里只展示他的两个核心方法(其他的魔术方法就像dict的魔术方法一样):

    class Context(collections.abc.Mapping, metaclass=ContextMeta):
        def __init__(self):
            self._data = immutables.Map()
            self._prev_context = None
        def run(self, callable, *args, **kwargs):
            if self._prev_context is not None:
                raise RuntimeError(
                    &#39;cannot enter context: {} is already entered&#39;.format(self))
            self._prev_context = _get_context()
            try:
                _set_context(self)
                return callable(*args, **kwargs)
            finally:
                _set_context(self._prev_context)
                self._prev_context = None
        def copy(self):
            new = Context()
            new._data = self._data
            return new
    Salin selepas log masuk

    首先, 在__init__方法可以看到self._data,这里使用到了一个叫immutables.Map()的不可变对象,并对immutables.Map()进行一些封装,所以context可以看成一个不可变的dict。这样可以防止调用copy方法后得到的上下文的变动会影响到了原本的上下文变量。

    查看immutables.Map()的示例代码可以看到,每次对原对象的修改时,原对象并不会发生改变,并会返回一个已经发生改变的新对象.

    map2 = map.set(&#39;a&#39;, 10)
    print(map, map2)
    # will print:
    #   <immutables.Map({&#39;a&#39;: 1, &#39;b&#39;: 2})>
    #   <immutables.Map({&#39;a&#39;: 10, &#39;b&#39;: 2})>
    map3 = map2.delete(&#39;b&#39;)
    print(map, map2, map3)
    # will print:
    #   <immutables.Map({&#39;a&#39;: 1, &#39;b&#39;: 2})>
    #   <immutables.Map({&#39;a&#39;: 10, &#39;b&#39;: 2})>
    #   <immutables.Map({&#39;a&#39;: 10})>
    Salin selepas log masuk

    此外,context还有一个叫run的方法, 上面在执行loop.run_in_executor时就用过run方法, 目的就是可以产生一个新的上下文变量给另外一个线程使用, 同时这个新的上下文变量跟原来的上下文变量是一致的.
    执行run的时候,可以看出会copy一个新的上下文来调用传入的函数, 由于immutables.Map的存在, 函数中对上下文的修改并不会影响旧的上下文变量, 达到进程复制数据时的写时复制的目的. 在run方法的最后, 函数执行完了会再次set旧的上下文, 从而完成一次上下文切换.

    def run(self, callable, *args, **kwargs):
        # 已经存在旧的context,抛出异常,防止多线程循环调用
        if self._prev_context is not None:
            raise RuntimeError(
                &#39;cannot enter context: {} is already entered&#39;.format(self))
        self._prev_context = _get_context()  # 保存当前的context
        try:
            _set_context(self) # 设置新的context
            return callable(*args, **kwargs)  # 执行函数
        finally:
            _set_context(self._prev_context)  # 设置为旧的context
            self._prev_context = None
    Salin selepas log masuk

    4.5 ContextVar

    我们一般在使用contextvars模块时,经常使用的就是ContextVar这个类了,这个类很简单,主要提供了set–设置值,get–获取值,reset–重置值三个方法, 从Context类中写入和获取值, 而set和reset的就是通过上面的token类进行交互的.

    set – 为当前上下文设置变量

    def set(self, value):
        ctx = _get_context()  # 获取当前上下文对象`Context`
        data = ctx._data
        try:
            old_value = data[self]  # 获取Context旧对象
        except KeyError:
            old_value = Token.MISSING  # 获取不到则填充一个object(全局唯一)
        updated_data = data.set(self, value) # 设置新的值
        ctx._data = updated_data
        return Token(ctx, self, old_value) # 返回带有旧值的token
    Salin selepas log masuk

    get – 从当前上下文获取变量

    def get(self, default=_NO_DEFAULT):
        ctx = _get_context()  # 获取当前上下文对象`Context`
        try:
            return ctx[self]  # 返回获取的值
        except KeyError:
            pass
        if default is not _NO_DEFAULT:
            return default    # 返回调用get时设置的值
        if self._default is not _NO_DEFAULT:
            return self._default  # 返回初始化context时设置的默认值
        raise LookupError  # 都没有则会抛错
    Salin selepas log masuk

    reset – 清理本次用到的上下文数据

    def reset(self, token):
           if token._used:
           	# 判断token是否已经被使用
               raise RuntimeError("Token has already been used once")
           if token._var is not self:
           	# 判断token是否是当前contextvar返回的
               raise ValueError(
                   "Token was created by a different ContextVar")
           if token._context is not _get_context():
           	# 判断token的上下文是否跟contextvar上下文一致
               raise ValueError(
                   "Token was created in a different Context")
           ctx = token._context
           if token._old_value is Token.MISSING:
           	# 如果没有旧值则删除该值
               ctx._data = ctx._data.delete(token._var)
           else:
           	# 有旧值则当前contextvar变为旧值
               ctx._data = ctx._data.set(token._var, token._old_value)
           token._used = True  # 设置flag,标记token已经被使用了
    Salin selepas log masuk

    则此,contextvar的原理了解完了,接下来再看看他是如何在asyncio运行的.

    5.contextvars asyncio

    由于向下兼容的contextvars并不支持asyncio, 所以这里通过aiotask-context的源码简要的了解如何在asyncio中如何获取和设置context。

    5.1在asyncio中获取context

    相比起contextvars复杂的概念,在asyncio中,我们可以很简单的获取到当前协程的task, 然后通过task就可以很方便的获取到task的context了,由于Pyhon3.7对asyncio的高级API 重新设计,所以可以看到需要对获取当前task进行封装

    PY37 = sys.version_info >= (3, 7)
    if PY37:
        def asyncio_current_task(loop=None):
            """Return the current task or None."""
            try:
                return asyncio.current_task(loop)
            except RuntimeError:
                # simulate old behaviour
                return None
    else:
        asyncio_current_task = asyncio.Task.current_task
    Salin selepas log masuk

    不同的版本有不同的获取task方法, 之后我们就可以通过调用asyncio_current_task().context即可获取到当前的上下文了…

    5.2 对上下文的操作

    同样的,在得到上下文后, 我们这里也需要set, get, reset的操作,不过十分简单, 类似dict一样的操作即可, 它没有token的逻辑:

    set

    def set(key, value):
        """
        Sets the given value inside Task.context[key]. If the key does not exist it creates it.
        :param key: identifier for accessing the context dict.
        :param value: value to store inside context[key].
        :raises
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
    
        current_task.context[key] = value
    Salin selepas log masuk

    get

    def get(key, default=None):
        """
        Retrieves the value stored in key from the Task.context dict. If key does not exist,
        or there is no event loop running, default will be returned
        :param key: identifier for accessing the context dict.
        :param default: None by default, returned in case key is not found.
        :return: Value stored inside the dict[key].
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
        return current_task.context.get(key, default)
    Salin selepas log masuk

    clear – 也就是contextvar.ContextVars中的reset

    def clear():
        """
        Clear the Task.context.
        :raises ValueError: if no current task.
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError("No event loop found")
        current_task.context.clear()
    Salin selepas log masuk

    5.2 copying_task_factory和chainmap_task_factory

    在Python的更高级版本中,已经支持设置context了,所以这两个方法可以不再使用了.他们最后都用到了task_factory的方法.
    task_factory简单说就是创建一个新的task,再通过工厂方法合成context,最后把context设置到task

    def task_factory(loop, coro, copy_context=False, context_factory=None):
        """
        By default returns a task factory that uses a simple dict as the task context,
        but allows context creation and inheritance to be customized via ``context_factory``.
        """
        # 生成context工厂函数
        context_factory = context_factory or partial(
            dict_context_factory, copy_context=copy_context)
        # 创建task, 跟asyncio.ensure_future一样
        task = asyncio.tasks.Task(coro, loop=loop)
        if task._source_traceback:
            del [-1]
    
        # 获取task的context
        try:
            context = asyncio_current_task(loop=loop).context
        except AttributeError:
            context = None
        # 从context工厂中处理context并赋值在task
        task.context = context_factory(context)
        return task
    Salin selepas log masuk

    aiotask-context提供了两个对context处理的函数dict_context_factorychainmap_context_factory.在aiotask-context中,context是一个dict对象,dict_context_factory可以选择赋值或者设置新的context

    def dict_context_factory(parent_context=None, copy_context=False):
        """A traditional ``dict`` context to keep things simple"""
        if parent_context is None:
            # initial context
            return {}
        else:
            # inherit context
            new_context = parent_context
            if copy_context:
                new_context = deepcopy(new_context)
            return new_context
    Salin selepas log masuk

    chainmap_context_factorydict_context_factory的区别就是在合并context而不是直接继承.同时借用ChainMap保证合并context后,还能同步context的改变

    def chainmap_context_factory(parent_context=None):
        """
        A ``ChainMap`` context, to avoid copying any data
        and yet preserve strict one-way inheritance
        (just like with dict copying)
        """
        if parent_context is None:
            # initial context
            return ChainMap()
        else:
            # inherit context
            if not isinstance(parent_context, ChainMap):
                # if a dict context was previously used, then convert
                # (without modifying the original dict)
                parent_context = ChainMap(parent_context)
            return parent_context.new_child()
    Salin selepas log masuk

    Atas ialah kandungan terperinci Cara menggunakan modul contextvars dalam python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Label berkaitan:
    sumber:yisu.com
    Kenyataan Laman Web ini
    Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
    Tutorial Popular
    Lagi>
    Muat turun terkini
    Lagi>
    kesan web
    Kod sumber laman web
    Bahan laman web
    Templat hujung hadapan