03-6. 交易管理

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

交易(Transaction)確保多個資料庫操作要嘛全部成功,要嘛全部失敗,維持資料的一致性。


🎯 為什麼需要交易管理?

沒有交易的問題

# ❌ 沒有交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
    from_user = await get_user(from_id)
    from_user.balance -= amount
    await db.commit()  # ✅ 扣款成功

    # 如果這裡發生錯誤...
    raise Exception("Network error")

    to_user = await get_user(to_id)
    to_user.balance += amount
    await db.commit()  # ❌ 永遠不會執行

# 結果:錢從 A 扣了,但沒有加到 B!

使用交易

# ✅ 使用交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
    async with db.begin():
        from_user = await get_user(from_id)
        from_user.balance -= amount

        to_user = await get_user(to_id)
        to_user.balance += amount

        # 如果發生錯誤,所有操作都會回滾

# 結果:要嘛兩邊都成功,要嘛都不變

🔑 ACID 特性

┌─────────────────────────────────────────────────────────┐
│                      ACID                               │
├─────────────────────────────────────────────────────────┤
│  A - Atomicity(原子性)                                 │
│      所有操作要嘛全部完成,要嘛全部不做                    │
├─────────────────────────────────────────────────────────┤
│  C - Consistency(一致性)                               │
│      交易前後,資料必須保持一致狀態                        │
├─────────────────────────────────────────────────────────┤
│  I - Isolation(隔離性)                                 │
│      多個交易同時執行時,互不干擾                          │
├─────────────────────────────────────────────────────────┤
│  D - Durability(持久性)                                │
│      交易完成後,結果永久保存                              │
└─────────────────────────────────────────────────────────┘

🔧 SQLAlchemy 交易管理

基本交易(同步)

from sqlalchemy.orm import Session

def transfer_money_sync(
    db: Session,
    from_id: int,
    to_id: int,
    amount: float
):
    try:
        from_user = db.get(User, from_id)
        to_user = db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        from_user.balance -= amount
        to_user.balance += amount

        db.commit()  # 成功:提交交易
    except Exception:
        db.rollback()  # 失敗:回滾交易
        raise

基本交易(非同步)

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_money_async(
    db: AsyncSession,
    from_id: int,
    to_id: int,
    amount: float
):
    try:
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        from_user.balance -= amount
        to_user.balance += amount

        await db.commit()
    except Exception:
        await db.rollback()
        raise

使用 Context Manager

# 使用 begin() context manager
async def transfer_with_context(
    db: AsyncSession,
    from_id: int,
    to_id: int,
    amount: float
):
    async with db.begin():
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)

        if not from_user or not to_user:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient balance")

        from_user.balance -= amount
        to_user.balance += amount

        # 區塊結束時自動 commit
        # 發生異常時自動 rollback

🔄 巢狀交易(Savepoint)

使用場景

主交易
├── 操作 1(必須成功)
├── 巢狀交易(可以失敗)
│   └── 操作 2(嘗試執行)
└── 操作 3(必須成功)

實作

async def complex_operation(db: AsyncSession):
    """複雜操作使用 Savepoint"""
    async with db.begin():
        # 主交易:建立訂單
        order = Order(user_id=1, total=100)
        db.add(order)
        await db.flush()  # 取得 order.id

        # 巢狀交易:嘗試發送通知
        async with db.begin_nested():
            try:
                notification = Notification(
                    user_id=1,
                    message=f"Order {order.id} created"
                )
                db.add(notification)
                await db.flush()

                # 模擬通知服務錯誤
                # raise Exception("Notification service error")
            except Exception as e:
                # Savepoint rollback,不影響主交易
                print(f"Notification failed: {e}")
                # 不需要手動 rollback,begin_nested 會處理

        # 繼續主交易:更新庫存
        product = await db.get(Product, 1)
        product.stock -= 1

        # 主交易結束:commit

多層巢狀

async def multi_level_transaction(db: AsyncSession):
    """多層巢狀交易"""
    async with db.begin():
        print("Level 0: Main transaction")
        user = User(username="john", email="john@example.com")
        db.add(user)

        async with db.begin_nested():
            print("Level 1: Savepoint 1")
            profile = Profile(user=user, bio="Developer")
            db.add(profile)

            async with db.begin_nested():
                print("Level 2: Savepoint 2")
                try:
                    # 這個操作失敗
                    raise ValueError("Something went wrong")
                except ValueError:
                    pass
                # Savepoint 2 rollback

            # Level 1 繼續
            settings = UserSettings(user=user, theme="dark")
            db.add(settings)

        # Level 0 繼續
        audit = AuditLog(action="user_created", user_id=user.id)
        db.add(audit)

        # 所有成功的操作都會被 commit

⚠️ 隔離等級

隔離等級說明

等級髒讀不可重複讀幻讀說明
READ UNCOMMITTED可能可能可能最低隔離
READ COMMITTED不會可能可能PostgreSQL 預設
REPEATABLE READ不會不會可能MySQL 預設
SERIALIZABLE不會不會不會最高隔離

設定隔離等級

from sqlalchemy import create_engine

# 在 Engine 層級設定
engine = create_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ"
)

# 或在 Session 層級設定
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

with Session() as session:
    session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
    # 這個 Session 使用 SERIALIZABLE 隔離等級

非同步設定

from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ"
)

🔒 樂觀鎖與悲觀鎖

樂觀鎖(Optimistic Locking)

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Mapped, mapped_column

class Product(Base):
    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    stock: Mapped[int] = mapped_column(default=0)
    version: Mapped[int] = mapped_column(default=1)  # 版本號

    __mapper_args__ = {
        "version_id_col": version  # 設定版本欄位
    }


