本篇是该系列的终篇,为大家介绍flask对werkzeug包中LocalProxy的使用。
1. contextvars

使用 threading.local() 对象可以基于线程存储全局变量,但不能基于协程。
Python 3.7 引入了 contextvars 标准库模块,用于处理上下文变量(Context Variables)。上下文变量的值按执行上下文隔离:每个线程、每个 asyncio 任务都有各自的上下文,彼此对变量的重新赋值互不影响,同时也不会污染全局命名空间。
contextvars的使用:
import asyncio
import contextvars

user_id = contextvars.ContextVar('user_id')
user_ids = contextvars.ContextVar('user_ids')

# Setting a value and then resetting it with the returned token puts the
# variable back into its previous "no value" state, so get() raises.
token = user_id.set(0)
user_id.reset(token)
try:
    print(f'main user_id user_ids: {user_id.get()}')
except LookupError:
    print('no value')

user_id.set(0)
user_ids.set({})


async def func(user_id_value):
    """Rebind ``user_id`` and record the value in the ``user_ids`` dict."""
    # Rebinding only affects the context of the task that is running.
    user_id.set(user_id_value)
    # The dict itself is never rebound, only mutated in place, so the same
    # dict object is visible from every context that holds a reference to it.
    user_id_dict = user_ids.get()
    user_id_dict[user_id_value] = user_id_value


async def new_coro(user_id_value):
    """Print both variables before and after calling ``func``."""
    print(f'coro {user_id_value} before func user_id user_ids: {user_id.get()} {user_ids.get()}')
    await func(user_id_value)
    print(f'coro {user_id_value} after func user_id user_ids: {user_id.get()} {user_ids.get()}')


async def main():
    """Run three tasks; each task works on its own copy of the context."""
    tasks = [asyncio.create_task(new_coro(num)) for num in range(1, 4)]
    await asyncio.gather(*tasks)


asyncio.run(main())
print(f'main user_id user_ids: {user_id.get()} {user_ids.get()}')

# Output:
# no value
# coro 1 before func user_id user_ids: 0 {}
# coro 1 after func user_id user_ids: 1 {1: 1}
# coro 2 before func user_id user_ids: 0 {1: 1}
# coro 2 after func user_id user_ids: 2 {1: 1, 2: 2}
# coro 3 before func user_id user_ids: 0 {1: 1, 2: 2}
# coro 3 after func user_id user_ids: 3 {1: 1, 2: 2, 3: 3}
# main user_id user_ids: 0 {1: 1, 2: 2, 3: 3}
在普通函数中使用 contextvars 模块时,需要手动创建上下文:
import contextvars

user_id = contextvars.ContextVar('user_id')
user_id.set(100)


def func(user_id_value):
    """Rebind ``user_id`` in whichever context this function runs in."""
    user_id.set(user_id_value)
    print("in func user_id:", user_id.get())


def main():
    """Run ``func`` inside a copied context so the caller's value is kept."""
    print('before func user_id:', user_id.get())
    # copy_context() snapshots the current context; changes made while
    # running inside the copy do not leak back into the caller's context.
    context = contextvars.copy_context()
    context.run(func, 200)
    print('after func user_id:', user_id.get())


main()

# Output:
# before func user_id: 100
# in func user_id: 200
# after func user_id: 100
2. LocalProxy

LocalProxy 的主要作用是:先根据特定的方法获取被代理对象中设置的值本身或其某个属性,然后再对其做后续的操作。
LocalProxy能代理四种类型的对象:ContextVar、Local、LocalStack、Callable对象。Local和LocalStack是对ContextVar的封装:
Local将ContextVar的值设为字典,基于字典的key完成属性的设置和读取
LocalStack将ContextVar的值设为列表,提供了push和pop方法
LocalProxy使用示例:
from contextvars import ContextVar
from werkzeug.local import LocalProxy


