# 1. Using coroutines
import asyncio


async def a():
    """Simulate a two-second job.

    Prints a start line, sleeps for 2 seconds without blocking the event
    loop, prints a completion line, and returns ``'A'``.
    """
    print('Coro a is running')
    await asyncio.sleep(2)
    print('Coro a is done')
    return 'A'


async def b():
    """Simulate a one-second job.

    Prints a start line, sleeps for 1 second without blocking the event
    loop, prints a completion line, and returns ``'B'``.
    """
    print('Coro b is running')
    await asyncio.sleep(1)
    print('Coro b is done')
    return 'B'


if __name__ == '__main__':
    # main() is not defined in this snippet; each of the snippets that
    # follow provides one variant of it.
    asyncio.run(main())
    # Equivalent spelling using an explicit event loop:
    #     loop = asyncio.get_event_loop()
    #     loop.run_until_complete(main())
    #     loop.close()
async def main():
    """Run ``a()`` and ``b()`` concurrently and print both results.

    ``return_exceptions=True`` makes ``gather`` hand back a raised exception
    as that coroutine's result instead of propagating it.
    """
    results = await asyncio.gather(a(), b(), return_exceptions=True)
    # gather preserves argument order: first slot is a(), second is b().
    outcome_a, outcome_b = results
    print(outcome_a, outcome_b)
async def main():
    """Run ``a()`` and ``b()`` as tasks, wait for both, and print each result.

    ``asyncio.wait`` returns a ``(done, pending)`` pair of sets; because the
    wait only returns once every task has completed, ``pending`` is unused.
    ``done`` is a set, so the order of the printed lines is not guaranteed.
    """
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    done, _pending = await asyncio.wait(
        [task1, task2],
        # Public constant; asyncio.tasks is an implementation submodule.
        return_when=asyncio.ALL_COMPLETED,
    )
    for task in done:
        # task.result() re-raises if the task itself raised.
        print(f'Task {task.get_name()} result: {task.result()}')
async def main():
    """Schedule ``a()`` and ``b()`` as tasks, then await them in turn.

    Both tasks are created before the first ``await``, so they run
    concurrently even though they are awaited one after the other.
    """
    first = asyncio.create_task(a())
    second = asyncio.create_task(b())
    for scheduled in (first, second):
        await scheduled
async def main():
    """Same as the ``create_task`` variant, but using ``asyncio.ensure_future``.

    Both coroutines are wrapped and scheduled before the first ``await``,
    then awaited in order.
    """
    first = asyncio.ensure_future(a())
    second = asyncio.ensure_future(b())
    for scheduled in (first, second):
        await scheduled
async def main():
    """Demonstrate cancelling a task: schedule ``a()`` and ``b()``, cancel ``b``.

    Only ``task1`` is awaited; ``task2`` is cancelled right after creation.
    """
    # Inside a coroutine the loop is by definition running, so ask for the
    # running loop explicitly rather than going through get_event_loop().
    loop = asyncio.get_running_loop()
    task1 = loop.create_task(a())
    task2 = loop.create_task(b())
    # Cancellation is requested before control returns to the event loop,
    # i.e. before b() has had a chance to start executing.
    task2.cancel()
    await task1
async def main():
    """Demonstrate ``asyncio.shield``: cancelling the shield leaves ``b()`` running.

    ``asyncio.shield(b())`` schedules ``b()`` and returns an outer future.
    Calling ``cancel()`` on that outer future does not cancel the inner
    task, so ``b()`` keeps running while ``task1`` is awaited.
    """
    # Inside a coroutine the loop is by definition running, so ask for the
    # running loop explicitly rather than going through get_event_loop().
    loop = asyncio.get_running_loop()
    task1 = loop.create_task(a())
    task2 = asyncio.shield(b())
    # Cancels only the shield's outer future, not the wrapped b() task.
    task2.cancel()
    await task1
# 2. How coroutines work
"""
1. produce() starts the generator with c.send(None).
2. produce() switches to consumer() with c.send(n).
3. consumer() receives the sent message from ``yield``, then uses ``yield``
   again to switch back to produce() and hand over its result.
4. produce() receives the consumer's result as the return value of
   c.send(n) and goes on to produce the next message.
5. produce() closes the consumer with c.close(), and the exchange ends.
"""
import time


