目錄
03-3. 非同步資料庫操作
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
非同步資料庫操作讓你在等待資料庫回應時,可以處理其他請求,大幅提升併發效能。
🎯 為什麼需要非同步?
同步 vs 非同步
同步處理(每個請求等待資料庫):
Request 1: ████████░░░░░░░░ 等待 DB
Request 2: ████████░░░░░░░░ 等待 DB
Request 3: ████████░░░░░░░░ 等待 DB
總時間:████████████████████████████████████████
非同步處理(等待時處理其他請求):
Request 1: ██░░░░██ 開始 → 等待 → 完成
Request 2: ██░░░░██ 開始 → 等待 → 完成
Request 3: ██░░░░██ 開始 → 等待 → 完成
總時間:████████████████效能差異
| 場景 | 同步 | 非同步 |
|---|---|---|
| 100 個併發請求 | 逐一處理 | 同時處理 |
| I/O 等待時 | 阻塞 | 處理其他請求 |
| 適合場景 | CPU 密集 | I/O 密集 |
📦 安裝
# SQLAlchemy 非同步支援
pip install sqlalchemy[asyncio]
# 非同步資料庫驅動
pip install asyncpg # PostgreSQL(推薦)
pip install aiomysql # MySQL
pip install aiosqlite # SQLite🔧 基本設定
建立非同步 Engine
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase
# 非同步資料庫 URL
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
# 建立非同步 Engine
async_engine = create_async_engine(
DATABASE_URL,
echo=True, # 印出 SQL(開發用)
pool_size=5, # 連線池大小
max_overflow=10, # 額外連線數
pool_pre_ping=True, # 連線前檢查
pool_recycle=3600, # 連線回收時間(秒)
)
# 建立非同步 Session 工廠
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False, # commit 後不要讓物件過期
autoflush=False,
autocommit=False
)
# 基礎類別
class Base(DeclarativeBase):
pass不同資料庫的 URL
# PostgreSQL(推薦)
"postgresql+asyncpg://user:pass@localhost:5432/dbname"
# MySQL
"mysql+aiomysql://user:pass@localhost:3306/dbname"
# SQLite
"sqlite+aiosqlite:///./app.db"
# SQLite 記憶體
"sqlite+aiosqlite:///:memory:"📝 定義 Model
from sqlalchemy import String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import Optional, List
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
# 關聯(非同步需要特別注意載入策略)
posts: Mapped[List["Post"]] = relationship(
"Post",
back_populates="author",
lazy="selectin" # 非同步建議使用 selectin
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship("User", back_populates="posts")🔄 非同步 CRUD 操作
建立(Create)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(
db: AsyncSession,
username: str,
email: str,
password: str
) -> User:
"""建立使用者"""
user = User(
username=username,
email=email,
hashed_password=password
)
db.add(user)
await db.commit()
await db.refresh(user)
return user
async def create_users_bulk(
db: AsyncSession,
users_data: list[dict]
) -> list[User]:
"""批量建立使用者"""
users = [User(**data) for data in users_data]
db.add_all(users)
await db.commit()
# 批量 refresh
for user in users:
await db.refresh(user)
return users查詢(Read)
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
"""根據 ID 查詢使用者"""
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
"""根據 email 查詢使用者"""
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def get_users(
db: AsyncSession,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""查詢使用者列表"""
stmt = select(User).offset(skip).limit(limit)
result = await db.execute(stmt)
return list(result.scalars().all())
async def get_user_with_posts(db: AsyncSession, user_id: int) -> User | None:
"""查詢使用者(含文章)"""
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.posts))
)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def count_users(db: AsyncSession) -> int:
"""計算使用者數量"""
stmt = select(func.count()).select_from(User)
result = await db.execute(stmt)
return result.scalar() or 0更新(Update)
from sqlalchemy import update
async def update_user(
db: AsyncSession,
user_id: int,
**kwargs
) -> User | None:
"""更新使用者"""
# 方法 1: 先查詢再更新
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
await db.commit()
await db.refresh(user)
return user
async def update_users_bulk(
db: AsyncSession,
user_ids: list[int],
**kwargs
) -> int:
"""批量更新使用者"""
# 方法 2: 直接 UPDATE(效能更好)
stmt = (
update(User)
.where(User.id.in_(user_ids))
.values(**kwargs)
)
result = await db.execute(stmt)
await db.commit()
return result.rowcount刪除(Delete)
from sqlalchemy import delete
async def delete_user(db: AsyncSession, user_id: int) -> bool:
"""刪除使用者"""
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
return False
await db.delete(user)
await db.commit()
return True
async def delete_users_bulk(db: AsyncSession, user_ids: list[int]) -> int:
"""批量刪除使用者"""
stmt = delete(User).where(User.id.in_(user_ids))
result = await db.execute(stmt)
await db.commit()
return result.rowcount🚀 FastAPI 整合
依賴注入
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from contextlib import asynccontextmanager
# 應用程式生命週期
@asynccontextmanager
async def lifespan(app: FastAPI):
# 啟動時:建立資料表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 關閉時:關閉連線池
await async_engine.dispose()
app = FastAPI(lifespan=lifespan)
# 依賴項:取得 Session
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()API 端點
from pydantic import BaseModel, EmailStr
# Pydantic Schemas
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
username: str
email: str
is_active: bool
class Config:
from_attributes = True
class UserWithPosts(UserResponse):
posts: list["PostResponse"] = []
class PostResponse(BaseModel):
id: int
title: str
content: str
class Config:
from_attributes = True
# API 端點
@app.post("/users", response_model=UserResponse)
async def create_user_endpoint(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
# 檢查是否已存在
existing = await get_user_by_email(db, user_data.email)
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
user = await create_user(
db,
username=user_data.username,
email=user_data.email,
password=user_data.password # 實際應該雜湊
)
return user
@app.get("/users", response_model=list[UserResponse])
async def get_users_endpoint(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
return await get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=UserWithPosts)
async def get_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_db)
):
user = await get_user_with_posts(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.delete("/users/{user_id}")
async def delete_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_db)
):
success = await delete_user(db, user_id)
if not success:
raise HTTPException(status_code=404, detail="User not found")
return {"message": "User deleted"}⚠️ 非同步注意事項
1. 關聯載入
# ❌ 錯誤:非同步中存取未載入的關聯
async def bad_example(db: AsyncSession, user_id: int):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one()
# 這會報錯:greenlet_spawn 錯誤
print(user.posts) # ❌ MissingGreenlet error
# ✅ 正確:使用 selectinload 預先載入
async def good_example(db: AsyncSession, user_id: int):
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.posts))
)
result = await db.execute(stmt)
user = result.scalar_one()
print(user.posts) # ✅ 正常存取2. Session 生命週期
# ❌ 錯誤:在 Session 關閉後存取物件
async def bad_session_example():
async with AsyncSessionLocal() as db:
stmt = select(User).where(User.id == 1)
result = await db.execute(stmt)
user = result.scalar_one()
# Session 已關閉
print(user.email) # ❌ 可能報錯(如果使用 expire_on_commit=True)
# ✅ 正確:設定 expire_on_commit=False 或在 Session 內操作
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False # 關鍵設定
)3. 避免同步操作
# ❌ 錯誤:在非同步中使用同步操作
async def bad_sync_in_async(db: AsyncSession):
# 這會阻塞事件迴圈
import time
time.sleep(5) # ❌ 同步睡眠
# 使用同步的資料庫驅動
import psycopg2 # ❌ 同步驅動
# ✅ 正確:使用非同步操作
async def good_async_example(db: AsyncSession):
import asyncio
await asyncio.sleep(5) # ✅ 非同步睡眠
# 使用非同步的資料庫驅動(asyncpg)🔄 交易管理
基本交易
async def transfer_money(
db: AsyncSession,
from_user_id: int,
to_user_id: int,
amount: float
):
"""轉帳(交易範例)"""
try:
# 開始交易
async with db.begin():
# 查詢兩個使用者
from_user = await db.get(User, from_user_id)
to_user = await db.get(User, to_user_id)
if not from_user or not to_user:
raise ValueError("User not found")
if from_user.balance < amount:
raise ValueError("Insufficient balance")
# 執行轉帳
from_user.balance -= amount
to_user.balance += amount
# begin() 結束時自動 commit
except Exception:
# 發生錯誤時自動 rollback
raise
# 或者手動管理
async def transfer_money_manual(db: AsyncSession, ...):
try:
from_user = await db.get(User, from_user_id)
to_user = await db.get(User, to_user_id)
from_user.balance -= amount
to_user.balance += amount
await db.commit()
except Exception:
await db.rollback()
raise巢狀交易(Savepoint)
async def complex_operation(db: AsyncSession):
"""複雜操作(使用 Savepoint)"""
try:
# 主交易
async with db.begin():
user = User(username="john", email="john@example.com")
db.add(user)
# 巢狀交易(Savepoint)
async with db.begin_nested():
try:
post = Post(title="Test", content="...", author=user)
db.add(post)
except Exception:
# Savepoint rollback,不影響主交易
pass
# 繼續主交易
another_user = User(username="jane", email="jane@example.com")
db.add(another_user)
except Exception:
raise📝 完整範例:Repository 模式
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T")
class BaseRepository(ABC, Generic[T]):
"""基礎 Repository"""
def __init__(self, db: AsyncSession, model: Type[T]):
self.db = db
self.model = model
async def get(self, id: int) -> T | None:
return await self.db.get(self.model, id)
async def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
stmt = select(self.model).offset(skip).limit(limit)
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
obj = self.model(**kwargs)
self.db.add(obj)
await self.db.commit()
await self.db.refresh(obj)
return obj
async def update(self, id: int, **kwargs) -> T | None:
obj = await self.get(id)
if not obj:
return None
for key, value in kwargs.items():
setattr(obj, key, value)
await self.db.commit()
await self.db.refresh(obj)
return obj
async def delete(self, id: int) -> bool:
obj = await self.get(id)
if not obj:
return False
await self.db.delete(obj)
await self.db.commit()
return True
class UserRepository(BaseRepository[User]):
"""使用者 Repository"""
def __init__(self, db: AsyncSession):
super().__init__(db, User)
async def get_by_email(self, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_by_username(self, username: str) -> User | None:
stmt = select(User).where(User.username == username)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
async def get_active_users(self) -> list[User]:
stmt = select(User).where(User.is_active == True)
result = await self.db.execute(stmt)
return list(result.scalars().all())
# 使用 Repository
async def user_service_example(db: AsyncSession):
repo = UserRepository(db)
# 建立使用者
user = await repo.create(
username="john",
email="john@example.com",
hashed_password="..."
)
# 查詢
user = await repo.get_by_email("john@example.com")
# 更新
user = await repo.update(user.id, is_active=False)
# 刪除
await repo.delete(user.id)✅ 重點總結
非同步設定
# 非同步 Engine
async_engine = create_async_engine("postgresql+asyncpg://...")
# 非同步 Session
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)重要差異
| 操作 | 同步 | 非同步 |
|---|---|---|
| 執行查詢 | db.scalar() | await db.execute() + .scalar() |
| 提交 | db.commit() | await db.commit() |
| 重新載入 | db.refresh() | await db.refresh() |
| 關聯載入 | joinedload | selectinload(推薦) |
注意事項
- 使用非同步驅動程式(asyncpg, aiomysql)
- 設定
expire_on_commit=False - 關聯必須用
selectinload預先載入 - 避免在非同步中使用同步操作
🎤 面試這樣答
Q: FastAPI 中為什麼要用非同步資料庫操作?
答案:
FastAPI 是非同步框架,使用非同步資料庫操作可以:
- 提高併發效能:等待資料庫時可以處理其他請求
- 充分利用資源:不會因為 I/O 等待而阻塞
- 統一程式風格:整個應用都使用 async/await
但要注意:
- 需要使用非同步驅動(如 asyncpg)
- 關聯載入需要用 selectinload
- Session 設定 expire_on_commit=False
上一篇: 03-2. 關聯關係 下一篇: 03-4. 資料庫遷移 Alembic
最後更新:2025-12-17