class G():
    """Object whose missing attributes all resolve to the string 'G'."""

    def __getattr__(self, name):
        return 'G'


class A():
    """Stand-in for an application context holding ``name`` and ``g``."""

    def __init__(self):
        self.name = 'Tom'
        self.g = G()

    def __len__(self):
        return 3


_cv_app = ContextVar("flask.app_ctx")
# Proxy for the object stored in the context variable itself.
app_ctx = LocalProxy(_cv_app, unbound_message='_no_app_msg')
# Proxy for the ``g`` attribute of the object stored in the context variable.
g = LocalProxy(_cv_app, 'g', unbound_message='_no_app_msg')

a = A()
_cv_app.set(a)
print(len(app_ctx))   # 3, forwarded to A.__len__
print(app_ctx.name)   # Tom, forwarded to a.name
print(g.name)         # G, forwarded to a.g.name, i.e. G.__getattr__
3. 源码分析

Flask 使用了 ContextVar,源码如下:
# Abridged excerpt of Flask's source; names such as Flask, Request,
# SessionMixin, _AppCtxGlobals, _no_app_msg and _no_req_msg are defined
# elsewhere in Flask and are not shown here.
_cv_app: ContextVar[AppContext] = ContextVar("flask.app_ctx")
app_ctx: AppContext = LocalProxy(_cv_app, unbound_message=_no_app_msg)
current_app: Flask = LocalProxy(_cv_app, "app", unbound_message=_no_app_msg)
g: _AppCtxGlobals = LocalProxy(_cv_app, "g", unbound_message=_no_app_msg)

_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy(_cv_request, unbound_message=_no_req_msg)
request: Request = LocalProxy(_cv_request, "request", unbound_message=_no_req_msg)
session: SessionMixin = LocalProxy(_cv_request, "session", unbound_message=_no_req_msg)


class AppContext:
    def __init__(self, app: Flask) -> None:
        self.app = app
        self.g: _AppCtxGlobals = app.app_ctx_globals_class()
        # Tokens returned by ContextVar.set(), kept for a later reset().
        self._cv_tokens: list[contextvars.Token] = []


class RequestContext:
    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        self.request: Request = request
        self.session: SessionMixin | None = session
        # Each entry pairs the ContextVar token with an optional AppContext.
        self._cv_tokens: list[tuple[contextvars.Token, AppContext | None]] = []
werkzeug包LocalProxy关键源码提取如下:
# Abridged excerpt of werkzeug.local; elided parts are marked with `...`.
# Besides functools.partial it relies on operator.attrgetter,
# contextvars.ContextVar, `typing as t` and the TypeVar `T`.
from functools import partial


class _ProxyLookup:
    """Descriptor that forwards an operation to the proxied object."""

    def __init__(self, f=None):
        if hasattr(f, "__get__"):
            # A Python function: bind it to ``obj`` as a method.
            def bind_f(instance: LocalProxy, obj: t.Any) -> t.Callable:
                return f.__get__(obj, type(obj))

        elif f is not None:
            # C-level builtins such as getattr / setattr / len have no
            # __get__, so ``obj`` is bound as the first argument instead.
            def bind_f(instance: LocalProxy, obj: t.Any) -> t.Callable:
                return partial(f, obj)

        else:
            bind_f = None

        self.bind_f = bind_f

    def __get__(self, instance: LocalProxy, owner: type | None = None) -> t.Any:
        try:
            # Resolve the object currently behind the proxy.
            obj = instance._get_current_object()
        except RuntimeError:
            ...  # handling elided in this excerpt

        if self.bind_f is not None:
            return self.bind_f(instance, obj)

    def __call__(self, instance: LocalProxy, *args, **kwargs):
        # Look up the bound callable via __get__, then invoke it.
        return self.__get__(instance, type(instance))(*args, **kwargs)


def _identity(o: T) -> T:
    return o


