06-2. yield 依賴

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

yield 依賴讓你在請求處理前後執行程式碼,常用於資源的取得和釋放,如資料庫連線。


🔄 普通依賴 vs yield 依賴

# 普通依賴:只在請求前執行
def get_db():
    db = Database()
    return db  # 沒有清理!

# yield 依賴:請求前後都執行
def get_db():
    db = Database()
    try:
        yield db  # 請求處理前
    finally:
        db.close()  # 請求處理後
普通依賴:
請求 ─▶ get_db() ─▶ 端點函數 ─▶ 回應
        │
        └── 建立 DB(沒有關閉)

yield 依賴:
請求 ─▶ get_db() ─▶ 端點函數 ─▶ get_db() ─▶ 回應
        │                        │
        └── yield db             └── finally: db.close()
           (請求前)                (請求後)

🎯 使用場景

場景說明
資料庫連線取得/釋放連線
檔案處理開啟/關閉檔案
交易管理開始/提交/回滾
鎖定機制取得/釋放鎖
臨時資源建立/清理臨時檔案

📦 基本用法

資料庫 Session

from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

app = FastAPI()

# 建立資料庫引擎
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(bind=engine)

def get_db():
    """資料庫 Session 依賴"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    return db.query(User).all()

非同步版本

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

app = FastAPI()

engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession)

async def get_db():
    """非同步資料庫 Session"""
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

🔧 進階用法

交易管理

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

def get_db_with_transaction():
    """帶交易的資料庫 Session"""
    db = SessionLocal()
    try:
        yield db
        db.commit()  # 成功則提交
    except Exception:
        db.rollback()  # 失敗則回滾
        raise
    finally:
        db.close()

@app.post("/transfer")
def transfer_money(
    from_id: int,
    to_id: int,
    amount: float,
    db: Session = Depends(get_db_with_transaction)
):
    # 如果這裡拋出異常,交易會自動回滾
    from_account = db.query(Account).get(from_id)
    to_account = db.query(Account).get(to_id)

    from_account.balance -= amount
    to_account.balance += amount

    return {"status": "success"}

檔案處理

from fastapi import FastAPI, Depends
import tempfile
import os

app = FastAPI()

def get_temp_file():
    """臨時檔案依賴"""
    fd, path = tempfile.mkstemp()
    try:
        yield path
    finally:
        os.close(fd)
        os.unlink(path)  # 刪除臨時檔案

@app.post("/process")
async def process_data(temp_path: str = Depends(get_temp_file)):
    # 使用臨時檔案
    with open(temp_path, 'w') as f:
        f.write("處理中的資料...")

    # 請求結束後,臨時檔案會被自動刪除
    return {"temp_file": temp_path}

分散式鎖

from fastapi import FastAPI, Depends, HTTPException
import redis
import uuid

app = FastAPI()
redis_client = redis.Redis()

def acquire_lock(resource: str):
    """取得分散式鎖"""
    lock_key = f"lock:{resource}"
    lock_value = str(uuid.uuid4())

    # 嘗試取得鎖
    acquired = redis_client.set(
        lock_key,
        lock_value,
        nx=True,
        ex=30  # 30 秒過期
    )

    if not acquired:
        raise HTTPException(status_code=423, detail="Resource locked")

    try:
        yield lock_value
    finally:
        # 使用 Lua 腳本安全釋放鎖
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        redis_client.eval(script, 1, lock_key, lock_value)

@app.post("/inventory/{item_id}/update")
def update_inventory(
    item_id: int,
    quantity: int,
    lock: str = Depends(lambda: acquire_lock(f"inventory:{item_id}"))
):
    # 更新庫存(有鎖保護)
    return {"item_id": item_id, "quantity": quantity}

📊 多個 yield 依賴

依賴順序

from fastapi import FastAPI, Depends

app = FastAPI()

def dep_a():
    print("A: 開始")
    yield "A"
    print("A: 結束")

def dep_b():
    print("B: 開始")
    yield "B"
    print("B: 結束")

def dep_c(a: str = Depends(dep_a), b: str = Depends(dep_b)):
    print("C: 開始")
    yield f"{a}+{b}"
    print("C: 結束")

@app.get("/test")
def test_endpoint(c: str = Depends(dep_c)):
    print("端點執行")
    return {"result": c}

# 輸出順序:
# A: 開始
# B: 開始
# C: 開始
# 端點執行
# C: 結束
# B: 結束
# A: 結束

執行流程圖

請求進入
    │
    ▼
┌─────────────────┐
│   dep_a 開始    │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   dep_b 開始    │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   dep_c 開始    │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   端點執行      │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   dep_c 結束    │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   dep_b 結束    │
└─────────────────┘
    │
    ▼
┌─────────────────┐
│   dep_a 結束    │
└─────────────────┘
    │
    ▼
回應發送

⚠️ 異常處理

在 yield 後捕獲異常

from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    except Exception as e:
        # 可以在這裡處理異常
        db.rollback()
        raise  # 重新拋出異常
    finally:
        db.close()

HTTPException 的處理

from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

def resource_dependency():
    resource = acquire_resource()
    try:
        yield resource
    except Exception as e:
        # 注意:HTTPException 也會在這裡被捕獲
        release_resource(resource)
        raise

@app.get("/items/{item_id}")
def get_item(
    item_id: int,
    resource = Depends(resource_dependency)
):
    if item_id < 0:
        raise HTTPException(status_code=400, detail="Invalid ID")
    return {"item_id": item_id}

確保清理執行

from fastapi import FastAPI, Depends

app = FastAPI()

def safe_dependency():
    """確保清理一定執行"""
    resource = None
    try:
        resource = acquire_resource()
        yield resource
    finally:
        # finally 區塊一定會執行
        if resource:
            release_resource(resource)

📝 實戰範例:完整的資料庫操作

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, Session, declarative_base
from pydantic import BaseModel
from typing import List

# 資料庫設定
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()

# 模型
class UserModel(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100), unique=True)

Base.metadata.create_all(engine)

# Pydantic 模型
class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        from_attributes = True

# 依賴
def get_db():
    """取得資料庫 Session"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_db_transactional():
    """取得帶交易的資料庫 Session"""
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

