目錄
03-6. 交易管理
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
交易(Transaction)確保多個資料庫操作要嘛全部成功,要嘛全部失敗,維持資料的一致性。
🎯 為什麼需要交易管理?
沒有交易的問題
# ❌ 沒有交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
from_user = await get_user(from_id)
from_user.balance -= amount
await db.commit() # ✅ 扣款成功
# 如果這裡發生錯誤...
raise Exception("Network error")
to_user = await get_user(to_id)
to_user.balance += amount
await db.commit() # ❌ 永遠不會執行
# 結果:錢從 A 扣了,但沒有加到 B!使用交易
# ✅ 使用交易保護
async def transfer_money(from_id: int, to_id: int, amount: float):
async with db.begin():
from_user = await get_user(from_id)
from_user.balance -= amount
to_user = await get_user(to_id)
to_user.balance += amount
# 如果發生錯誤,所有操作都會回滾
# 結果:要嘛兩邊都成功,要嘛都不變🔑 ACID 特性
┌─────────────────────────────────────────────────────────┐
│ ACID │
├─────────────────────────────────────────────────────────┤
│ A - Atomicity(原子性) │
│ 所有操作要嘛全部完成,要嘛全部不做 │
├─────────────────────────────────────────────────────────┤
│ C - Consistency(一致性) │
│ 交易前後,資料必須保持一致狀態 │
├─────────────────────────────────────────────────────────┤
│ I - Isolation(隔離性) │
│ 多個交易同時執行時,互不干擾 │
├─────────────────────────────────────────────────────────┤
│ D - Durability(持久性) │
│ 交易完成後,結果永久保存 │
└─────────────────────────────────────────────────────────┘🔧 SQLAlchemy 交易管理
基本交易(同步)
from sqlalchemy.orm import Session
def transfer_money_sync(
db: Session,
from_id: int,
to_id: int,
amount: float
):
try:
from_user = db.get(User, from_id)
to_user = db.get(User, to_id)
if not from_user or not to_user:
raise ValueError("User not found")
if from_user.balance < amount:
raise ValueError("Insufficient balance")
from_user.balance -= amount
to_user.balance += amount
db.commit() # 成功:提交交易
except Exception:
db.rollback() # 失敗:回滾交易
raise基本交易(非同步)
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_money_async(
db: AsyncSession,
from_id: int,
to_id: int,
amount: float
):
try:
from_user = await db.get(User, from_id)
to_user = await db.get(User, to_id)
if not from_user or not to_user:
raise ValueError("User not found")
if from_user.balance < amount:
raise ValueError("Insufficient balance")
from_user.balance -= amount
to_user.balance += amount
await db.commit()
except Exception:
await db.rollback()
raise使用 Context Manager
# 使用 begin() context manager
async def transfer_with_context(
db: AsyncSession,
from_id: int,
to_id: int,
amount: float
):
async with db.begin():
from_user = await db.get(User, from_id)
to_user = await db.get(User, to_id)
if not from_user or not to_user:
raise ValueError("User not found")
if from_user.balance < amount:
raise ValueError("Insufficient balance")
from_user.balance -= amount
to_user.balance += amount
# 區塊結束時自動 commit
# 發生異常時自動 rollback🔄 巢狀交易(Savepoint)
使用場景
主交易
├── 操作 1(必須成功)
├── 巢狀交易(可以失敗)
│ └── 操作 2(嘗試執行)
└── 操作 3(必須成功)實作
async def complex_operation(db: AsyncSession):
"""複雜操作使用 Savepoint"""
async with db.begin():
# 主交易:建立訂單
order = Order(user_id=1, total=100)
db.add(order)
await db.flush() # 取得 order.id
# 巢狀交易:嘗試發送通知
async with db.begin_nested():
try:
notification = Notification(
user_id=1,
message=f"Order {order.id} created"
)
db.add(notification)
await db.flush()
# 模擬通知服務錯誤
# raise Exception("Notification service error")
except Exception as e:
# Savepoint rollback,不影響主交易
print(f"Notification failed: {e}")
# 不需要手動 rollback,begin_nested 會處理
# 繼續主交易:更新庫存
product = await db.get(Product, 1)
product.stock -= 1
# 主交易結束:commit多層巢狀
async def multi_level_transaction(db: AsyncSession):
"""多層巢狀交易"""
async with db.begin():
print("Level 0: Main transaction")
user = User(username="john", email="john@example.com")
db.add(user)
async with db.begin_nested():
print("Level 1: Savepoint 1")
profile = Profile(user=user, bio="Developer")
db.add(profile)
async with db.begin_nested():
print("Level 2: Savepoint 2")
try:
# 這個操作失敗
raise ValueError("Something went wrong")
except ValueError:
pass
# Savepoint 2 rollback
# Level 1 繼續
settings = UserSettings(user=user, theme="dark")
db.add(settings)
# Level 0 繼續
audit = AuditLog(action="user_created", user_id=user.id)
db.add(audit)
# 所有成功的操作都會被 commit⚠️ 隔離等級
隔離等級說明
| 等級 | 髒讀 | 不可重複讀 | 幻讀 | 說明 |
|---|---|---|---|---|
| READ UNCOMMITTED | 可能 | 可能 | 可能 | 最低隔離 |
| READ COMMITTED | 不會 | 可能 | 可能 | PostgreSQL 預設 |
| REPEATABLE READ | 不會 | 不會 | 可能 | MySQL 預設 |
| SERIALIZABLE | 不會 | 不會 | 不會 | 最高隔離 |
設定隔離等級
from sqlalchemy import create_engine
# 在 Engine 層級設定
engine = create_engine(
DATABASE_URL,
isolation_level="REPEATABLE READ"
)
# 或在 Session 層級設定
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
with Session() as session:
session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# 這個 Session 使用 SERIALIZABLE 隔離等級非同步設定
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
DATABASE_URL,
isolation_level="REPEATABLE READ"
)🔒 樂觀鎖與悲觀鎖
樂觀鎖(Optimistic Locking)
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
stock: Mapped[int] = mapped_column(default=0)
version: Mapped[int] = mapped_column(default=1) # 版本號
__mapper_args__ = {
"version_id_col": version # 設定版本欄位
}
async def update_stock_optimistic(
db: AsyncSession,
product_id: int,
quantity: int
):
"""樂觀鎖更新庫存"""
from sqlalchemy.orm.exc import StaleDataError
try:
product = await db.get(Product, product_id)
product.stock -= quantity
await db.commit()
except StaleDataError:
# 版本衝突:其他交易已經修改了這筆資料
await db.rollback()
raise ValueError("Concurrent modification detected, please retry")悲觀鎖(Pessimistic Locking)
from sqlalchemy import select
async def update_stock_pessimistic(
db: AsyncSession,
product_id: int,
quantity: int
):
"""悲觀鎖更新庫存"""
async with db.begin():
# SELECT ... FOR UPDATE
stmt = (
select(Product)
.where(Product.id == product_id)
.with_for_update() # 鎖定該行
)
result = await db.execute(stmt)
product = result.scalar_one()
if product.stock < quantity:
raise ValueError("Insufficient stock")
product.stock -= quantity
# commit 時釋放鎖
async def update_stock_pessimistic_nowait(
db: AsyncSession,
product_id: int,
quantity: int
):
"""悲觀鎖(不等待)"""
async with db.begin():
stmt = (
select(Product)
.where(Product.id == product_id)
.with_for_update(nowait=True) # 如果已被鎖定,立即報錯
)
try:
result = await db.execute(stmt)
product = result.scalar_one()
product.stock -= quantity
except Exception:
raise ValueError("Resource is locked by another transaction")選擇建議
| 場景 | 建議 |
|---|---|
| 衝突機率低 | 樂觀鎖 |
| 衝突機率高 | 悲觀鎖 |
| 讀多寫少 | 樂觀鎖 |
| 需要保證一致性 | 悲觀鎖 |
📝 實戰範例
訂單處理
from dataclasses import dataclass
from typing import List
from decimal import Decimal
@dataclass
class OrderItemDTO:
product_id: int
quantity: int
@dataclass
class CreateOrderDTO:
user_id: int
items: List[OrderItemDTO]
class OrderService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_order(self, data: CreateOrderDTO) -> Order:
"""建立訂單(含交易管理)"""
async with self.db.begin():
# 1. 檢查並鎖定商品庫存
products = {}
total = Decimal("0")
for item in data.items:
stmt = (
select(Product)
.where(Product.id == item.product_id)
.with_for_update()
)
result = await self.db.execute(stmt)
product = result.scalar_one_or_none()
if not product:
raise ValueError(f"Product {item.product_id} not found")
if product.stock < item.quantity:
raise ValueError(f"Insufficient stock for {product.name}")
products[item.product_id] = product
total += product.price * item.quantity
# 2. 檢查使用者餘額
user = await self.db.get(User, data.user_id)
if not user:
raise ValueError("User not found")
if user.balance < total:
raise ValueError("Insufficient balance")
# 3. 建立訂單
order = Order(
user_id=data.user_id,
total=total,
status="pending"
)
self.db.add(order)
await self.db.flush() # 取得 order.id
# 4. 建立訂單項目並扣減庫存
for item in data.items:
product = products[item.product_id]
order_item = OrderItem(
order_id=order.id,
product_id=item.product_id,
quantity=item.quantity,
unit_price=product.price
)
self.db.add(order_item)
# 扣減庫存
product.stock -= item.quantity
# 5. 扣減使用者餘額
user.balance -= total
# 6. 建立交易記錄
transaction = Transaction(
user_id=data.user_id,
order_id=order.id,
amount=-total,
type="payment"
)
self.db.add(transaction)
# 交易結束,自動 commit
return order帶重試的樂觀鎖
import asyncio
from sqlalchemy.orm.exc import StaleDataError
async def update_with_retry(
db_factory, # Session factory
product_id: int,
quantity: int,
max_retries: int = 3
):
"""帶重試的樂觀鎖更新"""
for attempt in range(max_retries):
async with db_factory() as db:
try:
product = await db.get(Product, product_id)
if not product:
raise ValueError("Product not found")
if product.stock < quantity:
raise ValueError("Insufficient stock")
product.stock -= quantity
await db.commit()
return product
except StaleDataError:
await db.rollback()
if attempt == max_retries - 1:
raise ValueError("Failed after max retries")
# 指數退避
await asyncio.sleep(0.1 * (2 ** attempt))
raise ValueError("Failed to update")✅ 重點總結
交易管理方式
| 方式 | 使用場景 |
|---|---|
db.begin() | 明確控制交易範圍 |
db.begin_nested() | 巢狀交易(Savepoint) |
try/commit/rollback | 手動控制 |
鎖的選擇
| 類型 | 方式 | 適用場景 |
|---|---|---|
| 樂觀鎖 | version 欄位 | 衝突少、讀多寫少 |
| 悲觀鎖 | with_for_update() | 衝突多、需保證一致性 |
最佳實踐
- 交易範圍要盡可能小
- 避免在交易中做耗時操作
- 正確處理異常和回滾
- 根據場景選擇適當的鎖策略
🎤 面試這樣答
Q: 什麼是資料庫交易的 ACID 特性?
答案:
ACID 是資料庫交易的四個特性:
- Atomicity(原子性):交易中的操作要嘛全部成功,要嘛全部失敗
- Consistency(一致性):交易完成後,資料必須保持一致狀態
- Isolation(隔離性):多個交易同時執行時,互不干擾
- Durability(持久性):交易完成後,結果永久保存
async with db.begin(): user.balance -= 100 merchant.balance += 100 # 要嘛都成功,要嘛都失敗
上一篇: 03-5. Repository 模式 下一篇: 03-7. 查詢優化
最後更新:2025-12-17