class LocalProxy():
    def __init__(self, local, name=None, *, unbound_message=None):
        if name is None:
            get_name = _identity
        else:
            get_name = attrgetter(name)

        # ... (the preceding branches for other kinds of ``local`` are elided;
        # in the full source the test below is an ``elif``)
        if isinstance(local, ContextVar):
            def _get_current_object() -> T:
                try:
                    obj = local.get()
                except LookupError:
                    raise RuntimeError(unbound_message) from None
                return get_name(obj)

        # Bypass the proxied __setattr__ so the getter lands on the proxy itself.
        object.__setattr__(self, "_get_current_object", _get_current_object)

    __getattr__ = _ProxyLookup(getattr)
    __setattr__ = _ProxyLookup(setattr)
    __len__ = _ProxyLookup(len)
    ...
LocalProxy 类中的 __getattr__ 是一个 _ProxyLookup 描述符实例,当其被调用时,触发 _ProxyLookup.__call__ 方法,该方法调用自身的 __get__ 方法。
__get__ 方法首先调用代理对象的 _get_current_object 方法,获取代理对象背后的 obj 对象,然后通过 bind_f 将普通的 getattr 函数与 obj 绑定,得到一个可调用对象并返回,最后这个可调用对象在 _ProxyLookup.__call__ 中被调用。
operator.attrgetter 是 Python 标准库中 operator 模块提供的一个函数,用于创建一个可以提取对象属性的函数。使用示例:
from operator import attrgetter


class Person:
    """Simple record with a name and an age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age


people = [Person("Alice", 30), Person("Bob", 25), Person("Charlie", 35)]
# attrgetter('age') builds a callable equivalent to ``lambda p: p.age``.
get_age = attrgetter('age')
sorted_people = sorted(people, key=get_age)
for person in sorted_people:
    print(person.name, person.age)

# Output:
# Bob 25
# Alice 30
# Charlie 35
普通函数的 __get__ 方法可以将自己包装成某个实例的绑定方法:
def func(self):
    """Plain function that expects an object with a ``name`` attribute."""
    print(f'2333 {self.name}')


class C:
    name = 123


c = C()
# Functions are descriptors: __get__(instance, owner) returns a bound method
# whose first argument is fixed to ``instance``.
r = func.__get__(c, C)
r()

# Output:
# 2333 123
4. flask调用链

app.run(host, port, threaded=True)
run_simple(host, port, app, threaded=True)
srv = make_server(host, port, app, threaded=True)
srv.serve_forever()
使用select循环处理请求:
srv._handle_request_noblock()
srv.process_request(request, client_address)
开启一个线程处理请求:
srv.finish_request(request, client_address)
srv.RequestHandlerClass(request, client_address, srv)
这里最后执行了BaseRequestHandler的__init__方法,以下将其实例记为wsgi
wsgi.handle()
wsgi.handle_one_request()
wsgi.run_wsgi()
execute(wsgi.srv.app)
application_iter = app(environ, start_response)
app.__call__(environ, start_response)
app.wsgi_app(environ, start_response)
ctx = self.request_context(environ)
ctx.push()
ctx._cv_tokens.append((_cv_request.set(ctx), app_ctx))
response = app.full_dispatch_request()
app.dispatch_request()
app.ensure_sync(app.view_functions[rule.endpoint])(**view_args)
app.finalize_request()
ctx.pop()
token, app_ctx = ctx._cv_tokens.pop()
_cv_request.reset(token)
return response(environ, start_response)
srv继承关系:
srv = ThreadedWSGIServer(host, port, app)
ThreadedWSGIServer 继承 (ThreadingMixIn, BaseWSGIServer)
BaseWSGIServer 继承 HTTPServer
HTTPServer 继承 TCPServer
TCPServer 继承 BaseServer
WSGIRequestHandler继承关系:
RequestHandlerClass = WSGIRequestHandler
WSGIRequestHandler 继承 BaseHTTPRequestHandler 继承 StreamRequestHandler 继承 BaseRequestHandler