# 

# 03-7. 查詢優化

&gt; ⏱️ **閱讀時間：** 18 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐ (進階)

---

## 🤔 一句話解釋

**查詢優化是透過索引、載入策略、分頁等技術，讓資料庫查詢更快、更有效率。**

---

## 🎯 常見效能問題

### N&#43;1 查詢問題

```python
# ❌ N&#43;1 問題：1 次查詢使用者 &#43; N 次查詢文章
users = await db.execute(select(User))
for user in users.scalars():
    # 每次迴圈都會觸發一次查詢
    print(f&#34;{user.username}: {len(user.posts)} posts&#34;)

# 執行的 SQL：
# SELECT * FROM users;
# SELECT * FROM posts WHERE user_id = 1;
# SELECT * FROM posts WHERE user_id = 2;
# SELECT * FROM posts WHERE user_id = 3;
# ... N 次
```

### 解決方案

```python
# ✅ 使用 Eager Loading：只有 2 次查詢
from sqlalchemy.orm import selectinload

stmt = select(User).options(selectinload(User.posts))
result = await db.execute(stmt)
users = result.scalars().unique().all()

for user in users:
    print(f&#34;{user.username}: {len(user.posts)} posts&#34;)

# 執行的 SQL：
# SELECT * FROM users;
# SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...);
```

---

## 📊 載入策略比較

### 各種載入策略

```python
from sqlalchemy.orm import (
    selectinload,   # IN 查詢
    joinedload,     # JOIN
    subqueryload,   # 子查詢
    lazyload,       # 延遲載入
    raiseload,      # 禁止延遲載入
)

# 1. selectinload - 推薦用於一對多
stmt = select(User).options(selectinload(User.posts))
# SQL: SELECT * FROM users;
# SQL: SELECT * FROM posts WHERE user_id IN (1, 2, 3);

# 2. joinedload - 用於一對一、多對一
stmt = select(User).options(joinedload(User.profile))
# SQL: SELECT * FROM users LEFT JOIN profiles ON ...;

# 3. subqueryload - 複雜過濾時使用
stmt = select(User).options(subqueryload(User.posts))
# SQL: SELECT * FROM users;
# SQL: SELECT * FROM posts WHERE user_id IN (SELECT id FROM users);

# 4. lazyload - 預設行為，存取時才載入
stmt = select(User).options(lazyload(User.posts))

# 5. raiseload - 禁止延遲載入，幫助發現 N&#43;1
stmt = select(User).options(raiseload(User.posts))
# 存取 user.posts 時會拋出異常
```

### 策略選擇指南

| 關聯類型 | 推薦策略 | 原因 |
|----------|----------|------|
| 一對一 | `joinedload` | 一次 JOIN 取得所有資料 |
| 多對一 | `joinedload` | 同上 |
| 一對多 | `selectinload` | 避免笛卡爾積 |
| 多對多 | `selectinload` | 同上 |

### 巢狀載入

```python
# 載入多層關聯
stmt = (
    select(User)
    .options(
        selectinload(User.posts)
        .selectinload(Post.comments)
        .joinedload(Comment.author)
    )
    .options(
        joinedload(User.profile)
    )
)

# 只載入需要的欄位
from sqlalchemy.orm import load_only

stmt = (
    select(User)
    .options(
        load_only(User.id, User.username, User.email),
        selectinload(User.posts).load_only(Post.id, Post.title)
    )
)
```

---

## 🔍 索引優化

### 何時需要索引

```python
class User(Base):
    __tablename__ = &#34;users&#34;

    id: Mapped[int] = mapped_column(primary_key=True)  # 自動建立索引

    # 常用於 WHERE 條件的欄位
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)

    # 常用於排序的欄位
    created_at: Mapped[datetime] = mapped_column(index=True)

    # 狀態欄位（如果經常過濾）
    is_active: Mapped[bool] = mapped_column(index=True)

    # 不需要索引的欄位
    bio: Mapped[str] = mapped_column(Text)  # 很少用於查詢
```

### 複合索引

```python
from sqlalchemy import Index

class Order(Base):
    __tablename__ = &#34;orders&#34;

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey(&#34;users.id&#34;))
    status: Mapped[str] = mapped_column(String(20))
    created_at: Mapped[datetime] = mapped_column()

    # 複合索引：常一起查詢的欄位
    __table_args__ = (
        Index(&#34;ix_orders_user_status&#34;, &#34;user_id&#34;, &#34;status&#34;),
        Index(&#34;ix_orders_user_created&#34;, &#34;user_id&#34;, &#34;created_at&#34;),
    )


# 查詢時會使用複合索引
stmt = (
    select(Order)
    .where(Order.user_id == 1)
    .where(Order.status == &#34;pending&#34;)
)
```

### 索引使用建議