# FastAPI 應用
app = FastAPI()

@app.get("/users", response_model=List[UserResponse])
def list_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """列出所有使用者"""
    users = db.query(UserModel).offset(skip).limit(limit).all()
    return users

@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """取得單一使用者"""
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users", response_model=UserResponse)
def create_user(
    user: UserCreate,
    db: Session = Depends(get_db_transactional)
):
    """建立使用者"""
    # 檢查 email 是否已存在
    existing = db.query(UserModel).filter(
        UserModel.email == user.email
    ).first()

    if existing:
        raise HTTPException(status_code=400, detail="Email already exists")

    db_user = UserModel(**user.model_dump())
    db.add(db_user)
    db.flush()  # 取得 ID
    db.refresh(db_user)

    return db_user

@app.delete("/users/{user_id}")
def delete_user(
    user_id: int,
    db: Session = Depends(get_db_transactional)
):
    """刪除使用者"""
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(user)
    return {"message": "User deleted"}

✅ 重點總結

yield 依賴特性

特性說明
yield 前請求處理前執行
yield 後請求處理後執行
finally一定會執行
執行順序後進先出(LIFO)

使用場景

場景範例
資源管理資料庫連線
交易commit/rollback
鎖定分散式鎖
清理臨時檔案

最佳實踐

  1. 使用 try/finally 確保清理
  2. 異常處理放在 except 區塊
  3. 重要清理邏輯放在 finally
  4. 考慮非同步版本的效能

🎤 面試這樣答

Q: FastAPI 的 yield 依賴有什麼用?

答案:

yield 依賴讓你在請求處理前後執行程式碼:

def get_db():
    db = SessionLocal()
    try:
        yield db      # 請求前:提供 db
    finally:
        db.close()    # 請求後:關閉連線

執行順序:

  1. yield 前的程式碼
  2. 端點函數
  3. yield 後的程式碼(清理)

常見用途:

  • 資料庫連線管理
  • 交易管理(commit/rollback)
  • 資源清理

上一篇: 06-1. 依賴注入基礎 下一篇: 06-3. 依賴覆蓋


最後更新:2025-12-18

0%