# 

# 03-6. 交易管理

&gt; ⏱️ **閱讀時間：** 15 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐ (進階)

---

## 🤔 一句話解釋

**交易（Transaction）確保多個資料庫操作要嘛全部成功，要嘛全部失敗，維持資料的一致性。**

---

## 🎯 為什麼需要交易管理？

### 沒有交易的問題

```python
# ❌ 沒有交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
    from_user = await get_user(from_id)
    from_user.balance -= amount
    await db.commit()  # ✅ 扣款成功

    # 如果這裡發生錯誤...
    raise Exception(&#34;Network error&#34;)

    to_user = await get_user(to_id)
    to_user.balance &#43;= amount
    await db.commit()  # ❌ 永遠不會執行

# 結果：錢從 A 扣了，但沒有加到 B！
```

### 使用交易

```python
# ✅ 使用交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
    async with db.begin():
        from_user = await get_user(from_id)
        from_user.balance -= amount

        to_user = await get_user(to_id)
        to_user.balance &#43;= amount

        # 如果發生錯誤，所有操作都會回滾

# 結果：要嘛兩邊都成功，要嘛都不變
```

---

## 🔑 ACID 特性

```
┌─────────────────────────────────────────────────────────┐
│                      ACID                               │
├─────────────────────────────────────────────────────────┤
│  A - Atomicity（原子性）                                 │
│      所有操作要嘛全部完成，要嘛全部不做                    │
├─────────────────────────────────────────────────────────┤
│  C - Consistency（一致性）                               │
│      交易前後，資料必須保持一致狀態                        │
├─────────────────────────────────────────────────────────┤
│  I - Isolation（隔離性）                                 │
│      多個交易同時執行時，互不干擾                          │
├─────────────────────────────────────────────────────────┤
│  D - Durability（持久性）                                │
│      交易完成後，結果永久保存                              │
└─────────────────────────────────────────────────────────┘
```

---

## 🔧 SQLAlchemy 交易管理

### 基本交易（同步）

```python
from sqlalchemy.orm import Session

def transfer_money_sync(
    db: Session,
    from_id: int,
    to_id: int,
    amount: float
):
    try:
        from_user = db.get(User, from_id)
        to_user = db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError(&#34;User not found&#34;)

        if from_user.balance &lt; amount:
            raise ValueError(&#34;Insufficient balance&#34;)

        from_user.balance -= amount
        to_user.balance &#43;= amount

        db.commit()  # 成功：提交交易
    except Exception:
        db.rollback()  # 失敗：回滾交易
        raise
```

### 基本交易（非同步）

```python
from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_money_async(
    db: AsyncSession,
    from_id: int,
    to_id: int,
    amount: float
):
    try:
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError(&#34;User not found&#34;)

        if from_user.balance &lt; amount:
            raise ValueError(&#34;Insufficient balance&#34;)

        from_user.balance -= amount
        to_user.balance &#43;= amount

        await db.commit()
    except Exception:
        await db.rollback()
        raise
```

### 使用 Context Manager

```python
# 使用 begin() context manager
async def transfer_with_context(
    db: AsyncSession,
    from_id: int,
    to_id: int,
    amount: float
):
    async with db.begin():
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError(&#34;User not found&#34;)

        if from_user.balance &lt; amount:
            raise ValueError(&#34;Insufficient balance&#34;)

        from_user.balance -= amount
        to_user.balance &#43;= amount

        # 區塊結束時自動 commit
        # 發生異常時自動 rollback
```

---

## 🔄 巢狀交易（Savepoint）

### 使用場景

```
主交易
├── 操作 1（必須成功）
├── 巢狀交易（可以失敗）
│   └── 操作 2（嘗試執行）
└── 操作 3（必須成功）
```

### 實作

```python
async def complex_operation(db: AsyncSession):
    &#34;&#34;&#34;複雜操作使用 Savepoint&#34;&#34;&#34;
    async with db.begin():
        # 主交易：建立訂單
        order = Order(user_id=1, total=100)
        db.add(order)
        await db.flush()  # 取得 order.id

        # 巢狀交易：嘗試發送通知
        async with db.begin_nested():
            try:
                notification = Notification(
                    user_id=1,
                    message=f&#34;Order {order.id} created&#34;
                )
                db.add(notification)
                await db.flush()

                # 模擬通知服務錯誤
                # raise Exception(&#34;Notification service error&#34;)
            except Exception as e:
                # Savepoint rollback，不影響主交易
                print(f&#34;Notification failed: {e}&#34;)
                # 不需要手動 rollback，begin_nested 會處理

        # 繼續主交易：更新庫存
        product = await db.get(Product, 1)
        product.stock -= 1

        # 主交易結束：commit
```

### 多層巢狀