| 情況 | 建議 |
|------|------|
| 主鍵 | 自動建立 |
| 外鍵 | 建議建立 |
| WHERE 條件常用 | 建議建立 |
| ORDER BY 常用 | 考慮建立 |
| 組合查詢 | 使用複合索引 |
| 低選擇性欄位（如 bool） | 視情況 |
| 很少查詢的欄位 | 不建立 |

---

## 📄 分頁優化

### 基本分頁

```python
# Offset 分頁（適合小資料量）
async def get_users_offset(
    db: AsyncSession,
    page: int = 1,
    page_size: int = 20
) -&gt; tuple[list[User], int]:
    # 查詢資料
    offset = (page - 1) * page_size
    stmt = select(User).offset(offset).limit(page_size)
    result = await db.execute(stmt)
    users = list(result.scalars().all())

    # 查詢總數
    count_stmt = select(func.count()).select_from(User)
    total = await db.scalar(count_stmt)

    return users, total
```

### Cursor 分頁（適合大資料量）

```python
from datetime import datetime
from typing import Optional

async def get_users_cursor(
    db: AsyncSession,
    cursor: Optional[datetime] = None,  # 上一頁最後一筆的 created_at
    limit: int = 20
) -&gt; tuple[list[User], Optional[datetime]]:
    &#34;&#34;&#34;Cursor 分頁（基於 created_at）&#34;&#34;&#34;
    stmt = select(User).order_by(User.created_at.desc())

    if cursor:
        stmt = stmt.where(User.created_at &lt; cursor)

    stmt = stmt.limit(limit &#43; 1)  # 多取一筆判斷是否有下一頁

    result = await db.execute(stmt)
    users = list(result.scalars().all())

    # 判斷是否有下一頁
    has_next = len(users) &gt; limit
    if has_next:
        users = users[:limit]

    next_cursor = users[-1].created_at if users and has_next else None

    return users, next_cursor


# 使用
async def paginated_api():
    users, next_cursor = await get_users_cursor(db, cursor=None)
    # 前端下次請求帶上 next_cursor
```

### Keyset 分頁

```python
async def get_users_keyset(
    db: AsyncSession,
    last_id: Optional[int] = None,
    last_created_at: Optional[datetime] = None,
    limit: int = 20
) -&gt; list[User]:
    &#34;&#34;&#34;Keyset 分頁（基於多個欄位）&#34;&#34;&#34;
    stmt = (
        select(User)
        .order_by(User.created_at.desc(), User.id.desc())
    )

    if last_id and last_created_at:
        # (created_at, id) &lt; (last_created_at, last_id)
        stmt = stmt.where(
            or_(
                User.created_at &lt; last_created_at,
                and_(
                    User.created_at == last_created_at,
                    User.id &lt; last_id
                )
            )
        )

    stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    return list(result.scalars().all())
```

### 分頁方式比較

| 方式 | 優點 | 缺點 | 適用場景 |
|------|------|------|----------|
| Offset | 簡單、支援跳頁 | 大 offset 慢 | 小資料量 |
| Cursor | 效能穩定 | 不能跳頁 | 大資料量 |
| Keyset | 效能最好 | 實作複雜 | 超大資料量 |

---

## 🔧 查詢技巧

### 只查詢需要的欄位

```python
from sqlalchemy.orm import load_only

# ❌ 查詢所有欄位
stmt = select(User)

# ✅ 只查詢需要的欄位
stmt = select(User).options(load_only(User.id, User.username))

# 或使用 select 指定欄位
stmt = select(User.id, User.username)
result = await db.execute(stmt)
for row in result:
    print(row.id, row.username)
```

### 使用 exists 而不是 count

```python
from sqlalchemy import exists

# ❌ 使用 count
stmt = select(func.count()).select_from(User).where(User.email == email)
count = await db.scalar(stmt)
email_exists = count &gt; 0

# ✅ 使用 exists（更快）
stmt = select(exists().where(User.email == email))
email_exists = await db.scalar(stmt)
```

### 批量操作

```python
from sqlalchemy import update, delete

# ❌ 逐一更新
users = await db.scalars(select(User).where(User.is_active == False))
for user in users:
    user.is_deleted = True
await db.commit()

# ✅ 批量更新
stmt = (
    update(User)
    .where(User.is_active == False)
    .values(is_deleted=True)
)
await db.execute(stmt)
await db.commit()

# ❌ 逐一刪除
users = await db.scalars(select(User).where(User.is_deleted == True))
for user in users:
    await db.delete(user)
await db.commit()

# ✅ 批量刪除
stmt = delete(User).where(User.is_deleted == True)
await db.execute(stmt)
await db.commit()
```

### 使用子查詢

