目錄
03-7. 查詢優化
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
查詢優化是透過索引、載入策略、分頁等技術,讓資料庫查詢更快、更有效率。
🎯 常見效能問題
N+1 查詢問題
# ❌ N+1 問題:1 次查詢使用者 + N 次查詢文章
users = await db.execute(select(User))
for user in users.scalars():
# 每次迴圈都會觸發一次查詢
print(f"{user.username}: {len(user.posts)} posts")
# 執行的 SQL:
# SELECT * FROM users;
# SELECT * FROM posts WHERE user_id = 1;
# SELECT * FROM posts WHERE user_id = 2;
# SELECT * FROM posts WHERE user_id = 3;
# ... N 次解決方案
# ✅ 使用 Eager Loading:只有 2 次查詢
from sqlalchemy.orm import selectinload
stmt = select(User).options(selectinload(User.posts))
result = await db.execute(stmt)
users = result.scalars().unique().all()
for user in users:
print(f"{user.username}: {len(user.posts)} posts")
# 執行的 SQL:
# SELECT * FROM users;
# SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...);📊 載入策略比較
各種載入策略
from sqlalchemy.orm import (
selectinload, # IN 查詢
joinedload, # JOIN
subqueryload, # 子查詢
lazyload, # 延遲載入
raiseload, # 禁止延遲載入
)
# 1. selectinload - 推薦用於一對多
stmt = select(User).options(selectinload(User.posts))
# SQL: SELECT * FROM users;
# SQL: SELECT * FROM posts WHERE user_id IN (1, 2, 3);
# 2. joinedload - 用於一對一、多對一
stmt = select(User).options(joinedload(User.profile))
# SQL: SELECT * FROM users LEFT JOIN profiles ON ...;
# 3. subqueryload - 複雜過濾時使用
stmt = select(User).options(subqueryload(User.posts))
# SQL: SELECT * FROM users;
# SQL: SELECT * FROM posts WHERE user_id IN (SELECT id FROM users);
# 4. lazyload - 預設行為,存取時才載入
stmt = select(User).options(lazyload(User.posts))
# 5. raiseload - 禁止延遲載入,幫助發現 N+1
stmt = select(User).options(raiseload(User.posts))
# 存取 user.posts 時會拋出異常策略選擇指南
| 關聯類型 | 推薦策略 | 原因 |
|---|---|---|
| 一對一 | joinedload | 一次 JOIN 取得所有資料 |
| 多對一 | joinedload | 同上 |
| 一對多 | selectinload | 避免笛卡爾積 |
| 多對多 | selectinload | 同上 |
巢狀載入
# 載入多層關聯
stmt = (
select(User)
.options(
selectinload(User.posts)
.selectinload(Post.comments)
.joinedload(Comment.author)
)
.options(
joinedload(User.profile)
)
)
# 只載入需要的欄位
from sqlalchemy.orm import load_only
stmt = (
select(User)
.options(
load_only(User.id, User.username, User.email),
selectinload(User.posts).load_only(Post.id, Post.title)
)
)🔍 索引優化
何時需要索引
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True) # 自動建立索引
# 常用於 WHERE 條件的欄位
email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
# 常用於排序的欄位
created_at: Mapped[datetime] = mapped_column(index=True)
# 狀態欄位(如果經常過濾)
is_active: Mapped[bool] = mapped_column(index=True)
# 不需要索引的欄位
bio: Mapped[str] = mapped_column(Text) # 很少用於查詢複合索引
from sqlalchemy import Index
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
status: Mapped[str] = mapped_column(String(20))
created_at: Mapped[datetime] = mapped_column()
# 複合索引:常一起查詢的欄位
__table_args__ = (
Index("ix_orders_user_status", "user_id", "status"),
Index("ix_orders_user_created", "user_id", "created_at"),
)
# 查詢時會使用複合索引
stmt = (
select(Order)
.where(Order.user_id == 1)
.where(Order.status == "pending")
)索引使用建議
| 情況 | 建議 |
|---|---|
| 主鍵 | 自動建立 |
| 外鍵 | 建議建立 |
| WHERE 條件常用 | 建議建立 |
| ORDER BY 常用 | 考慮建立 |
| 組合查詢 | 使用複合索引 |
| 低選擇性欄位(如 bool) | 視情況 |
| 很少查詢的欄位 | 不建立 |
📄 分頁優化
基本分頁
# Offset 分頁(適合小資料量)
async def get_users_offset(
db: AsyncSession,
page: int = 1,
page_size: int = 20
) -> tuple[list[User], int]:
# 查詢資料
offset = (page - 1) * page_size
stmt = select(User).offset(offset).limit(page_size)
result = await db.execute(stmt)
users = list(result.scalars().all())
# 查詢總數
count_stmt = select(func.count()).select_from(User)
total = await db.scalar(count_stmt)
return users, totalCursor 分頁(適合大資料量)
from datetime import datetime
from typing import Optional
async def get_users_cursor(
db: AsyncSession,
cursor: Optional[datetime] = None, # 上一頁最後一筆的 created_at
limit: int = 20
) -> tuple[list[User], Optional[datetime]]:
"""Cursor 分頁(基於 created_at)"""
stmt = select(User).order_by(User.created_at.desc())
if cursor:
stmt = stmt.where(User.created_at < cursor)
stmt = stmt.limit(limit + 1) # 多取一筆判斷是否有下一頁
result = await db.execute(stmt)
users = list(result.scalars().all())
# 判斷是否有下一頁
has_next = len(users) > limit
if has_next:
users = users[:limit]
next_cursor = users[-1].created_at if users and has_next else None
return users, next_cursor
# 使用
async def paginated_api():
users, next_cursor = await get_users_cursor(db, cursor=None)
# 前端下次請求帶上 next_cursorKeyset 分頁
async def get_users_keyset(
db: AsyncSession,
last_id: Optional[int] = None,
last_created_at: Optional[datetime] = None,
limit: int = 20
) -> list[User]:
"""Keyset 分頁(基於多個欄位)"""
stmt = (
select(User)
.order_by(User.created_at.desc(), User.id.desc())
)
if last_id and last_created_at:
# (created_at, id) < (last_created_at, last_id)
stmt = stmt.where(
or_(
User.created_at < last_created_at,
and_(
User.created_at == last_created_at,
User.id < last_id
)
)
)
stmt = stmt.limit(limit)
result = await db.execute(stmt)
return list(result.scalars().all())分頁方式比較
| 方式 | 優點 | 缺點 | 適用場景 |
|---|---|---|---|
| Offset | 簡單、支援跳頁 | 大 offset 慢 | 小資料量 |
| Cursor | 效能穩定 | 不能跳頁 | 大資料量 |
| Keyset | 效能最好 | 實作複雜 | 超大資料量 |
🔧 查詢技巧
只查詢需要的欄位
from sqlalchemy.orm import load_only
# ❌ 查詢所有欄位
stmt = select(User)
# ✅ 只查詢需要的欄位
stmt = select(User).options(load_only(User.id, User.username))
# 或使用 select 指定欄位
stmt = select(User.id, User.username)
result = await db.execute(stmt)
for row in result:
print(row.id, row.username)使用 exists 而不是 count
from sqlalchemy import exists
# ❌ 使用 count
stmt = select(func.count()).select_from(User).where(User.email == email)
count = await db.scalar(stmt)
email_exists = count > 0
# ✅ 使用 exists(更快)
stmt = select(exists().where(User.email == email))
email_exists = await db.scalar(stmt)批量操作
from sqlalchemy import update, delete
# ❌ 逐一更新
users = await db.scalars(select(User).where(User.is_active == False))
for user in users:
user.is_deleted = True
await db.commit()
# ✅ 批量更新
stmt = (
update(User)
.where(User.is_active == False)
.values(is_deleted=True)
)
await db.execute(stmt)
await db.commit()
# ❌ 逐一刪除
users = await db.scalars(select(User).where(User.is_deleted == True))
for user in users:
await db.delete(user)
await db.commit()
# ✅ 批量刪除
stmt = delete(User).where(User.is_deleted == True)
await db.execute(stmt)
await db.commit()使用子查詢
from sqlalchemy import select
# 找出有發文的使用者
subquery = select(Post.author_id).distinct().subquery()
stmt = select(User).where(User.id.in_(select(subquery)))
# 找出文章數超過 10 篇的使用者
from sqlalchemy import func
subquery = (
select(Post.author_id, func.count(Post.id).label("post_count"))
.group_by(Post.author_id)
.having(func.count(Post.id) > 10)
.subquery()
)
stmt = select(User).join(subquery, User.id == subquery.c.author_id)📊 查詢分析
開啟 SQL 日誌
# 在 Engine 設定
engine = create_async_engine(
DATABASE_URL,
echo=True # 印出所有 SQL
)
# 或設定 logging
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)使用 EXPLAIN
async def analyze_query(db: AsyncSession, stmt):
"""分析查詢計劃"""
from sqlalchemy import text
# PostgreSQL
explain_stmt = stmt.prefix_with("EXPLAIN ANALYZE")
result = await db.execute(explain_stmt)
for row in result:
print(row)
# 使用
stmt = select(User).where(User.email == "test@example.com")
await analyze_query(db, stmt)監控慢查詢
import time
import logging
from sqlalchemy import event
logger = logging.getLogger(__name__)
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault("query_start_time", []).append(time.time())
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - conn.info["query_start_time"].pop(-1)
if total > 0.1: # 超過 100ms 的查詢
logger.warning(f"Slow query ({total:.2f}s): {statement[:200]}")📝 實戰範例
優化前
# ❌ 效能問題範例
async def get_user_dashboard(db: AsyncSession, user_id: int):
# N+1 問題
user = await db.get(User, user_id)
# 查詢所有欄位
posts = await db.scalars(select(Post).where(Post.author_id == user_id))
result = []
for post in posts:
# 每次迴圈都查詢 comments
comment_count = len(post.comments)
# 每次迴圈都查詢 tags
tags = [tag.name for tag in post.tags]
result.append({
"post": post,
"comment_count": comment_count,
"tags": tags
})
return result優化後
# ✅ 優化版本
async def get_user_dashboard_optimized(db: AsyncSession, user_id: int):
from sqlalchemy import func
# 一次查詢所有需要的資料
stmt = (
select(
Post,
func.count(Comment.id).label("comment_count")
)
.outerjoin(Comment, Post.id == Comment.post_id)
.where(Post.author_id == user_id)
.group_by(Post.id)
.options(
selectinload(Post.tags).load_only(Tag.name),
load_only(Post.id, Post.title, Post.created_at)
)
)
result = await db.execute(stmt)
return [
{
"post": post,
"comment_count": comment_count,
"tags": [tag.name for tag in post.tags]
}
for post, comment_count in result
]✅ 重點總結
常見問題與解決方案
| 問題 | 解決方案 |
|---|---|
| N+1 查詢 | 使用 selectinload/joinedload |
| 查詢太慢 | 加索引、分析執行計劃 |
| 大資料量分頁慢 | 使用 Cursor/Keyset 分頁 |
| 查詢太多欄位 | 使用 load_only |
| 逐一更新/刪除 | 使用批量操作 |
索引建議
| 欄位類型 | 建議 |
|---|---|
| 主鍵 | 自動建立 |
| 外鍵 | 建議建立 |
| WHERE 常用 | 建議建立 |
| ORDER BY 常用 | 考慮建立 |
🎤 面試這樣答
Q: 什麼是 N+1 查詢問題?如何解決?
答案:
N+1 問題是指查詢 N 筆資料時,因為 Lazy Loading,存取關聯會額外執行 N 次查詢。
解決方法:
- Eager Loading:使用
selectinload(一對多)或joinedload(一對一)- 批量查詢:手動用 IN 查詢
# 使用 selectinload stmt = select(User).options(selectinload(User.posts))原本 1 + N 次查詢變成 2 次查詢。
上一篇: 03-6. 交易管理 下一篇: 03-8. 多資料庫支援
最後更新:2025-12-17