def consumer():
    """Generator-based coroutine that consumes messages sent to it.

    Yields ``''`` when first primed. For every truthy message received via
    ``send`` it prints a line and yields ``'OK'`` back to the sender. A falsy
    message makes the generator return (the sender sees ``StopIteration``).
    """
    r = ''
    while True:
        n = yield r
        if not n:
            return
        # f-string rather than '%s' % n: %-formatting would try to unpack a
        # tuple message into several arguments and fail.
        print(f'[CONSUMER] Consuming {n}...')
        r = 'OK'


def produce(c):
    """Drive consumer generator *c* through five messages (1..5), then close it.

    Args:
        c: an un-primed generator created by ``consumer()``.
    """
    c.send(None)  # prime: run the generator up to its first ``yield``
    for n in range(1, 6):
        print(f'[PRODUCER] Producing {n}...')
        r = c.send(n)
        print(f'[PRODUCER] Consumer return: {r}')
    c.close()


def grep(pattern):
    """Generator-based coroutine that prints every sent line containing *pattern*.

    Must be primed with ``next()`` before the first ``send``. Runs until it
    is closed.
    """
    while True:
        line = (yield)
        if pattern in line:
            print(line)


if __name__ == '__main__':
    c = consumer()
    produce(c)

    search = grep('coroutine')
    next(search)  # prime the generator
    search.send("I love you")
    search.send("I love coroutine instead!")
    search.close()
3.asynccontextmanager
`asynccontextmanager` 是 Python 中 `contextlib` 模块提供的一个装饰器,用于创建异步上下文管理器。异步上下文管理器类似于同步上下文管理器,但可以在异步代码中使用。
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def async_manager():
    """Async context manager that yields the string ``"resource"``.

    On entry it prints a message and sleeps for 1 second before yielding.
    On exit it sleeps for 1 second and prints a closing message; the exit
    step sits in ``finally`` so it also runs when the ``async with`` body
    raises.
    """
    print("Entering async context")
    try:
        await asyncio.sleep(1)
        yield "resource"
    finally:
        await asyncio.sleep(1)
        print("Exiting async context")


async def main():
    """Enter ``async_manager`` and print the resource it provides."""
    async with async_manager() as resource:
        print("Inside async context:", resource)


if __name__ == '__main__':
    asyncio.run(main())

# Output:
# $ Entering async context
# Inside async context: resource
# Exiting async context
# 4. Async decorators
import asyncio
import functools


def async_decorator(func):
    """Wrap coroutine function *func* with start/finish messages.

    The returned coroutine function prints a line, awaits ``func`` with the
    caller's arguments, prints the result, and returns it unchanged.
    """

    @functools.wraps(func)  # keep func's __name__ / __doc__ on the wrapper
    async def wrapper(*args, **kwargs):
        print("Starting asynchronous operation...")
        result = await func(*args, **kwargs)
        print("Asynchronous operation completed: ", result)
        return result

    return wrapper


@async_decorator
async def my_function():
    """Sleep for one second, print a completion line, and return a greeting."""
    await asyncio.sleep(1)
    print("my_function completed.")
    return "Hello, world!"


if __name__ == '__main__':
    asyncio.run(my_function())

# Output:
# $ Starting asynchronous operation...
# my_function completed.
# Asynchronous operation completed: Hello, world!
import asyncio
import functools


def async_decorator_with_args(arg1, arg2):
    """Build a decorator for coroutine functions, parameterised by two values.

    Args:
        arg1: first value interpolated into the start message.
        arg2: second value interpolated into the start message.

    Returns:
        A decorator whose wrapper prints a start line mentioning both values,
        awaits the wrapped coroutine function, prints the result, and
        returns it unchanged.
    """

    def inner_decorator(func):
        @functools.wraps(func)  # keep func's __name__ / __doc__ on the wrapper
        async def wrapper(*args, **kwargs):
            print(f"Starting asynchronous operation with {arg1} and {arg2}...")
            result = await func(*args, **kwargs)
            print("Asynchronous operation completed: ", result)
            return result

        return wrapper

    return inner_decorator


