03-3. 非同步資料庫操作

⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

非同步資料庫操作讓你在等待資料庫回應時,可以處理其他請求,大幅提升併發效能。


🎯 為什麼需要非同步?

同步 vs 非同步

同步處理(每個請求等待資料庫):
Request 1: ████████░░░░░░░░  等待 DB
Request 2:         ████████░░░░░░░░  等待 DB
Request 3:                 ████████░░░░░░░░  等待 DB
總時間:████████████████████████████████████████

非同步處理(等待時處理其他請求):
Request 1: ██░░░░██  開始 → 等待 → 完成
Request 2:   ██░░░░██  開始 → 等待 → 完成
Request 3:     ██░░░░██  開始 → 等待 → 完成
總時間:████████████████

效能差異

| 場景 | 同步 | 非同步 |
| --- | --- | --- |
| 100 個併發請求 | 逐一處理 | 同時處理 |
| I/O 等待時 | 阻塞 | 處理其他請求 |
| 適合場景 | CPU 密集 | I/O 密集 |

📦 安裝

# SQLAlchemy 非同步支援
pip install sqlalchemy[asyncio]

# 非同步資料庫驅動
pip install asyncpg          # PostgreSQL(推薦)
pip install aiomysql         # MySQL
pip install aiosqlite        # SQLite

🔧 基本設定

建立非同步 Engine

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# Connection URL: PostgreSQL dialect driven by the asyncpg driver.
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

# Shared async engine; the keyword arguments tune SQL logging and the pool.
async_engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,  # connections kept open in the pool
    max_overflow=10,  # additional connections allowed beyond pool_size
    pool_recycle=3600,  # recycle a connection after this many seconds
    pool_pre_ping=True,  # check a connection before handing it out
    echo=True,  # print emitted SQL (development use)
)

# Factory that produces AsyncSession objects bound to the engine above.
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
    expire_on_commit=False,  # keep loaded attributes usable after commit
)

# Declarative base shared by every ORM model in this file.
class Base(DeclarativeBase):
    """Common declarative base; models inherit from it to share one metadata."""

不同資料庫的 URL

# PostgreSQL(推薦)
"postgresql+asyncpg://user:pass@localhost:5432/dbname"

# MySQL
"mysql+aiomysql://user:pass@localhost:3306/dbname"

# SQLite
"sqlite+aiosqlite:///./app.db"

# SQLite 記憶體
"sqlite+aiosqlite:///:memory:"

📝 定義 Model

from sqlalchemy import String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import Optional, List

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # A primary key is already indexed by the database, so no extra
    # ``index=True`` (it would only create a redundant second index).
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 and yields a
    # naive timestamp; switching to an aware default also requires a
    # timezone-aware column type, so it is left unchanged here.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    # Relationship to the user's posts. ``selectin`` loads the collection with
    # a second SELECT at query time, so no implicit lazy load is needed later
    # (implicit lazy loads are not available under asyncio).
    posts: Mapped[List["Post"]] = relationship(
        "Post",
        back_populates="author",
        lazy="selectin"
    )