```python
async def multi_level_transaction(db: AsyncSession):
    &#34;&#34;&#34;多層巢狀交易&#34;&#34;&#34;
    async with db.begin():
        print(&#34;Level 0: Main transaction&#34;)
        user = User(username=&#34;john&#34;, email=&#34;john@example.com&#34;)
        db.add(user)

        async with db.begin_nested():
            print(&#34;Level 1: Savepoint 1&#34;)
            profile = Profile(user=user, bio=&#34;Developer&#34;)
            db.add(profile)

            async with db.begin_nested():
                print(&#34;Level 2: Savepoint 2&#34;)
                try:
                    # 這個操作失敗
                    raise ValueError(&#34;Something went wrong&#34;)
                except ValueError:
                    pass
                # Savepoint 2 rollback

            # Level 1 繼續
            settings = UserSettings(user=user, theme=&#34;dark&#34;)
            db.add(settings)

        # Level 0 繼續
        audit = AuditLog(action=&#34;user_created&#34;, user_id=user.id)
        db.add(audit)

        # 所有成功的操作都會被 commit
```

---

## ⚠️ 隔離等級

### 隔離等級說明

| 等級 | 髒讀 | 不可重複讀 | 幻讀 | 說明 |
|------|------|-----------|------|------|
| READ UNCOMMITTED | 可能 | 可能 | 可能 | 最低隔離 |
| READ COMMITTED | 不會 | 可能 | 可能 | PostgreSQL 預設 |
| REPEATABLE READ | 不會 | 不會 | 可能 | MySQL 預設 |
| SERIALIZABLE | 不會 | 不會 | 不會 | 最高隔離 |

### 設定隔離等級

```python
from sqlalchemy import create_engine

# 在 Engine 層級設定
engine = create_engine(
    DATABASE_URL,
    isolation_level=&#34;REPEATABLE READ&#34;
)

# 或在 Session 層級設定
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

with Session() as session:
    session.connection(execution_options={&#34;isolation_level&#34;: &#34;SERIALIZABLE&#34;})
    # 這個 Session 使用 SERIALIZABLE 隔離等級
```

### 非同步設定

```python
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    DATABASE_URL,
    isolation_level=&#34;REPEATABLE READ&#34;
)
```

---

## 🔒 樂觀鎖與悲觀鎖

### 樂觀鎖（Optimistic Locking）

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Mapped, mapped_column

class Product(Base):
    __tablename__ = &#34;products&#34;

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    stock: Mapped[int] = mapped_column(default=0)
    version: Mapped[int] = mapped_column(default=1)  # 版本號

    __mapper_args__ = {
        &#34;version_id_col&#34;: version  # 設定版本欄位
    }


