本文详细介绍了如何使用 SQLAlchemy 通过异步操作与MySQL数据库进行交互。在实现过程中,我们使用了 FastAPI 提供的生命周期事件,SQLAlchemy 的异步支持,以及自定义数据库会话管理和事务封装方法。通过这种结构,文章展示了如何在 FastAPI 中组织代码,使得开发、测试和扩展都变得更加高效和清晰。在实际开发中,可以在此基础上继续扩展更多功能。
1.创建数据库 首先在 MySQL 中创建一个名为 fastapi_demo
的数据库,并设置默认字符集为 utf8mb4
。
1 2 3 4 mysql -u root -p root CREATE DATABASE fastapi_demo default charset=utf8mb4;
2.main.py 这里通过 FastAPI
的生命周期事件来初始化和释放资源。我们使用 @asynccontextmanager
来定义 lifespan
方法,确保在应用启动时初始化数据库资源,在应用关闭时释放资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from contextlib import asynccontextmanagerfrom fastapi import FastAPIfrom api import usersfrom model import db_init@asynccontextmanager async def lifespan (app: FastAPI ): print ('服务启动...' ) await db_init() yield print ('服务关闭...' ) app = FastAPI(lifespan=lifespan) app.include_router(users.users_router)
3.model/__init__.py
新建 model
文件夹,在 model/__init__.py
文件中,我们配置了数据库引擎和会话管理。这段代码中,我们使用了 create_async_engine
来连接数据库,并通过 sessionmaker
定义了一个异步的会话生成器,并且通过 db_init
方法初始化数据库表结构。此外,db_session
作为接口的Depends选项用于提供session,auto_commmit
作为一个事务封装器,确保数据库操作的原子性,异常发生时可以回滚事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import contextlibfrom sqlalchemy.orm import sessionmaker, declarative_basefrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionBase = declarative_base() async_uri = 'mysql+aiomysql://root:root@localhost:3306/fastapi_demo?charset=utf8mb4' async_engine = create_async_engine( url=async_uri, echo=True , pool_size=10 , max_overflow=30 , pool_recycle=60 * 30 ) async_session = sessionmaker(async_engine, class_=AsyncSession) async def db_init () -> None : async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) async def db_session () -> AsyncSession: async with async_session() as session: return session @contextlib.asynccontextmanager async def auto_commmit (session: AsyncSession ): try : yield session await session.commit() except Exception as e: await session.rollback() raise e finally : await session.close() from .user import *
4.model/user.py 在这个文件中,我们定义了 User
数据模型。使用 SQLAlchemy 的声明式模型来表示用户数据表。每个 User
对象包括用户的基本信息,如 name
、mobile
、age
和 introduction
,以及 create_time
来记录创建时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from datetime import datetimefrom sqlalchemy import Column, String, Integer, DateTimefrom . import Baseclass User (Base ): __tablename__ = 'user' id = Column(Integer, primary_key=True , autoincrement=True , doc="主键" ) name = Column(String(20 ), index=True ) mobile = Column(String(11 )) age = Column(Integer) introduction = Column(String(100 )) create_time = Column(DateTime, default=datetime.now)
5.dao/user_dao.py 新建 dao
文件夹,在 dao/user_dao.py
中,我们封装了与数据库交互的具体操作方法,包括用户的增、删、查、改等操作。每个方法都通过异步数据库会话 AsyncSession
执行 SQLAlchemy 的查询或修改操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 from sqlalchemy.ext.asyncio import AsyncSessionfrom sqlalchemy import delete, select, update from model import auto_commmitfrom model.user import Userfrom forms.user_form import UserReqclass UserDao : """ 用户模块数据库操作层 """ @classmethod async def get_user_dao (cls, db: AsyncSession, user_id: int ): """ 根据用户名获取用户信息 :param db: orm对象 :param user_name: 用户名 :return: 当前用户名的用户信息对象 """ query_user_info = ( ( await db.execute( select(User) .where(User.id == user_id) ) ) .scalars() .first() ) return query_user_info @classmethod async def add_user_dao (cls, db: AsyncSession, user: UserReq ): """ 新增用户数据库操作 :param db: orm对象 :param user: 用户对象 :return: """ async with auto_commmit(db) as session: db_user = User(**user.model_dump()) session.add(db_user) await session.flush() return db_user.id @classmethod async def update_user_dao (cls, db: AsyncSession, user_id: int , user: UserReq ): """ 更新用户数据库操作 :param db: orm对象 :param user_id: 用户id :param user: 用户对象 :return: """ async with auto_commmit(db) as session: await session.execute( update(User) .where(User.id == user_id) .values(**user.model_dump()) ) @classmethod async def delete_user_dao (cls, db: AsyncSession, user_id: int ): """ 删除用户数据库操作 :param db: orm对象 :param user_id: 用户id :return: """ async with auto_commmit(db) as session: await session.execute(delete(User).where(User.id == user_id))
6.api/users.py api/users.py
定义了用户相关的 API 路由,包括增、删、查、改等操作。通过 FastAPI
的 Depends
机制来注入数据库会话,从而执行相应的数据库操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from sqlalchemy.ext.asyncio import AsyncSessionfrom dao.user_dao import UserDaofrom model import db_session@users_router.post("/users/" , tags=["users" ] ) async def post_users ( user: UserReq = Body(... ), db: AsyncSession = Depends(db_session ) ): user_id = await UserDao.add_user_dao(db, user) return HttpResponse.success(data={"user_id" : user_id}) @users_router.delete("/users/{user_id}" , tags=["users" ] ) async def delete_users ( user_id: int = Path(... ) , db: AsyncSession = Depends(db_session ) ): await UserDao.delete_user_dao(db, user_id) return HttpResponse.success() @users_router.get("/users/{user_id}" , tags=["users" ] ) async def get_users ( user_id: int = Path(... ), db: AsyncSession = Depends(db_session ) ): user = await UserDao.get_user_dao(db, user_id) return HttpResponse.success(data={"user" : user}) @users_router.put("/users/{user_id}" , tags=["users" ] ) async def put_users ( user_id: int = Path(... ), user: UserReq = Body(... ), db: AsyncSession = Depends(db_session ) ): await UserDao.update_user_dao(db, user_id, user) return HttpResponse.success()
7.测试 在最后的测试部分,我们启动 FastAPI 服务并通过 curl
命令进行接口测试。这里展示了如何进行用户的增、删、查、改操作,验证了接口是否正常工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 uvicorn main:app --reload --port 5000 curl -H "Content-Type: application/json" -X POST -d '{"name":"myname", "mobile":"123", "age": 18, "introduction": "student"}' 127.0 .0 .1 :5000 /api/v1/users/ {"code" :200 ,"msg" :"操作成功" ,"data" :{"user_id" :5 },"state" :null} curl -H "Content-Type: application/json" -X PUT -d '{"name":"yourname", "mobile":"123", "age": 18, "introduction": "student"}' 127.0 .0 .1 :5000 /api/v1/users/5 {"code" :200 ,"msg" :"操作成功" ,"data" :null,"state" :null} curl -X GET 127.0 .0 .1 :5000 /api/v1/users/5 {"code" :200 ,"msg" :"操作成功" ,"data" :{"user" :{"name" :"yourname" ,"mobile" :"123" ,"introduction" :"student" ,"age" :18 ,"id" :5 ,"create_time" :"2024-11-06T22:35:12" }} curl -X DELETE 127.0 .0 .1 :5000 /api/v1/users/5 {"code" :200 ,"msg" :"操作成功" ,"data" :null,"state" :null}