@async_decorator_with_args("Parameter 1", "Parameter 2")
async def my_function():
    """Sleep for one second, print a completion line, and return a greeting."""
    await asyncio.sleep(1)
    print("my_function completed.")
    return "Hello, world!"


if __name__ == '__main__':
    asyncio.run(my_function())

# Output:
# $ Starting asynchronous operation with Parameter 1 and Parameter 2...
# my_function completed.
# Asynchronous operation completed: Hello, world!
# 5. Running a callback
import asyncio
from functools import partial


def callback(future, n):
    """Print the finished future's result followed by the extra value *n*.

    Args:
        future: a completed future/task; ``future.result()`` is read, so an
            exception stored on the future is re-raised here.
        n: extra value supplied by the caller (bound with ``partial`` below).
    """
    print(f'Result: {future.result()}, {n}')


async def a():
    """Sleep for one second and return ``'A'``."""
    await asyncio.sleep(1)
    return 'A'


async def main():
    """Run ``a()`` as a task and invoke ``callback`` when it completes."""
    task = asyncio.create_task(a())
    # Done-callbacks are called with the task as their only argument, so the
    # extra ``n`` is pre-bound with functools.partial.
    task.add_done_callback(partial(callback, n=1))
    await task


if __name__ == '__main__':
    asyncio.run(main())

# Output:
# $ Result: A, 1
# 6. Running synchronous code
import asyncio
import time


def a():
    """Blocking (synchronous) job: sleep for one second and return ``'A'``."""
    time.sleep(1)
    return 'A'


async def b():
    """Non-blocking job: sleep for one second and return ``'B'``."""
    await asyncio.sleep(1)
    return 'B'


def show_perf(func):
    """Run coroutine function *func* to completion and print its wall time.

    Prints a separator line, runs ``func()`` with ``asyncio.run``, then
    prints ``"<name> Cost: <seconds>"`` measured with ``time.perf_counter``.
    """
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')


async def c1():
    """Run blocking ``a`` in the default executor alongside coroutine ``b``.

    ``run_in_executor(None, a)`` moves the blocking call off the event loop,
    so the two one-second jobs overlap instead of running back to back.
    """
    loop = asyncio.get_running_loop()
    await asyncio.gather(
        loop.run_in_executor(None, a),
        b(),
    )


if __name__ == '__main__':
    show_perf(c1)

# Output:
# ********************
# c1 Cost: 1.0054080998525023
# 7. Multiprocessing + coroutines
import asyncio
import time
from multiprocessing import Pool

import aiohttp

all_urls = ['https://www.baidu.com'] * 400


async def get_html(url, sem):
    """Fetch *url* and return the response body as text.

    Args:
        url: address to request.
        sem: ``asyncio.Semaphore`` shared by the fetches of one process; it
            caps how many requests are in flight at once.
    """
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()


async def _fetch_all(urls):
    """Fetch every URL in *urls* concurrently, at most 10 at a time."""
    # Created inside the coroutine so it belongs to the loop that runs it.
    sem = asyncio.Semaphore(10)
    # return_exceptions=True keeps the run best-effort: one failed request
    # does not abort the others.
    return await asyncio.gather(
        *(get_html(url, sem) for url in urls),
        return_exceptions=True,
    )


def main(urls):
    """Worker entry point: download all *urls* on a fresh event loop.

    asyncio.wait() no longer accepts bare coroutines (TypeError on
    Python 3.11+), so the fetches are gathered inside a coroutine that
    asyncio.run() drives; asyncio.run() also creates and closes the loop.
    """
    asyncio.run(_fetch_all(urls))


if __name__ == '__main__':
    start = time.time()
    with Pool(4) as p:
        pending = [
            p.apply_async(main, args=(all_urls[i * 100:(i + 1) * 100],))
            for i in range(4)
        ]
        p.close()
        p.join()
        # apply_async stores a worker's exception on its result object;
        # get() re-raises it here instead of letting it vanish silently.
        for result in pending:
            result.get()
    print(time.time() - start)