async def update_stock_optimistic(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    """樂觀鎖更新庫存"""
    from sqlalchemy.orm.exc import StaleDataError

    try:
        product = await db.get(Product, product_id)
        product.stock -= quantity
        await db.commit()
    except StaleDataError:
        # 版本衝突:其他交易已經修改了這筆資料
        await db.rollback()
        raise ValueError("Concurrent modification detected, please retry")

悲觀鎖(Pessimistic Locking)

from sqlalchemy import select

async def update_stock_pessimistic(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    """悲觀鎖更新庫存"""
    async with db.begin():
        # SELECT ... FOR UPDATE
        stmt = (
            select(Product)
            .where(Product.id == product_id)
            .with_for_update()  # 鎖定該行
        )
        result = await db.execute(stmt)
        product = result.scalar_one()

        if product.stock < quantity:
            raise ValueError("Insufficient stock")

        product.stock -= quantity
        # commit 時釋放鎖


async def update_stock_pessimistic_nowait(
    db: AsyncSession,
    product_id: int,
    quantity: int
):
    """悲觀鎖(不等待)"""
    async with db.begin():
        stmt = (
            select(Product)
            .where(Product.id == product_id)
            .with_for_update(nowait=True)  # 如果已被鎖定,立即報錯
        )
        try:
            result = await db.execute(stmt)
            product = result.scalar_one()
            product.stock -= quantity
        except Exception:
            raise ValueError("Resource is locked by another transaction")

選擇建議

場景建議
衝突機率低樂觀鎖
衝突機率高悲觀鎖
讀多寫少樂觀鎖
需要保證一致性悲觀鎖

📝 實戰範例

訂單處理

from dataclasses import dataclass
from typing import List
from decimal import Decimal

@dataclass
class OrderItemDTO:
    product_id: int
    quantity: int

@dataclass
class CreateOrderDTO:
    user_id: int
    items: List[OrderItemDTO]


class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_order(self, data: CreateOrderDTO) -> Order:
        """建立訂單(含交易管理)"""
        async with self.db.begin():
            # 1. 檢查並鎖定商品庫存
            products = {}
            total = Decimal("0")

            for item in data.items:
                stmt = (
                    select(Product)
                    .where(Product.id == item.product_id)
                    .with_for_update()
                )
                result = await self.db.execute(stmt)
                product = result.scalar_one_or_none()

                if not product:
                    raise ValueError(f"Product {item.product_id} not found")

                if product.stock < item.quantity:
                    raise ValueError(f"Insufficient stock for {product.name}")

                products[item.product_id] = product
                total += product.price * item.quantity

            # 2. 檢查使用者餘額
            user = await self.db.get(User, data.user_id)
            if not user:
                raise ValueError("User not found")

            if user.balance < total:
                raise ValueError("Insufficient balance")

            # 3. 建立訂單
            order = Order(
                user_id=data.user_id,
                total=total,
                status="pending"
            )
            self.db.add(order)
            await self.db.flush()  # 取得 order.id

            # 4. 建立訂單項目並扣減庫存
            for item in data.items:
                product = products[item.product_id]

                order_item = OrderItem(
                    order_id=order.id,
                    product_id=item.product_id,
                    quantity=item.quantity,
                    unit_price=product.price
                )
                self.db.add(order_item)

                # 扣減庫存
                product.stock -= item.quantity

            # 5. 扣減使用者餘額
            user.balance -= total

            # 6. 建立交易記錄
            transaction = Transaction(
                user_id=data.user_id,
                order_id=order.id,
                amount=-total,
                type="payment"
            )
            self.db.add(transaction)

            # 交易結束,自動 commit
            return order

帶重試的樂觀鎖

import asyncio
from sqlalchemy.orm.exc import StaleDataError

async def update_with_retry(
    db_factory,  # Session factory
    product_id: int,
    quantity: int,
    max_retries: int = 3
):
    """帶重試的樂觀鎖更新"""
    for attempt in range(max_retries):
        async with db_factory() as db:
            try:
                product = await db.get(Product, product_id)
                if not product:
                    raise ValueError("Product not found")

                if product.stock < quantity:
                    raise ValueError("Insufficient stock")

                product.stock -= quantity
                await db.commit()
                return product

            except StaleDataError:
                await db.rollback()
                if attempt == max_retries - 1:
                    raise ValueError("Failed after max retries")

                # 指數退避
                await asyncio.sleep(0.1 * (2 ** attempt))

    raise ValueError("Failed to update")

✅ 重點總結

交易管理方式

方式使用場景
db.begin()明確控制交易範圍
db.begin_nested()巢狀交易(Savepoint)
try/commit/rollback手動控制

鎖的選擇

類型方式適用場景
樂觀鎖version 欄位衝突少、讀多寫少
悲觀鎖with_for_update()衝突多、需保證一致性

最佳實踐

  1. 交易範圍要盡可能小
  2. 避免在交易中做耗時操作
  3. 正確處理異常和回滾
  4. 根據場景選擇適當的鎖策略

🎤 面試這樣答

Q: 什麼是資料庫交易的 ACID 特性?

答案:

ACID 是資料庫交易的四個特性:

  1. Atomicity(原子性):交易中的操作要嘛全部成功,要嘛全部失敗
  2. Consistency(一致性):交易完成後,資料必須保持一致狀態
  3. Isolation(隔離性):多個交易同時執行時,互不干擾
  4. Durability(持久性):交易完成後,結果永久保存
async with db.begin():
    user.balance -= 100
    merchant.balance += 100
    # 要嘛都成功,要嘛都失敗

上一篇: 03-5. Repository 模式 下一篇: 03-7. 查詢優化


最後更新:2025-12-17

0%