# 03-3. Asynchronous Database Operations

> ⏱️ **Reading time:** 18 minutes
> 🎯 **Difficulty:** ⭐⭐⭐ (Advanced)

---

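## 🤔 In One Sentence

**Asynchronous database operations let the server handle other requests while it waits for the database to respond, which can substantially raise throughput under concurrent load.**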

---

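## 🎯 Why Go Asynchronous?

### Synchronous vs. Asynchronous

```
Synchronous (each request waits for the database in turn):
Request 1: ████████░░░░░░░░  waiting on DB
Request 2:         ████████░░░░░░░░  waiting on DB
Request 3:                 ████████░░░░░░░░  waiting on DB
Total time: ████████████████████████████████████████

Asynchronous (other requests run during the wait):
Request 1: ██░░░░██  start → wait → finish
Request 2:   ██░░░░██  start → wait → finish
Request 3:     ██░░░░██  start → wait → finish
Total time: ████████████████
```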
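The timeline above can be approximated with nothing but `asyncio`. This is a minimal sketch, not a benchmark: `fake_query` and the 0.1 second latency are assumptions standing in for a real awaited database call.

```python
import asyncio
import time

DB_LATENCY = 0.1  # simulated database round trip in seconds (assumed value)


async def fake_query(request_id: int) -> int:
    """Stand-in for an awaited database call; asyncio.sleep yields to the event loop."""
    await asyncio.sleep(DB_LATENCY)
    return request_id


async def run_sequential(n: int) -> float:
    """Await each request before starting the next one."""
    start = time.perf_counter()
    for i in range(n):
        await fake_query(i)
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    """Start all requests, then wait for them together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_query(i) for i in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    sequential = await run_sequential(3)
    concurrent = await run_concurrent(3)
    return sequential, concurrent


sequential, concurrent = asyncio.run(main())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run pays the latency once per request, while the concurrent run overlaps the waits, so its total stays close to a single round trip.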

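### Performance Differences

| Scenario | Synchronous | Asynchronous |
|----------|-------------|--------------|
| 100 concurrent requests | Each worker handles one at a time | One event loop interleaves them |
| During I/O waits | Worker is blocked | Other requests are processed |
| Best suited for | CPU-bound work | I/O-bound work |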

---

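## 📦 Installation

```bash
# SQLAlchemy with async support (quoted so shells such as zsh do not expand the brackets)
pip install "sqlalchemy[asyncio]"

# Async database drivers (install the one that matches your database)
pip install asyncpg          # PostgreSQL (recommended)
pip install aiomysql         # MySQL
pip install aiosqlite        # SQLite
```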

---

## 🔧 Basic Setup

### Creating the Async Engine

```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# Async database URL
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

# Create the async engine
async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,              # log SQL statements (development only)
    pool_size=5,            # connections kept in the pool
    max_overflow=10,        # extra connections allowed beyond pool_size
    pool_pre_ping=True,     # test each connection before using it
    pool_recycle=3600,      # recycle connections older than this (seconds)
)

# Create the async session factory
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep objects usable after commit
    autoflush=False,
    autocommit=False
)

# Declarative base class
class Base(DeclarativeBase):
    pass
```
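With `pool_size=5` and `max_overflow=10`, the pool hands out at most 15 connections at once; further requests wait for one to be returned. The sketch below only simulates that ceiling with an `asyncio.Semaphore`. It does not touch SQLAlchemy, and `simulate` and the 0.01 second query time are illustrative assumptions.

```python
import asyncio

POOL_SIZE = 5       # mirrors pool_size in the engine configuration
MAX_OVERFLOW = 10   # mirrors max_overflow in the engine configuration
MAX_CONNECTIONS = POOL_SIZE + MAX_OVERFLOW


async def simulate(total_requests: int) -> int:
    """Return the peak number of simulated connections in use at once."""
    slots = asyncio.Semaphore(MAX_CONNECTIONS)
    in_use = 0
    peak = 0

    async def handle_request() -> None:
        nonlocal in_use, peak
        async with slots:              # wait for a free "connection"
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.01)  # simulated query time
            in_use -= 1

    await asyncio.gather(*(handle_request() for _ in range(total_requests)))
    return peak


peak = asyncio.run(simulate(40))
print(f"peak connections in use: {peak} (limit {MAX_CONNECTIONS})")
```

If the limit is routinely reached in production, requests queue behind the pool, so these two settings are worth sizing against the database server's own connection limit.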

### URLs for Different Databases

```python
# PostgreSQL (recommended)
"postgresql+asyncpg://user:pass@localhost:5432/dbname"

# MySQL
"mysql+aiomysql://user:pass@localhost:3306/dbname"

# SQLite
"sqlite+aiosqlite:///./app.db"

# SQLite, in memory
"sqlite+aiosqlite:///:memory:"
```
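Passwords that contain characters such as `@` or `/` have to be percent-encoded before they go into a URL of this form, otherwise the URL is parsed incorrectly. A minimal sketch using the standard library follows; `build_async_url` is a hypothetical helper name, not part of SQLAlchemy.

```python
from urllib.parse import quote_plus


def build_async_url(
    driver: str, user: str, password: str, host: str, port: int, database: str
) -> str:
    """Assemble a database URL, percent-encoding the credentials."""
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


url = build_async_url("postgresql+asyncpg", "user", "p@ss/word", "localhost", 5432, "mydb")
print(url)
```

In a real project the credentials would normally come from environment variables or a secrets store rather than literals in the source.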

---

## 📝 Defining Models

```python
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    # Note: datetime.utcnow is deprecated since Python 3.12; it is kept here
    # because it yields the naive UTC timestamps this column stores
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    # Relationship (async code needs an explicit loading strategy)
    posts: Mapped[List["Post"]] = relationship(
        "Post",
        back_populates="author",
        lazy="selectin"  # selectin is the recommended strategy for async
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship("User", back_populates="posts")
```

---

## 🔄 Async CRUD Operations

### Create

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def create_user(
    db: AsyncSession,
    username: str,
    email: str,
    password: str
) -> User:
    """Create a user."""
    user = User(
        username=username,
        email=email,
        hashed_password=password  # hash the password before storing it in real code
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def create_users_bulk(
    db: AsyncSession,
    users_data: list[dict]
) -> list[User]:
    """Create several users at once."""
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.commit()

    # Refresh each object (this issues one SELECT per user)
    for user in users:
        await db.refresh(user)

    return users
```

### Read

```python
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload

async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Look up a user by ID."""
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Look up a user by email."""
    stmt = select(User).where(User.email == email)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

async def get_users(
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100
) -> list[User]:
    """List users with offset/limit pagination."""
    stmt = select(User).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return list(result.scalars().all())

async def get_user_with_posts(db: AsyncSession, user_id: int) -> User | None:
    """Look up a user together with their posts."""
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.posts))
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

async def count_users(db: AsyncSession) -> int:
    """Count all users."""
    stmt = select(func.count()).select_from(User)
    result = await db.execute(stmt)
    return result.scalar() or 0
```

### Update

```python
from sqlalchemy import select, update

async def update_user(
    db: AsyncSession,
    user_id: int,
    **kwargs
) -> User | None:
    """Update a single user."""
    # Approach 1: load the object, then modify it
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        return None

    for key, value in kwargs.items():
        if hasattr(user, key):
            setattr(user, key, value)

    await db.commit()
    await db.refresh(user)
    return user

async def update_users_bulk(
    db: AsyncSession,
    user_ids: list[int],
    **kwargs
) -> int:
    """Update several users at once."""
    # Approach 2: issue a single UPDATE statement (faster, no objects loaded)
    stmt = (
        update(User)
        .where(User.id.in_(user_ids))
        .values(**kwargs)
    )
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount
```
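Because `update_user` accepts arbitrary `**kwargs` and only checks `hasattr`, a caller could also overwrite columns such as `id` or `hashed_password`. One possible guard is a whitelist applied before the update. This is a sketch: `UPDATABLE_FIELDS` and `filter_updates` are hypothetical names, and the set of allowed fields is an assumption.

```python
# Hypothetical whitelist: the columns a caller may change through update_user()
UPDATABLE_FIELDS = frozenset({"username", "email", "is_active"})


def filter_updates(changes: dict[str, object]) -> dict[str, object]:
    """Return a copy of the changes, or raise if any key is not whitelisted."""
    unknown = set(changes) - UPDATABLE_FIELDS
    if unknown:
        raise ValueError(f"Fields not updatable: {sorted(unknown)}")
    return dict(changes)


print(filter_updates({"email": "new@example.com", "is_active": False}))
```

A call site could then pass `**filter_updates(payload)` instead of the raw payload.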

### Delete

```python
from sqlalchemy import delete, select

async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Delete a single user."""
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        return False

    await db.delete(user)
    await db.commit()
    return True

async def delete_users_bulk(db: AsyncSession, user_ids: list[int]) -> int:
    """Delete several users with one DELETE statement."""
    stmt = delete(User).where(User.id.in_(user_ids))
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount
```

---

## 🚀 FastAPI Integration

### Dependency Injection

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# Application lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create the tables
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown: dispose of the connection pool
    await async_engine.dispose()

app = FastAPI(lifespan=lifespan)

# Dependency: provide one session per request
async def get_db() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            # Redundant with `async with`, which also closes the session; harmless
            await session.close()
```
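To see what the `try` / `except` / `finally` around `yield` does when an endpoint raises, the same shape can be exercised without FastAPI or a database. The sketch below is illustrative only: `FakeSession` and `session_scope` are hypothetical stand-ins, and the context manager plays the role FastAPI plays when it drives a `yield` dependency.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what happened to it."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession) -> AsyncIterator[FakeSession]:
    """Mirror get_db: roll back if the caller raises, and always close."""
    try:
        yield session
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def demo() -> tuple[list[str], list[str]]:
    ok, failed = FakeSession(), FakeSession()
    async with session_scope(ok):
        pass  # a handler that succeeds
    try:
        async with session_scope(failed):
            raise RuntimeError("simulated failure inside an endpoint")
    except RuntimeError:
        pass
    return ok.events, failed.events


ok_events, failed_events = asyncio.run(demo())
print(ok_events, failed_events)
```

The successful path only closes the session, while the failing path rolls back first and then closes, which is the ordering `get_db` relies on.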

### API Endpoints

```python
# EmailStr requires the optional email-validator package
from pydantic import BaseModel, ConfigDict, EmailStr

# Pydantic schemas
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    # Pydantic v2 style; allows building the schema from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: str
    is_active: bool

# Defined before UserWithPosts so no forward reference is needed
class PostResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    content: str

class UserWithPosts(UserResponse):
    posts: list[PostResponse] = []

# API endpoints
@app.post("/users", response_model=UserResponse)
async def create_user_endpoint(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    # Reject duplicate emails
    existing = await get_user_by_email(db, user_data.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    user = await create_user(
        db,
        username=user_data.username,
        email=user_data.email,
        password=user_data.password  # should be hashed in real code
    )
    return user

@app.get("/users", response_model=list[UserResponse])
async def get_users_endpoint(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    return await get_users(db, skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=UserWithPosts)
async def get_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    user = await get_user_with_posts(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.delete("/users/{user_id}")
async def delete_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    success = await delete_user(db, user_id)
    if not success:
        raise HTTPException(status_code=404, detail="User not found")
    return {"message": "User deleted"}
```

---

## ⚠️ Async Pitfalls

### 1. Relationship Loading

```python
# ❌ Wrong: touching a relationship that has not been loaded
# (assumes the default lazy loading; the User model above sets
# lazy="selectin", which would avoid this error)
async def bad_example(db: AsyncSession, user_id: int):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one()

    # The implicit lazy load cannot run inside async code
    print(user.posts)  # ❌ MissingGreenlet error (mentions greenlet_spawn)

# ✅ Right: eager-load with selectinload
async def good_example(db: AsyncSession, user_id: int):
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.posts))
    )
    result = await db.execute(stmt)
    user = result.scalar_one()

    print(user.posts)  # ✅ already loaded
```

### 2. Session Lifecycle

```python
# ❌ Wrong: using an object after its session has closed
async def bad_session_example():
    async with AsyncSessionLocal() as db:
        stmt = select(User).where(User.id == 1)
        result = await db.execute(stmt)
        user = result.scalar_one()

    # The session is closed and the object is detached
    # ❌ Fails if the attribute was expired (for example after a commit with
    # expire_on_commit=True), because it can no longer be reloaded
    print(user.email)

# ✅ Right: set expire_on_commit=False, or do the work inside the session
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False  # the key setting
)
```

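### 3. Avoid Blocking Calls

```python
# ❌ Wrong: blocking calls inside async code
async def bad_sync_in_async(db: AsyncSession):
    # This blocks the whole event loop
    import time
    time.sleep(5)  # ❌ synchronous sleep

    # Using a synchronous database driver
    import psycopg2  # ❌ synchronous driver

# ✅ Right: use async equivalents
async def good_async_example(db: AsyncSession):
    import asyncio
    await asyncio.sleep(5)  # ✅ asynchronous sleep

    # Use an async database driver (asyncpg)
```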
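When a blocking call cannot be replaced by an async equivalent, one option is to push it onto a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below is an illustration under assumptions: `blocking_call` is a hypothetical synchronous function, and `heartbeat` exists only to show that the event loop keeps running in the meantime.

```python
import asyncio
import time


def blocking_call() -> str:
    """Hypothetical synchronous function, e.g. a legacy client with no async API."""
    time.sleep(0.2)
    return "done"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how often the event loop gets to run while other work is in flight."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def main() -> tuple[str, int]:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    result = await asyncio.to_thread(blocking_call)  # runs in a worker thread
    stop.set()
    ticks = await beat
    return result, ticks


result, ticks = asyncio.run(main())
print(result, ticks)
```

Calling `blocking_call()` directly inside `main` would freeze the loop for the full 0.2 seconds and the heartbeat would barely tick; with `to_thread` it keeps counting.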

---

## 🔄 Transaction Management

### Basic Transactions

```python
async def transfer_money(
    db: AsyncSession,
    from_user_id: int,
    to_user_id: int,
    amount: float
):
    """Transfer money (transaction example).

    Assumes User has a numeric `balance` column, which the model above does not define.
    """
    # begin() commits when the block exits normally; if an exception
    # escapes the block it rolls back and re-raises
    async with db.begin():
        # Load both users
        from_user = await db.get(User, from_user_id)
        to_user = await db.get(User, to_user_id)

        if not from_user or not to_user:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        # Apply the transfer
        from_user.balance -= amount
        to_user.balance += amount

# Alternatively, manage the transaction manually
async def transfer_money_manual(
    db: AsyncSession,
    from_user_id: int,
    to_user_id: int,
    amount: float
):
    try:
        from_user = await db.get(User, from_user_id)
        to_user = await db.get(User, to_user_id)

        if not from_user or not to_user:
            raise ValueError("User not found")

        from_user.balance -= amount
        to_user.balance += amount

        await db.commit()
    except Exception:
        await db.rollback()
        raise
```

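### Nested Transactions (Savepoints)

```python
from sqlalchemy.exc import SQLAlchemyError

async def complex_operation(db: AsyncSession):
    """Multi-step operation using a savepoint."""
    # Outer transaction
    async with db.begin():
        user = User(username="john", email="john@example.com", hashed_password="...")
        db.add(user)

        # Nested transaction (savepoint). The try wraps the whole block so a
        # failure rolls back to the savepoint before it is handled here.
        try:
            async with db.begin_nested():
                post = Post(title="Test", content="...", author=user)
                db.add(post)
                await db.flush()  # send the INSERT now so errors surface here
        except SQLAlchemyError:
            # Savepoint rolled back; the outer transaction is unaffected
            pass

        # Continue with the outer transaction
        another_user = User(username="jane", email="jane@example.com", hashed_password="...")
        db.add(another_user)
```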
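The SQL that a savepoint boils down to can be tried in isolation. The sketch below deliberately uses the standard-library `sqlite3` module, synchronously and without SQLAlchemy, only to show the semantics: a failure inside the savepoint is undone while the surrounding transaction still commits. The table layout is an assumption made for this demonstration.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode so that
# BEGIN / SAVEPOINT / COMMIT can be issued by hand
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE)")

conn.execute("BEGIN")
conn.execute("INSERT INTO users (username) VALUES ('john')")

conn.execute("SAVEPOINT sp1")
try:
    # Violates the UNIQUE constraint on purpose
    conn.execute("INSERT INTO users (username) VALUES ('john')")
    conn.execute("RELEASE SAVEPOINT sp1")
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint
    conn.execute("ROLLBACK TO SAVEPOINT sp1")
    conn.execute("RELEASE SAVEPOINT sp1")

# The outer transaction is still open and usable
conn.execute("INSERT INTO users (username) VALUES ('jane')")
conn.execute("COMMIT")

names = [row[0] for row in conn.execute("SELECT username FROM users ORDER BY id")]
print(names)
```

Both `john` and `jane` end up committed even though the statement inside the savepoint failed.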

---

## 📝 Full Example: The Repository Pattern

```python
from abc import ABC
from typing import TypeVar, Generic, Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")

class BaseRepository(ABC, Generic[T]):
    """Base repository with generic CRUD helpers."""

    def __init__(self, db: AsyncSession, model: Type[T]):
        self.db = db
        self.model = model

    async def get(self, id: int) -> T | None:
        return await self.db.get(self.model, id)

    async def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
        stmt = select(self.model).offset(skip).limit(limit)
        result = await self.db.execute(stmt)
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        obj = self.model(**kwargs)
        self.db.add(obj)
        await self.db.commit()
        await self.db.refresh(obj)
        return obj

    async def update(self, id: int, **kwargs) -> T | None:
        obj = await self.get(id)
        if not obj:
            return None

        for key, value in kwargs.items():
            setattr(obj, key, value)

        await self.db.commit()
        await self.db.refresh(obj)
        return obj

    async def delete(self, id: int) -> bool:
        obj = await self.get(id)
        if not obj:
            return False

        await self.db.delete(obj)
        await self.db.commit()
        return True


class UserRepository(BaseRepository[User]):
    """Repository for User objects."""

    def __init__(self, db: AsyncSession):
        super().__init__(db, User)

    async def get_by_email(self, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> User | None:
        stmt = select(User).where(User.username == username)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def get_active_users(self) -> list[User]:
        stmt = select(User).where(User.is_active.is_(True))
        result = await self.db.execute(stmt)
        return list(result.scalars().all())


# Using the repository
async def user_service_example(db: AsyncSession):
    repo = UserRepository(db)

    # Create a user
    user = await repo.create(
        username="john",
        email="john@example.com",
        hashed_password="..."
    )

    # Query
    user = await repo.get_by_email("john@example.com")

    # Update
    user = await repo.update(user.id, is_active=False)

    # Delete
    await repo.delete(user.id)
```

---

## ✅ Key Takeaways

### Async Setup

```python
# Async engine
async_engine = create_async_engine("postgresql+asyncpg://...")

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)
```

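### Important Differences

| Operation | Synchronous | Asynchronous |
|-----------|-------------|--------------|
| Run a query | `db.execute(stmt)` | `await db.execute(stmt)` |
| Commit | `db.commit()` | `await db.commit()` |
| Refresh | `db.refresh(obj)` | `await db.refresh(obj)` |
| Relationship loading | Lazy loading works on attribute access | Eager-load up front (`selectinload` recommended) |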

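### Things to Remember

1. Use an async driver (asyncpg, aiomysql)
2. Set `expire_on_commit=False`
3. Eager-load relationships before accessing them (for example with `selectinload`)
4. Avoid blocking synchronous calls inside async code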

---

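## 🎤 How to Answer in an Interview

### Q: Why use asynchronous database operations in FastAPI?

**Answer:**

> FastAPI is an async framework, so asynchronous database operations let you:
>
> 1. **Improve concurrency**: other requests are handled while one waits on the database
> 2. **Use resources fully**: workers are not blocked by I/O waits
> 3. **Keep one programming style**: the whole application uses async/await
>
> Points to watch:
> - An async driver is required (such as asyncpg)
> - Relationships need eager loading, for example with selectinload
> - Configure the session with expire_on_commit=False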

---

**Previous:** [03-2. Relationships](./03-2)
**Next:** [03-4. Database Migrations with Alembic](./03-4)

---

Last updated: 2025-12-17


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/03-3/  