class Post(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    # The primary key is indexed by the database already; no extra index=True.
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 (naive UTC).
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    # Indexed because loading ``User.posts`` filters posts by author_id.
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
    author: Mapped["User"] = relationship("User", back_populates="posts")

🔄 非同步 CRUD 操作

建立(Create)

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def create_user(
    db: AsyncSession,
    username: str,
    email: str,
    password: str
) -> User:
    """Insert one user and return the persisted instance.

    The ``password`` argument is written to ``hashed_password`` unchanged;
    this function performs no hashing itself.
    """
    new_user = User(username=username, email=email, hashed_password=password)
    db.add(new_user)
    await db.commit()
    # Reload the row so the returned object reflects what the database holds.
    await db.refresh(new_user)
    return new_user

async def create_users_bulk(
    db: AsyncSession,
    users_data: list[dict]
) -> list[User]:
    """Insert several users in one commit and return them.

    Each dict in ``users_data`` is passed as keyword arguments to ``User``.
    """
    pending: list[User] = []
    for payload in users_data:
        pending.append(User(**payload))

    db.add_all(pending)
    await db.commit()

    # Refresh one at a time: the session is used sequentially, never concurrently.
    for created in pending:
        await db.refresh(created)

    return pending

查詢(Read)

from sqlalchemy import select, func
from sqlalchemy.orm import selectinload

async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Return the user whose ``id`` equals ``user_id``, or ``None`` if absent."""
    rows = await db.execute(select(User).where(User.id == user_id))
    return rows.scalar_one_or_none()

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Return the user whose ``email`` matches exactly, or ``None`` if absent."""
    rows = await db.execute(select(User).where(User.email == email))
    return rows.scalar_one_or_none()

async def get_users(
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100
) -> list[User]:
    """Return one page of users: skip ``skip`` rows, then take up to ``limit``."""
    page_query = select(User).offset(skip).limit(limit)
    # scalars() executes the statement and yields the User objects directly.
    users = await db.scalars(page_query)
    return list(users.all())

async def get_user_with_posts(db: AsyncSession, user_id: int) -> User | None:
    """Return the user with ``posts`` eagerly loaded, or ``None`` if absent."""
    with_posts = selectinload(User.posts)
    query = select(User).where(User.id == user_id).options(with_posts)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()

async def count_users(db: AsyncSession) -> int:
    """Return the number of rows in the users table (0 when none is reported)."""
    count_query = select(func.count()).select_from(User)
    total = (await db.execute(count_query)).scalar()
    return total if total else 0

更新(Update)

from sqlalchemy import update

async def update_user(
    db: AsyncSession,
    user_id: int,
    **kwargs
) -> User | None:
    """Load a user, apply the given field values, commit, and return it.

    Approach 1: fetch first, then mutate the loaded instance. Returns
    ``None`` when no user has ``user_id``. Keyword names that are not
    attributes of the loaded instance are skipped.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if target is None:
        return None

    for field, new_value in kwargs.items():
        if not hasattr(target, field):
            continue
        setattr(target, field, new_value)

    await db.commit()
    await db.refresh(target)
    return target

async def update_users_bulk(
    db: AsyncSession,
    user_ids: list[int],
    **kwargs
) -> int:
    """Update many users with a single UPDATE statement.

    Approach 2: issue the UPDATE directly instead of loading each row.

    Args:
        db: Session used to execute and commit the statement.
        user_ids: Primary keys of the users to update.
        **kwargs: Column names mapped to their new values.

    Returns:
        The row count reported for the UPDATE; 0 when there is nothing to do.
    """
    # With no target ids or no values there is nothing meaningful to send,
    # so skip the round trip instead of issuing an UPDATE without a SET list.
    if not user_ids or not kwargs:
        return 0

    stmt = (
        update(User)
        .where(User.id.in_(user_ids))
        .values(**kwargs)
    )
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount

刪除(Delete)

from sqlalchemy import delete

async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Delete the user with ``user_id``; return whether a row was found."""
    lookup = await db.execute(select(User).where(User.id == user_id))
    doomed = lookup.scalar_one_or_none()
    if doomed is None:
        return False

    await db.delete(doomed)
    await db.commit()
    return True

async def delete_users_bulk(db: AsyncSession, user_ids: list[int]) -> int:
    """Delete every user whose id is in ``user_ids``; return the row count."""
    outcome = await db.execute(delete(User).where(User.id.in_(user_ids)))
    await db.commit()
    return outcome.rowcount

🚀 FastAPI 整合

依賴注入

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# Application lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the tables on startup and dispose of the engine on shutdown."""
    # Startup: create every table registered on Base.metadata.
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    try:
        yield
    finally:
        # Shutdown: release the pool even if an exception or cancellation
        # is thrown into the generator at the yield point.
        await async_engine.dispose()

app = FastAPI(lifespan=lifespan)

# Dependency: provide a session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield one ``AsyncSession`` for the duration of a request.

    This is an async generator, hence the ``AsyncGenerator`` return type.
    If the code using the session raises, the session is rolled back and
    the exception is re-raised. The ``async with`` block closes the session
    on exit, so no explicit ``close()`` is needed.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise

API 端點

from pydantic import BaseModel, EmailStr

# Pydantic schemas
class UserCreate(BaseModel):
    """Request body for creating a user."""

    username: str
    email: EmailStr
    password: str


class UserResponse(BaseModel):
    """User fields returned to clients (never includes the password)."""

    # Pydantic v2 configuration; allows building the schema from ORM objects.
    model_config = {"from_attributes": True}

    id: int
    username: str
    email: str
    is_active: bool


# Defined before UserWithPosts so the reference below resolves at class
# creation instead of relying on a deferred forward reference.
class PostResponse(BaseModel):
    """Post fields returned to clients."""

    model_config = {"from_attributes": True}

    id: int
    title: str
    content: str


class UserWithPosts(UserResponse):
    """User response that also carries the user's posts."""

    posts: list[PostResponse] = []

# API endpoints
@app.post("/users", response_model=UserResponse)
async def create_user_endpoint(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create a user; respond with 400 when the email is already registered."""
    if await get_user_by_email(db, user_data.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    # NOTE: the password is forwarded unchanged; real code should hash it first.
    return await create_user(
        db,
        username=user_data.username,
        email=user_data.email,
        password=user_data.password
    )

@app.get("/users", response_model=list[UserResponse])
async def get_users_endpoint(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return one page of users, controlled by ``skip`` and ``limit``."""
    page = await get_users(db, skip=skip, limit=limit)
    return page

@app.get("/users/{user_id}", response_model=UserWithPosts)
async def get_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return one user together with their posts, or 404 when not found."""
    found = await get_user_with_posts(db, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")

@app.delete("/users/{user_id}")
async def delete_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Delete one user; respond with 404 when no such user exists."""
    if await delete_user(db, user_id):
        return {"message": "User deleted"}
    raise HTTPException(status_code=404, detail="User not found")

⚠️ 非同步注意事項

1. 關聯載入

# ❌ Wrong: touching a relationship that was not loaded, inside async code
async def bad_example(db: AsyncSession, user_id: int):
    """Anti-pattern: read ``user.posts`` without eager-loading it in the query."""
    rows = await db.execute(select(User).where(User.id == user_id))
    user = rows.scalar_one()

    # With a lazily loaded relationship this access needs implicit I/O and
    # fails with a MissingGreenlet (greenlet_spawn) error.
    # NOTE(review): the User model earlier in this file declares
    # lazy="selectin", which loads posts up front; the failure shown here
    # applies to the default lazy loading strategy.
    print(user.posts)  # ❌ MissingGreenlet error

# ✅ Correct: eager-load the relationship with selectinload
async def good_example(db: AsyncSession, user_id: int):
    """Load the user and ``posts`` together so later access needs no I/O."""
    query = select(User).where(User.id == user_id).options(
        selectinload(User.posts)
    )
    rows = await db.execute(query)
    user = rows.scalar_one()

    print(user.posts)  # ✅ already loaded, safe to access

2. Session 生命週期

# ❌ Wrong: using an object after its Session has been closed
async def bad_session_example():
    """Anti-pattern: read attributes of an instance outside its session."""
    async with AsyncSessionLocal() as db:
        rows = await db.execute(select(User).where(User.id == 1))
        user = rows.scalar_one()

    # The session is closed here and ``user`` is detached.
    # ❌ May fail when the attribute has to be reloaded, e.g. after a commit
    # with expire_on_commit=True; values that are already loaded stay readable.
    print(user.email)

# ✅ Correct: set expire_on_commit=False, or do the work inside the session
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    expire_on_commit=False,  # the key setting: keep attributes after commit
    class_=AsyncSession,
)

3. 避免同步操作

# ❌ Wrong: synchronous operations inside async code
async def bad_sync_in_async(db: AsyncSession):
    """Anti-pattern: blocking calls and a synchronous driver in a coroutine."""
    from time import sleep

    # ❌ Synchronous sleep: blocks the event loop for the whole 5 seconds.
    sleep(5)

    # ❌ Synchronous database driver.
    import psycopg2

# ✅ Correct: use asynchronous operations
async def good_async_example(db: AsyncSession):
    """Wait without blocking the event loop."""
    from asyncio import sleep

    # ✅ Asynchronous sleep: other tasks keep running while this one waits.
    await sleep(5)

    # For database access, use an asynchronous driver (asyncpg).

🔄 交易管理

基本交易

async def transfer_money(
    db: AsyncSession,
    from_user_id: int,
    to_user_id: int,
    amount: float
):
    """Move ``amount`` from one user's balance to another in one transaction.

    The whole transfer runs inside ``db.begin()``: the transaction commits
    when the block exits normally and rolls back when an exception leaves it.

    NOTE(review): this relies on a ``balance`` attribute that the ``User``
    model shown earlier in this file does not declare. It also assumes the
    session has no transaction in progress when called, since ``db.begin()``
    starts a new one.

    Args:
        db: Session to run the transaction on.
        from_user_id: Id of the user paying.
        to_user_id: Id of the user receiving.
        amount: Amount to move; must be positive.

    Raises:
        ValueError: If ``amount`` is not positive, a user does not exist,
            or the payer's balance is too low.
    """
    # A zero or negative amount would reverse the transfer and sidestep the
    # balance check on the account that actually loses money.
    if amount <= 0:
        raise ValueError("Amount must be positive")

    async with db.begin():
        # Lock both rows (SELECT ... FOR UPDATE) so concurrent transfers cannot
        # both pass the balance check. Locking in ascending id order keeps two
        # opposite transfers from deadlocking on each other.
        locked = {}
        for uid in sorted({from_user_id, to_user_id}):
            locked[uid] = await db.get(User, uid, with_for_update=True)

        from_user = locked[from_user_id]
        to_user = locked[to_user_id]

        if not from_user or not to_user:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        from_user.balance -= amount
        to_user.balance += amount

# Alternatively, manage the transaction by hand
async def transfer_money_manual(
    db: AsyncSession,
    from_user_id: int,
    to_user_id: int,
    amount: float
):
    """Transfer ``amount`` between two users with explicit commit/rollback.

    NOTE(review): relies on a ``balance`` attribute that the ``User`` model
    shown earlier in this file does not declare.

    Raises:
        ValueError: If ``amount`` is not positive, a user does not exist,
            or the payer's balance is too low.
    """
    if amount <= 0:
        raise ValueError("Amount must be positive")

    try:
        from_user = await db.get(User, from_user_id)
        to_user = await db.get(User, to_user_id)

        if from_user is None or to_user is None:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        from_user.balance -= amount
        to_user.balance += amount

        await db.commit()
    except Exception:
        # Undo any partial changes, then let the caller see the error.
        await db.rollback()
        raise

巢狀交易(Savepoint)

async def complex_operation(db: AsyncSession):
    """Run a multi-step operation whose middle step uses a savepoint.

    The outer ``db.begin()`` block is the main transaction. The post is
    created inside ``db.begin_nested()`` (a SAVEPOINT); if that step fails
    with a database error, only the savepoint is rolled back and the main
    transaction continues.
    """
    from sqlalchemy.exc import SQLAlchemyError

    # Main transaction: commits on normal exit, rolls back on exception.
    async with db.begin():
        # hashed_password is a required column on User, so it must be given.
        user = User(
            username="john",
            email="john@example.com",
            hashed_password="..."
        )
        db.add(user)

        # The try must wrap the savepoint block: the exception has to leave
        # ``begin_nested()`` for the savepoint to be rolled back. Catching it
        # inside the block would release the savepoint as if nothing failed.
        try:
            async with db.begin_nested():
                post = Post(title="Test", content="...", author=user)
                db.add(post)
        except SQLAlchemyError:
            # Deliberate best-effort: the savepoint was rolled back and the
            # main transaction is unaffected.
            pass

        # Continue with the main transaction.
        another_user = User(
            username="jane",
            email="jane@example.com",
            hashed_password="..."
        )
        db.add(another_user)

📝 完整範例:Repository 模式

from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")

class BaseRepository(ABC, Generic[T]):
    """Generic CRUD helper bound to one session and one model class."""

    def __init__(self, db: AsyncSession, model: Type[T]):
        """Remember the session and the model class this repository serves."""
        self.model = model
        self.db = db

    async def get(self, id: int) -> T | None:
        """Return the instance with primary key ``id``, or ``None``."""
        found = await self.db.get(self.model, id)
        return found

    async def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
        """Return one page of instances: skip ``skip``, take up to ``limit``."""
        page_query = select(self.model).offset(skip).limit(limit)
        rows = await self.db.execute(page_query)
        return list(rows.scalars().all())

    async def create(self, **kwargs) -> T:
        """Build an instance from ``kwargs``, commit it, and return it refreshed."""
        instance = self.model(**kwargs)
        self.db.add(instance)
        await self.db.commit()
        await self.db.refresh(instance)
        return instance

    async def update(self, id: int, **kwargs) -> T | None:
        """Set the given attributes on the instance with ``id`` and commit.

        Returns ``None`` when no such instance exists.
        """
        target = await self.get(id)
        if target:
            for field, new_value in kwargs.items():
                setattr(target, field, new_value)

            await self.db.commit()
            await self.db.refresh(target)
            return target
        return None

    async def delete(self, id: int) -> bool:
        """Delete the instance with ``id``; return whether it existed."""
        target = await self.get(id)
        if target:
            await self.db.delete(target)
            await self.db.commit()
            return True
        return False


class UserRepository(BaseRepository[User]):
    """Repository for ``User`` with lookups by email, username, and status."""

    def __init__(self, db: AsyncSession):
        """Bind the repository to ``db`` and the ``User`` model."""
        super().__init__(db, User)

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with this exact email, or ``None``."""
        stmt = select(User).where(User.email == email)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> User | None:
        """Return the user with this exact username, or ``None``."""
        stmt = select(User).where(User.username == username)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def get_active_users(self) -> list[User]:
        """Return every user whose ``is_active`` flag is true."""
        # is_(True) states the boolean test explicitly instead of comparing
        # with ``== True``, which linters flag as a likely mistake.
        stmt = select(User).where(User.is_active.is_(True))
        result = await self.db.execute(stmt)
        return list(result.scalars().all())


# Using the repository
async def user_service_example(db: AsyncSession):
    """Walk through create, lookup, update, and delete with UserRepository."""
    users = UserRepository(db)

    # Create
    created = await users.create(
        username="john", email="john@example.com", hashed_password="..."
    )

    # Look up by email
    fetched = await users.get_by_email("john@example.com")

    # Update: deactivate the user that was just fetched
    deactivated = await users.update(fetched.id, is_active=False)

    # Delete
    await users.delete(deactivated.id)

✅ 重點總結

非同步設定

# Async engine
async_engine = create_async_engine("postgresql+asyncpg://...")

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    expire_on_commit=False,
    class_=AsyncSession,
)

重要差異

| 操作 | 同步 | 非同步 |
| --- | --- | --- |
| 執行查詢 | db.scalar() | await db.execute() + .scalar() |
| 提交 | db.commit() | await db.commit() |
| 重新載入 | db.refresh() | await db.refresh() |
| 關聯載入 | joinedload | selectinload(推薦) |

注意事項

  1. 使用非同步驅動程式(asyncpg, aiomysql)
  2. 設定 expire_on_commit=False
  3. 關聯必須預先載入(建議使用 selectinload),不可依賴延遲載入
  4. 避免在非同步中使用同步操作

🎤 面試這樣答

Q: FastAPI 中為什麼要用非同步資料庫操作?

答案:

FastAPI 是非同步框架,使用非同步資料庫操作可以:

  1. 提高併發效能:等待資料庫時可以處理其他請求
  2. 充分利用資源:不會因為 I/O 等待而阻塞
  3. 統一程式風格:整個應用都使用 async/await

但要注意:

  • 需要使用非同步驅動(如 asyncpg)
  • 關聯載入需要用 selectinload
  • Session 設定 expire_on_commit=False

上一篇: 03-2. 關聯關係 下一篇: 03-4. 資料庫遷移 Alembic


最後更新:2025-12-17

0%