async def update_stock_optimistic(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    &#34;&#34;&#34;樂觀鎖更新庫存&#34;&#34;&#34;
    from sqlalchemy.orm.exc import StaleDataError

    try:
        product = await db.get(Product, product_id)
        product.stock -= quantity
        await db.commit()
    except StaleDataError:
        # 版本衝突：其他交易已經修改了這筆資料
        await db.rollback()
        raise ValueError(&#34;Concurrent modification detected, please retry&#34;)
```

### 悲觀鎖（Pessimistic Locking）

```python
from sqlalchemy import select

async def update_stock_pessimistic(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    &#34;&#34;&#34;悲觀鎖更新庫存&#34;&#34;&#34;
    async with db.begin():
        # SELECT ... FOR UPDATE
        stmt = (
            select(Product)
            .where(Product.id == product_id)
            .with_for_update()  # 鎖定該行
        )
        result = await db.execute(stmt)
        product = result.scalar_one()

        if product.stock &lt; quantity:
            raise ValueError(&#34;Insufficient stock&#34;)

        product.stock -= quantity
        # commit 時釋放鎖


async def update_stock_pessimistic_nowait(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    &#34;&#34;&#34;悲觀鎖（不等待）&#34;&#34;&#34;
    async with db.begin():
        stmt = (
            select(Product)
            .where(Product.id == product_id)
            .with_for_update(nowait=True)  # 如果已被鎖定，立即報錯
        )
        try:
            result = await db.execute(stmt)
            product = result.scalar_one()
            product.stock -= quantity
        except Exception:
            raise ValueError(&#34;Resource is locked by another transaction&#34;)
```

### 選擇建議

| 場景 | 建議 |
|------|------|
| 衝突機率低 | 樂觀鎖 |
| 衝突機率高 | 悲觀鎖 |
| 讀多寫少 | 樂觀鎖 |
| 需要保證一致性 | 悲觀鎖 |

---

## 📝 實戰範例

### 訂單處理

```python
from dataclasses import dataclass
from typing import List
from decimal import Decimal

@dataclass
class OrderItemDTO:
    product_id: int
    quantity: int

@dataclass
class CreateOrderDTO:
    user_id: int
    items: List[OrderItemDTO]


class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_order(self, data: CreateOrderDTO) -&gt; Order:
        &#34;&#34;&#34;建立訂單（含交易管理）&#34;&#34;&#34;
        async with self.db.begin():
            # 1. 檢查並鎖定商品庫存
            products = {}
            total = Decimal(&#34;0&#34;)

            for item in data.items:
                stmt = (
                    select(Product)
                    .where(Product.id == item.product_id)
                    .with_for_update()
                )
                result = await self.db.execute(stmt)
                product = result.scalar_one_or_none()

                if not product:
                    raise ValueError(f&#34;Product {item.product_id} not found&#34;)

                if product.stock &lt; item.quantity:
                    raise ValueError(f&#34;Insufficient stock for {product.name}&#34;)

                products[item.product_id] = product
                total &#43;= product.price * item.quantity

            # 2. 檢查使用者餘額
            user = await self.db.get(User, data.user_id)
            if not user:
                raise ValueError(&#34;User not found&#34;)

            if user.balance &lt; total:
                raise ValueError(&#34;Insufficient balance&#34;)

            # 3. 建立訂單
            order = Order(
                user_id=data.user_id,
                total=total,
                status=&#34;pending&#34;
            )
            self.db.add(order)
            await self.db.flush()  # 取得 order.id

            # 4. 建立訂單項目並扣減庫存
            for item in data.items:
                product = products[item.product_id]

                order_item = OrderItem(
                    order_id=order.id,
                    product_id=item.product_id,
                    quantity=item.quantity,
                    unit_price=product.price
                )
                self.db.add(order_item)

                # 扣減庫存
                product.stock -= item.quantity

            # 5. 扣減使用者餘額
            user.balance -= total

            # 6. 建立交易記錄
            transaction = Transaction(
                user_id=data.user_id,
                order_id=order.id,
                amount=-total,
                type=&#34;payment&#34;
            )
            self.db.add(transaction)

            # 交易結束，自動 commit
            return order
```

### 帶重試的樂觀鎖

```python
import asyncio
from sqlalchemy.orm.exc import StaleDataError

async def update_with_retry(
    db_factory,  # Session factory
    product_id: int,
    quantity: int,
    max_retries: int = 3
):
    &#34;&#34;&#34;帶重試的樂觀鎖更新&#34;&#34;&#34;
    for attempt in range(max_retries):
        async with db_factory() as db:
            try:
                product = await db.get(Product, product_id)
                if not product:
                    raise ValueError(&#34;Product not found&#34;)

                if product.stock &lt; quantity:
                    raise ValueError(&#34;Insufficient stock&#34;)

                product.stock -= quantity
                await db.commit()
                return product

            except StaleDataError:
                await db.rollback()
                if attempt == max_retries - 1:
                    raise ValueError(&#34;Failed after max retries&#34;)

                # 指數退避
                await asyncio.sleep(0.1 * (2 ** attempt))

    raise ValueError(&#34;Failed to update&#34;)
```

---

## ✅ 重點總結

### 交易管理方式

| 方式 | 使用場景 |
|------|----------|
| `db.begin()` | 明確控制交易範圍 |
| `db.begin_nested()` | 巢狀交易（Savepoint）|
| `try/commit/rollback` | 手動控制 |

### 鎖的選擇

| 類型 | 方式 | 適用場景 |
|------|------|----------|
| 樂觀鎖 | version 欄位 | 衝突少、讀多寫少 |
| 悲觀鎖 | `with_for_update()` | 衝突多、需保證一致性 |

### 最佳實踐

1. 交易範圍要盡可能小
2. 避免在交易中做耗時操作
3. 正確處理異常和回滾
4. 根據場景選擇適當的鎖策略

---

## 🎤 面試這樣答

### Q: 什麼是資料庫交易的 ACID 特性？

**答案：**

&gt; ACID 是資料庫交易的四個特性：
&gt;
&gt; 1. **Atomicity（原子性）**：交易中的操作要嘛全部成功，要嘛全部失敗
&gt; 2. **Consistency（一致性）**：交易完成後，資料必須保持一致狀態
&gt; 3. **Isolation（隔離性）**：多個交易同時執行時，互不干擾
&gt; 4. **Durability（持久性）**：交易完成後，結果永久保存
&gt;
&gt; ```python
&gt; async with db.begin():
&gt;     user.balance -= 100
&gt;     merchant.balance &#43;= 100
&gt;     # 要嘛都成功，要嘛都失敗
&gt; ```

---

**上一篇：** [03-5. Repository 模式](./03-5)
**下一篇：** [03-7. 查詢優化](./03-7)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/03-6/  