```python
from sqlalchemy import select

# 找出有發文的使用者
subquery = select(Post.author_id).distinct().subquery()
stmt = select(User).where(User.id.in_(select(subquery)))

# 找出文章數超過 10 篇的使用者
from sqlalchemy import func

subquery = (
    select(Post.author_id, func.count(Post.id).label(&#34;post_count&#34;))
    .group_by(Post.author_id)
    .having(func.count(Post.id) &gt; 10)
    .subquery()
)

stmt = select(User).join(subquery, User.id == subquery.c.author_id)
```

---

## 📊 查詢分析

### 開啟 SQL 日誌

```python
# 在 Engine 設定
engine = create_async_engine(
    DATABASE_URL,
    echo=True  # 印出所有 SQL
)

# 或設定 logging
import logging

logging.getLogger(&#34;sqlalchemy.engine&#34;).setLevel(logging.INFO)
```

### 使用 EXPLAIN

```python
async def analyze_query(db: AsyncSession, stmt):
    &#34;&#34;&#34;分析查詢計劃&#34;&#34;&#34;
    from sqlalchemy import text

    # PostgreSQL
    explain_stmt = stmt.prefix_with(&#34;EXPLAIN ANALYZE&#34;)
    result = await db.execute(explain_stmt)
    for row in result:
        print(row)

# 使用
stmt = select(User).where(User.email == &#34;test@example.com&#34;)
await analyze_query(db, stmt)
```

### 監控慢查詢

```python
import time
import logging
from sqlalchemy import event

logger = logging.getLogger(__name__)

@event.listens_for(Engine, &#34;before_cursor_execute&#34;)
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault(&#34;query_start_time&#34;, []).append(time.time())

@event.listens_for(Engine, &#34;after_cursor_execute&#34;)
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info[&#34;query_start_time&#34;].pop(-1)
    if total &gt; 0.1:  # 超過 100ms 的查詢
        logger.warning(f&#34;Slow query ({total:.2f}s): {statement[:200]}&#34;)
```

---

## 📝 實戰範例

### 優化前

```python
# ❌ 效能問題範例
async def get_user_dashboard(db: AsyncSession, user_id: int):
    # N&#43;1 問題
    user = await db.get(User, user_id)

    # 查詢所有欄位
    posts = await db.scalars(select(Post).where(Post.author_id == user_id))

    result = []
    for post in posts:
        # 每次迴圈都查詢 comments
        comment_count = len(post.comments)
        # 每次迴圈都查詢 tags
        tags = [tag.name for tag in post.tags]
        result.append({
            &#34;post&#34;: post,
            &#34;comment_count&#34;: comment_count,
            &#34;tags&#34;: tags
        })

    return result
```

### 優化後

```python
# ✅ 優化版本
async def get_user_dashboard_optimized(db: AsyncSession, user_id: int):
    from sqlalchemy import func

    # 一次查詢所有需要的資料
    stmt = (
        select(
            Post,
            func.count(Comment.id).label(&#34;comment_count&#34;)
        )
        .outerjoin(Comment, Post.id == Comment.post_id)
        .where(Post.author_id == user_id)
        .group_by(Post.id)
        .options(
            selectinload(Post.tags).load_only(Tag.name),
            load_only(Post.id, Post.title, Post.created_at)
        )
    )

    result = await db.execute(stmt)

    return [
        {
            &#34;post&#34;: post,
            &#34;comment_count&#34;: comment_count,
            &#34;tags&#34;: [tag.name for tag in post.tags]
        }
        for post, comment_count in result
    ]
```

---

## ✅ 重點總結

### 常見問題與解決方案

| 問題 | 解決方案 |
|------|----------|
| N&#43;1 查詢 | 使用 `selectinload`/`joinedload` |
| 查詢太慢 | 加索引、分析執行計劃 |
| 大資料量分頁慢 | 使用 Cursor/Keyset 分頁 |
| 查詢太多欄位 | 使用 `load_only` |
| 逐一更新/刪除 | 使用批量操作 |

### 索引建議

| 欄位類型 | 建議 |
|----------|------|
| 主鍵 | 自動建立 |
| 外鍵 | 建議建立 |
| WHERE 常用 | 建議建立 |
| ORDER BY 常用 | 考慮建立 |

---

## 🎤 面試這樣答

### Q: 什麼是 N&#43;1 查詢問題？如何解決？

**答案：**

&gt; N&#43;1 問題是指查詢 N 筆資料時，因為 Lazy Loading，存取關聯會額外執行 N 次查詢。
&gt;
&gt; **解決方法：**
&gt; 1. **Eager Loading**：使用 `selectinload`（一對多）或 `joinedload`（一對一）
&gt; 2. **批量查詢**：手動用 IN 查詢
&gt;
&gt; ```python
&gt; # 使用 selectinload
&gt; stmt = select(User).options(selectinload(User.posts))
&gt; ```
&gt;
&gt; 原本 1 &#43; N 次查詢變成 2 次查詢。

---

**上一篇：** [03-6. 交易管理](./03-6)
**下一篇：** [03-8. 多資料庫支援](./03-8)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/03-7/  

