这篇博客介绍了如何在 FastAPI 项目中集成 Redis 缓存,以提高数据查询的效率和性能。通过利用 async_redis
工具类和生命周期事件管理,文章展示了如何在应用启动时初始化 Redis 连接池,并在服务关闭时释放资源。利用 Redis 缓存,API 路由可以在查询数据库时先检查缓存,若缓存存在则直接返回缓存数据,否则从数据库中获取并存入缓存,从而实现缓存优化和性能提升。
1.main.py main.py
文件展示了 FastAPI 应用的初始化和生命周期管理。在这个文件中,通过使用 @asynccontextmanager
装饰器定义了一个生命周期事件(lifespan
),该事件会在应用启动时初始化数据库和 Redis 连接池,并在服务关闭时释放资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from contextlib import asynccontextmanagerfrom fastapi import FastAPIfrom api import usersfrom model import db_initfrom utils.async_redis import RedisUtil@asynccontextmanager async def lifespan (app: FastAPI ): print ('服务启动...' ) await db_init() app.state.redis = await RedisUtil.create_redis_pool() yield print ('服务关闭...' ) app = FastAPI(lifespan=lifespan) app.include_router(users.users_router)
2.utils/async_redis.py 在 async_redis.py
文件中,封装了与 Redis 相关的操作,提供了 Redis 连接池的创建、关闭和缓存操作的实现。具体包括:
create_redis_pool
:应用启动时初始化 Redis 连接池。
close_redis_pool
:应用关闭时关闭 Redis 连接。
get_cache
、set_cache
和 delete_cache
:用于获取、设置和删除 Redis 缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 from typing import Any from fastapi import FastAPI, Requestfrom redis import asyncio as aioredisfrom redis.exceptions import AuthenticationError, TimeoutError, RedisErrorclass RedisUtil : """ Redis相关方法 """ @classmethod async def create_redis_pool (cls ) -> aioredis.Redis: """ 应用启动时初始化redis连接 :return: Redis连接对象 """ redis = await aioredis.from_url( url=f'redis://localhost' , port=6379 , db=1 , encoding='utf-8' , decode_responses=True , ) try : connection = await redis.ping() if connection: print ('redis连接成功' ) else : print ('redis连接失败' ) except AuthenticationError as e: print (f'redis用户名或密码错误,详细错误信息:{e} ' ) except TimeoutError as e: print (f'redis连接超时,详细错误信息:{e} ' ) except RedisError as e: print (f'redis连接错误,详细错误信息:{e} ' ) return redis @classmethod async def close_redis_pool (cls, app: FastAPI ): """ 应用关闭时关闭redis连接 :param app: fastapi对象 :return: """ await app.state.redis.close() @classmethod async def get_cache (cls, request: Request, cache_key: str ) -> Any : """ 获取缓存内容信息 :param request: Request对象 :param cache_key: 缓存键名 :return: 缓存内容信息 """ cache_value = await request.app.state.redis.get(cache_key) return cache_value @classmethod async def set_cache (cls, request: Request, cache_key: str , cache_value: str ): """ 设置缓存内容信息 :param request: Request对象 :param cache_key: 缓存键名 :return: """ await request.app.state.redis.set (cache_key, cache_value) @classmethod async def delete_cache (cls, request: Request, cache_key: str ): """ 删除缓存内容信息 :param request: Request对象 :param cache_key: 缓存键名 :return: """ await request.app.state.redis.delete(cache_key)
3.api/users.py users.py
处理与用户数据相关的 API 请求。在 get_users
路由中,首先会检查 Redis 缓存中是否有对应用户的数据。如果缓存命中,则直接返回缓存数据;若未命中,则从数据库获取用户数据并存入缓存。这种缓存机制能显著减少数据库访问,提高接口响应速度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi import Requestfrom utils.async_redis import RedisUtil@users_router.get("/users/{user_id}" , tags=["users" ] ) async def get_users ( request: Request, user_id: int = Path(... ), db: AsyncSession = Depends(db_session ), ): cache_key = f"USER::{user_id} " username = await RedisUtil.get_cache(request, cache_key) if not username: print ("使用数据库..." ) user = await UserDao.get_user_dao(db, user_id) username = user.name await RedisUtil.set_cache(request, cache_key, username) else : print ("使用缓存..." ) return HttpResponse.success(data={"username" : username})
4.测试 使用 curl
工具发送 HTTP 请求,验证缓存机制的有效性。首次请求时,服务器会从数据库获取用户数据,并存入 Redis 缓存;而第二次请求时,服务器直接从 Redis 获取数据,避免了不必要的数据库查询,验证了缓存的作用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 uvicorn main:app --reload --port 5000 curl -X GET 127.0 .0 .1 :5000 /api/v1/users/1 服务端打印: 使用数据库... curl -X GET 127.0 .0 .1 :5000 /api/v1/users/1 服务端打印: 使用缓存...