目錄
05-8. 效能優化與監控
⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)
🤔 一句話解釋
效能優化和監控讓你的 API 更快、更穩定,並能及時發現和解決問題。
📊 效能監控指標
┌─────────────────────────────────────────────────────────┐
│ 效能監控指標 │
├──────────────┬──────────────────────────────────────────┤
│ 延遲 │ P50, P95, P99 回應時間 │
│ 吞吐量 │ 每秒請求數 (RPS) │
│ 錯誤率 │ 5xx/4xx 錯誤百分比 │
│ 可用性 │ 服務正常運行時間百分比 │
│ 資源使用 │ CPU、記憶體、連線數 │
└──────────────┴──────────────────────────────────────────┘🔧 請求追蹤中介層
基本追蹤
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
import uuid
app = FastAPI()
logger = logging.getLogger(__name__)
class RequestTracingMiddleware(BaseHTTPMiddleware):
"""請求追蹤中介層"""
async def dispatch(self, request: Request, call_next):
# 生成請求 ID
request_id = str(uuid.uuid4())[:8]
request.state.request_id = request_id
# 記錄開始時間
start_time = time.perf_counter()
# 執行請求
response = await call_next(request)
# 計算處理時間
process_time = time.perf_counter() - start_time
# 記錄日誌
logger.info(
f"[{request_id}] {request.method} {request.url.path} "
f"- {response.status_code} - {process_time:.3f}s"
)
# 添加回應標頭
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(process_time)
return response
app.add_middleware(RequestTracingMiddleware)詳細指標收集
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Dict, List
import time
import statistics
app = FastAPI()
@dataclass
class EndpointMetrics:
"""端點指標"""
request_count: int = 0
error_count: int = 0
response_times: List[float] = field(default_factory=list)
@property
def avg_response_time(self) -> float:
if not self.response_times:
return 0
return statistics.mean(self.response_times)
@property
def p95_response_time(self) -> float:
if not self.response_times:
return 0
sorted_times = sorted(self.response_times)
index = int(len(sorted_times) * 0.95)
return sorted_times[index] if sorted_times else 0
@property
def error_rate(self) -> float:
if self.request_count == 0:
return 0
return self.error_count / self.request_count * 100
class MetricsCollector:
"""指標收集器"""
def __init__(self, max_samples: int = 1000):
self.endpoints: Dict[str, EndpointMetrics] = defaultdict(EndpointMetrics)
self.max_samples = max_samples
def record(self, path: str, status_code: int, response_time: float):
metrics = self.endpoints[path]
metrics.request_count += 1
if status_code >= 400:
metrics.error_count += 1
metrics.response_times.append(response_time)
# 限制樣本數量
if len(metrics.response_times) > self.max_samples:
metrics.response_times = metrics.response_times[-self.max_samples:]
def get_summary(self) -> dict:
summary = {}
for path, metrics in self.endpoints.items():
summary[path] = {
"request_count": metrics.request_count,
"error_count": metrics.error_count,
"error_rate": f"{metrics.error_rate:.2f}%",
"avg_response_time": f"{metrics.avg_response_time:.3f}s",
"p95_response_time": f"{metrics.p95_response_time:.3f}s"
}
return summary
collector = MetricsCollector()
class MetricsMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
response_time = time.perf_counter() - start_time
collector.record(
request.url.path,
response.status_code,
response_time
)
return response
app.add_middleware(MetricsMiddleware)
@app.get("/metrics")
async def get_metrics():
return collector.get_summary()📈 Prometheus 整合
安裝依賴
pip install prometheus-client prometheus-fastapi-instrumentator基本設定
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator
app = FastAPI()
# 啟用 Prometheus 指標
Instrumentator().instrument(app).expose(app)自訂指標
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, Gauge
from prometheus_fastapi_instrumentator import Instrumentator
import time
app = FastAPI()
# 自訂指標
REQUEST_COUNT = Counter(
"app_request_count",
"Application Request Count",
["method", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"app_request_latency_seconds",
"Application Request Latency",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
ACTIVE_REQUESTS = Gauge(
"app_active_requests",
"Active Requests"
)
DB_CONNECTIONS = Gauge(
"app_db_connections",
"Database Connections",
["state"] # active, idle
)
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
ACTIVE_REQUESTS.inc()
start_time = time.perf_counter()
response = await call_next(request)
latency = time.perf_counter() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(latency)
ACTIVE_REQUESTS.dec()
return response
# 啟用內建指標
Instrumentator().instrument(app).expose(app, endpoint="/metrics")🩺 健康檢查
基本健康檢查
from fastapi import FastAPI
from pydantic import BaseModel
from datetime import datetime
from typing import Dict
app = FastAPI()
class HealthStatus(BaseModel):
status: str
timestamp: str
version: str
checks: Dict[str, bool]
@app.get("/health")
async def health_check() -> HealthStatus:
return HealthStatus(
status="healthy",
timestamp=datetime.utcnow().isoformat(),
version="1.0.0",
checks={
"api": True
}
)完整健康檢查
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict, Optional
from datetime import datetime
import asyncio
import httpx
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
class ComponentHealth(BaseModel):
status: str
latency_ms: Optional[float] = None
message: Optional[str] = None
class HealthResponse(BaseModel):
status: str
timestamp: str
uptime_seconds: float
version: str
components: Dict[str, ComponentHealth]
START_TIME = datetime.utcnow()
async def check_database(db: AsyncSession) -> ComponentHealth:
"""檢查資料庫連線"""
try:
start = datetime.utcnow()
await db.execute("SELECT 1")
latency = (datetime.utcnow() - start).total_seconds() * 1000
return ComponentHealth(
status="healthy",
latency_ms=round(latency, 2)
)
except Exception as e:
return ComponentHealth(
status="unhealthy",
message=str(e)
)
async def check_redis(redis_client: redis.Redis) -> ComponentHealth:
"""檢查 Redis 連線"""
try:
start = datetime.utcnow()
await redis_client.ping()
latency = (datetime.utcnow() - start).total_seconds() * 1000
return ComponentHealth(
status="healthy",
latency_ms=round(latency, 2)
)
except Exception as e:
return ComponentHealth(
status="unhealthy",
message=str(e)
)
async def check_external_service(url: str) -> ComponentHealth:
"""檢查外部服務"""
try:
start = datetime.utcnow()
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=5.0)
response.raise_for_status()
latency = (datetime.utcnow() - start).total_seconds() * 1000
return ComponentHealth(
status="healthy",
latency_ms=round(latency, 2)
)
except Exception as e:
return ComponentHealth(
status="unhealthy",
message=str(e)
)
@app.get("/health")
async def health_check() -> HealthResponse:
"""完整健康檢查"""
# 並行檢查所有組件
# 實際應用中應該注入真正的連線
components = {
"api": ComponentHealth(status="healthy")
}
# 計算運行時間
uptime = (datetime.utcnow() - START_TIME).total_seconds()
# 判斷整體狀態
overall_status = "healthy"
for component in components.values():
if component.status != "healthy":
overall_status = "degraded"
break
return HealthResponse(
status=overall_status,
timestamp=datetime.utcnow().isoformat(),
uptime_seconds=round(uptime, 2),
version="1.0.0",
components=components
)
@app.get("/health/live")
async def liveness():
"""存活探針(K8s)"""
return {"status": "alive"}
@app.get("/health/ready")
async def readiness():
"""就緒探針(K8s)"""
# 檢查必要服務是否就緒
return {"status": "ready"}⚡ 效能優化
回應壓縮
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
# 啟用 GZip 壓縮
app.add_middleware(
GZipMiddleware,
minimum_size=1000 # 最小壓縮大小(bytes)
)連線池優化
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import httpx
app = FastAPI()
# 資料庫連線池
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20, # 連線池大小
max_overflow=10, # 額外連線數
pool_pre_ping=True, # 檢查連線有效性
pool_recycle=3600, # 連線回收時間(秒)
echo=False # 關閉 SQL 日誌
)
# HTTP 客戶端連線池
http_client: httpx.AsyncClient = None
@app.on_event("startup")
async def startup():
global http_client
http_client = httpx.AsyncClient(
limits=httpx.Limits(
max_connections=100,
max_keepalive_connections=20
),
timeout=httpx.Timeout(30.0)
)
@app.on_event("shutdown")
async def shutdown():
await http_client.aclose()
await engine.dispose()快取優化
from fastapi import FastAPI, Depends
from functools import lru_cache
import redis.asyncio as redis
import json
from typing import Optional
app = FastAPI()
class CacheService:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def get(self, key: str) -> Optional[dict]:
data = await self.redis.get(key)
if data:
return json.loads(data)
return None
async def set(
self,
key: str,
value: dict,
ttl: int = 300
):
await self.redis.set(
key,
json.dumps(value),
ex=ttl
)
async def invalidate(self, pattern: str):
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
# 設定快取(記憶體內快取)
@lru_cache(maxsize=1000)
def get_config(key: str) -> dict:
"""設定快取"""
configs = {
"feature_flags": {"new_ui": True},
"rate_limits": {"api": 100}
}
return configs.get(key, {})
@app.get("/config/{key}")
async def get_configuration(key: str):
return get_config(key)分頁優化
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import List, Generic, TypeVar, Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T")
class PaginatedResponse(BaseModel, Generic[T]):
items: List[T]
total: int
page: int
page_size: int
has_next: bool
has_prev: bool
async def paginate(
db: AsyncSession,
query,
page: int = 1,
page_size: int = 20
) -> dict:
"""通用分頁函數"""
# 計算總數(優化:使用 count 而非 len)
count_query = select(func.count()).select_from(query.subquery())
total = await db.scalar(count_query)
# 取得資料
offset = (page - 1) * page_size
items_query = query.offset(offset).limit(page_size)
result = await db.execute(items_query)
items = result.scalars().all()
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size,
"has_next": offset + len(items) < total,
"has_prev": page > 1
}
# 游標分頁(大資料量)
async def cursor_paginate(
db: AsyncSession,
query,
cursor: Optional[int] = None,
limit: int = 20
) -> dict:
"""游標分頁(效能更好)"""
if cursor:
query = query.where(Item.id > cursor)
query = query.order_by(Item.id).limit(limit + 1)
result = await db.execute(query)
items = result.scalars().all()
has_next = len(items) > limit
if has_next:
items = items[:limit]
return {
"items": items,
"next_cursor": items[-1].id if items else None,
"has_next": has_next
}📝 結構化日誌
from fastapi import FastAPI, Request
import logging
import json
from datetime import datetime
import sys
app = FastAPI()
class JSONFormatter(logging.Formatter):
"""JSON 格式的日誌"""
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"logger": record.name,
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
if hasattr(record, "request_id"):
log_data["request_id"] = record.request_id
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
# 設定日誌
def setup_logging():
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
setup_logging()
logger = logging.getLogger(__name__)
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
request_id = request.headers.get("X-Request-ID", "unknown")
# 設定請求 ID 到日誌記錄
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.request_id = request_id
return record
logging.setLogRecordFactory(record_factory)
logger.info(f"Request started: {request.method} {request.url.path}")
response = await call_next(request)
logger.info(f"Request completed: {response.status_code}")
logging.setLogRecordFactory(old_factory)
return response🔄 效能測試
使用 locust
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(1, 3)
@task(3)
def get_items(self):
self.client.get("/items")
@task(1)
def create_item(self):
self.client.post("/items", json={
"name": "Test Item",
"price": 99.99
})
@task(2)
def get_item(self):
self.client.get("/items/1")
# 執行:locust -f locustfile.py --host=http://localhost:8000✅ 重點總結
監控指標
| 指標 | 說明 |
|---|---|
| 延遲 | P50, P95, P99 |
| 吞吐量 | RPS |
| 錯誤率 | 4xx/5xx 比例 |
| 資源 | CPU、記憶體 |
優化方向
| 方向 | 方法 |
|---|---|
| 網路 | 壓縮、CDN、連線池 |
| 資料庫 | 索引、快取、分頁 |
| 程式碼 | 非同步、批次處理 |
| 架構 | 水平擴展、負載均衡 |
最佳實踐
- 先監控,再優化
- 識別瓶頸,針對性優化
- 避免過早優化
- 持續監控和迭代
🎤 面試這樣答
Q: 如何優化 FastAPI 應用的效能?
答案:
常見優化方向:
非同步優化
- 使用 async/await
- 並行處理 I/O 操作
快取策略
- Redis 快取熱點資料
- HTTP 快取標頭
資料庫優化
- 連線池
- 索引優化
- 查詢優化(避免 N+1)
網路優化
- GZip 壓縮
- 連線池重用
監控與調優
- Prometheus 指標
- 識別慢端點
- 效能測試(locust)
# 連線池範例 engine = create_async_engine( DATABASE_URL, pool_size=20, max_overflow=10 )
上一篇: 05-7. 背景任務 下一篇: 06-1. 依賴注入基礎
最後更新:2025-12-18