# 05-8. Performance Optimization and Monitoring

> ⏱️ **Reading time:** 20 minutes
> 🎯 **Difficulty:** ⭐⭐⭐⭐ (Advanced)

---

## 🤔 In One Sentence

**Performance optimization and monitoring make your API faster and more stable, and let you detect and fix problems early.**

---

## 📊 Performance Monitoring Metrics

```
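┌─────────────────────────────────────────────────────────────┐
│               Performance monitoring metrics                │
├────────────────┬────────────────────────────────────────────┤
│ Latency        │ P50, P95, P99 response time                │
│ Throughput     │ Requests per second (RPS)                  │
│ Error rate     │ Percentage of 4xx/5xx responses            │
│ Availability   │ Percentage of time the service is up       │
│ Resource usage │ CPU, memory, connection count              │
└────────────────┴────────────────────────────────────────────┘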
```
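
To make the latency row concrete, here is a minimal sketch of how P50, P95, and P99 could be computed from raw samples. It uses the nearest-rank definition, which is only one of several percentile conventions, and the `percentile` helper and the sample values are made up for illustration.

```python
import math

def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample that has at least
    pct percent of all samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

# Made-up latencies in milliseconds; two slow outliers among fast requests
latencies_ms = [12, 15, 11, 250, 14, 13, 16, 900, 12, 14]
summary = {f"p{p}": percentile(latencies_ms, p) for p in (50, 95, 99)}
print(summary)
```

The median stays low while P95 and P99 land on the outliers, which is why tail percentiles are usually tracked alongside the average.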

---

## 🔧 Request Tracing Middleware

### Basic tracing

```python
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
import uuid

app = FastAPI()
logger = logging.getLogger(__name__)

class RequestTracingMiddleware(BaseHTTPMiddleware):
    """Request tracing middleware"""

    async def dispatch(self, request: Request, call_next):
        # Generate a short request ID
        request_id = str(uuid.uuid4())[:8]
        request.state.request_id = request_id

        # Record the start time
        start_time = time.perf_counter()

        # Run the request
        response = await call_next(request)

        # Compute the processing time
        process_time = time.perf_counter() - start_time

        # Write the log entry
        logger.info(
            f"[{request_id}] {request.method} {request.url.path} "
            f"- {response.status_code} - {process_time:.3f}s"
        )

        # Add response headers
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = str(process_time)

        return response

app.add_middleware(RequestTracingMiddleware)
```

### Detailed metrics collection

```python
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Dict, List
import time
import statistics

app = FastAPI()

@dataclass
class EndpointMetrics:
    """Per-endpoint metrics"""
    request_count: int = 0
    error_count: int = 0
    response_times: List[float] = field(default_factory=list)

    @property
    def avg_response_time(self) -> float:
        if not self.response_times:
            return 0.0
        return statistics.mean(self.response_times)

    @property
    def p95_response_time(self) -> float:
        if not self.response_times:
            return 0.0
        sorted_times = sorted(self.response_times)
        # int() truncates, so the index is always within range
        index = int(len(sorted_times) * 0.95)
        return sorted_times[index]

    @property
    def error_rate(self) -> float:
        """Error rate as a percentage"""
        if self.request_count == 0:
            return 0.0
        return self.error_count / self.request_count * 100

class MetricsCollector:
    """Metrics collector"""

    def __init__(self, max_samples: int = 1000):
        self.endpoints: Dict[str, EndpointMetrics] = defaultdict(EndpointMetrics)
        self.max_samples = max_samples

    def record(self, path: str, status_code: int, response_time: float):
        metrics = self.endpoints[path]
        metrics.request_count += 1

        # Count both 4xx and 5xx responses as errors
        if status_code >= 400:
            metrics.error_count += 1

        metrics.response_times.append(response_time)

        # Keep only the most recent max_samples samples
        if len(metrics.response_times) > self.max_samples:
            metrics.response_times = metrics.response_times[-self.max_samples:]

    def get_summary(self) -> dict:
        summary = {}
        for path, metrics in self.endpoints.items():
            summary[path] = {
                "request_count": metrics.request_count,
                "error_count": metrics.error_count,
                "error_rate": f"{metrics.error_rate:.2f}%",
                "avg_response_time": f"{metrics.avg_response_time:.3f}s",
                "p95_response_time": f"{metrics.p95_response_time:.3f}s"
            }
        return summary

collector = MetricsCollector()

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()

        response = await call_next(request)

        response_time = time.perf_counter() - start_time
        collector.record(
            request.url.path,
            response.status_code,
            response_time
        )

        return response

app.add_middleware(MetricsMiddleware)

@app.get("/metrics")
async def get_metrics():
    return collector.get_summary()
```
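
The collector above caps its sample list by slicing, which copies the list on every request once the cap is reached. One possible alternative, sketched here with the standard library's `collections.deque`, is a fixed-size window that drops the oldest sample automatically. The window size of 3 and the sample values are arbitrary choices for the demonstration.

```python
from collections import deque

# A deque with maxlen keeps only the most recent samples
window = deque(maxlen=3)

for response_time in [0.10, 0.20, 0.30, 0.40, 0.50]:
    # append is O(1); once full, the oldest sample is discarded
    window.append(response_time)

print(list(window))
```

Swapping `response_times` to such a deque would remove the explicit trimming step, at the cost of needing `list(...)` or `sorted(...)` before indexing.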

---

## 📈 Prometheus Integration

### Install dependencies

```bash
pip install prometheus-client prometheus-fastapi-instrumentator
```

### Basic setup

```python
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# Enable Prometheus metrics (served at /metrics by default)
Instrumentator().instrument(app).expose(app)
```

### Custom metrics

```python
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, Gauge
from prometheus_fastapi_instrumentator import Instrumentator
import time

app = FastAPI()

# Custom metrics
REQUEST_COUNT = Counter(
    "app_request_count",
    "Application Request Count",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "app_request_latency_seconds",
    "Application Request Latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

ACTIVE_REQUESTS = Gauge(
    "app_active_requests",
    "Active Requests"
)

DB_CONNECTIONS = Gauge(
    "app_db_connections",
    "Database Connections",
    ["state"]  # active, idle
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    ACTIVE_REQUESTS.inc()
    start_time = time.perf_counter()
    try:
        response = await call_next(request)
    finally:
        # Always decrement, even when the handler raises
        ACTIVE_REQUESTS.dec()
    latency = time.perf_counter() - start_time

    # Note: request.url.path yields one label value per concrete URL
    # (/items/1, /items/2, ...), so label cardinality grows with the ID space
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(latency)

    return response

# Enable the built-in metrics
Instrumentator().instrument(app).expose(app, endpoint="/metrics")
```
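
The `buckets` list in the custom histogram is easier to choose once the counting rule is clear: each bucket counts every observation less than or equal to its upper bound, so the counts are cumulative. The following pure-Python sketch imitates that rule without using `prometheus_client`. The `cumulative_bucket_counts` helper and the observed values are illustrative assumptions, not part of any library.

```python
import bisect

# Same upper bounds as the custom histogram's buckets
BUCKETS = [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]

def cumulative_bucket_counts(latencies: list[float]) -> dict[str, int]:
    """Count observations per bucket, cumulatively, with a final +Inf bucket."""
    per_bucket = [0] * (len(BUCKETS) + 1)  # last slot stands for +Inf
    for value in latencies:
        # bisect_left finds the first bound that is >= value
        per_bucket[bisect.bisect_left(BUCKETS, value)] += 1

    labels = [str(bound) for bound in BUCKETS] + ["+Inf"]
    counts, running = {}, 0
    for label, n in zip(labels, per_bucket):
        running += n
        counts[label] = running
    return counts

observed = [0.004, 0.03, 0.05, 0.2, 0.7, 7.0]  # made-up latencies in seconds
counts = cumulative_bucket_counts(observed)
print(counts)
```

Because percentiles are later estimated from these counts, bucket bounds placed near the latencies you care about (for example around an SLO threshold) tend to give more useful estimates.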

---

## 🩺 Health Checks

### Basic health check

```python
from fastapi import FastAPI
from pydantic import BaseModel
from datetime import datetime, timezone
from typing import Dict

app = FastAPI()

class HealthStatus(BaseModel):
    status: str
    timestamp: str
    version: str
    checks: Dict[str, bool]

@app.get("/health")
async def health_check() -> HealthStatus:
    return HealthStatus(
        status="healthy",
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        timestamp=datetime.now(timezone.utc).isoformat(),
        version="1.0.0",
        checks={
            "api": True
        }
    )
```

### Full health check

```python
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict, Optional
from datetime import datetime, timezone
import asyncio
import time
import httpx
import redis.asyncio as redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

class ComponentHealth(BaseModel):
    status: str
    latency_ms: Optional[float] = None
    message: Optional[str] = None

class HealthResponse(BaseModel):
    status: str
    timestamp: str
    uptime_seconds: float
    version: str
    components: Dict[str, ComponentHealth]

START_TIME = datetime.now(timezone.utc)

async def check_database(db: AsyncSession) -> ComponentHealth:
    """Check the database connection"""
    try:
        start = time.perf_counter()
        # SQLAlchemy 2.0 requires raw SQL to be wrapped in text()
        await db.execute(text("SELECT 1"))
        latency = (time.perf_counter() - start) * 1000

        return ComponentHealth(
            status="healthy",
            latency_ms=round(latency, 2)
        )
    except Exception as e:
        return ComponentHealth(
            status="unhealthy",
            message=str(e)
        )

async def check_redis(redis_client: redis.Redis) -> ComponentHealth:
    """Check the Redis connection"""
    try:
        start = time.perf_counter()
        await redis_client.ping()
        latency = (time.perf_counter() - start) * 1000

        return ComponentHealth(
            status="healthy",
            latency_ms=round(latency, 2)
        )
    except Exception as e:
        return ComponentHealth(
            status="unhealthy",
            message=str(e)
        )

async def check_external_service(url: str) -> ComponentHealth:
    """Check an external service"""
    try:
        start = time.perf_counter()
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=5.0)
            response.raise_for_status()

        latency = (time.perf_counter() - start) * 1000

        return ComponentHealth(
            status="healthy",
            latency_ms=round(latency, 2)
        )
    except Exception as e:
        return ComponentHealth(
            status="unhealthy",
            message=str(e)
        )

@app.get("/health")
async def health_check() -> HealthResponse:
    """Full health check"""
    # In a real application, inject the real connections and run the
    # check_* functions concurrently (for example with asyncio.gather)
    components = {
        "api": ComponentHealth(status="healthy")
    }

    # Compute uptime
    uptime = (datetime.now(timezone.utc) - START_TIME).total_seconds()

    # Derive the overall status
    overall_status = "healthy"
    for component in components.values():
        if component.status != "healthy":
            overall_status = "degraded"
            break

    return HealthResponse(
        status=overall_status,
        timestamp=datetime.now(timezone.utc).isoformat(),
        uptime_seconds=round(uptime, 2),
        version="1.0.0",
        components=components
    )

@app.get("/health/live")
async def liveness():
    """Liveness probe (K8s)"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Readiness probe (K8s)"""
    # Check that required services are ready
    return {"status": "ready"}
```
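
The health endpoint mentions checking components in parallel but only reports the API itself. A minimal sketch of the concurrent part could look like the following, assuming each dependency check is an async function. The `fake_database_check` and `fake_redis_check` functions are hypothetical stand-ins so the example runs without a database or Redis.

```python
import asyncio
import time

async def timed_check(name: str, check) -> tuple[str, dict]:
    """Run one async check and report its status and latency."""
    start = time.perf_counter()
    try:
        await check()
        status, message = "healthy", None
    except Exception as exc:
        status, message = "unhealthy", str(exc)
    latency_ms = round((time.perf_counter() - start) * 1000, 2)
    return name, {"status": status, "latency_ms": latency_ms, "message": message}

# Hypothetical stand-ins for real dependency checks
async def fake_database_check():
    await asyncio.sleep(0.05)

async def fake_redis_check():
    await asyncio.sleep(0.05)
    raise ConnectionError("redis unreachable")

async def run_health_checks() -> dict:
    checks = {"database": fake_database_check, "redis": fake_redis_check}
    # gather() runs the checks concurrently, so the total time is
    # roughly that of the slowest check rather than the sum
    results = await asyncio.gather(
        *(timed_check(name, check) for name, check in checks.items())
    )
    components = dict(results)
    healthy = all(c["status"] == "healthy" for c in components.values())
    return {"status": "healthy" if healthy else "degraded", "components": components}

report = asyncio.run(run_health_checks())
print(report["status"])
```

Catching exceptions inside `timed_check` keeps one failing dependency from cancelling the others, so the report always lists every component.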

---

## ⚡ Performance Optimization

### Response compression

```python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

# Enable GZip compression
app.add_middleware(
    GZipMiddleware,
    minimum_size=1000  # responses smaller than this many bytes are not compressed
)
```
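
The `minimum_size` threshold exists because gzip adds a fixed header and trailer, so very small bodies can grow instead of shrink. The sketch below illustrates this with the standard-library `gzip` module. The payloads are made up, and the exact sizes depend on the data and the zlib version, so none are quoted here.

```python
import gzip
import json

# Made-up payloads: a repetitive list response and a tiny status response
payload = json.dumps(
    [{"id": i, "name": f"Item {i}", "price": 99.99} for i in range(200)]
).encode("utf-8")
tiny = json.dumps({"status": "ok"}).encode("utf-8")

compressed = gzip.compress(payload)
compressed_tiny = gzip.compress(tiny)

print(len(payload), "->", len(compressed))
print(len(tiny), "->", len(compressed_tiny))
```

Repetitive JSON like the list response compresses well, while the tiny body ends up larger than the original, which is the case the threshold avoids.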

### Connection pool tuning

```python
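from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from typing import Optional
import httpx

app = FastAPI()

# Database connection pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,              # connections kept open in the pool
    max_overflow=10,           # extra connections allowed beyond pool_size
    pool_pre_ping=True,        # test each connection before using it
    pool_recycle=3600,         # recycle connections older than this many seconds
    echo=False                 # disable SQL statement logging
)

# Shared HTTP client (one connection pool for the whole app)
http_client: Optional[httpx.AsyncClient] = None

# Note: on_event is deprecated in recent FastAPI releases in favor of
# lifespan handlers; it is kept here to match the original example
@app.on_event("startup")
async def startup():
    global http_client
    http_client = httpx.AsyncClient(
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20
        ),
        timeout=httpx.Timeout(30.0)
    )

@app.on_event("shutdown")
async def shutdown():
    if http_client is not None:
        await http_client.aclose()
    await engine.dispose()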
```

### Caching

```python
from fastapi import FastAPI, Depends
from functools import lru_cache
import redis.asyncio as redis
import json
from typing import Optional

app = FastAPI()

class CacheService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def get(self, key: str) -> Optional[dict]:
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(
        self,
        key: str,
        value: dict,
        ttl: int = 300
    ):
        # ex sets the expiry in seconds
        await self.redis.set(
            key,
            json.dumps(value),
            ex=ttl
        )

    async def invalidate(self, pattern: str):
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        keys = [key async for key in self.redis.scan_iter(match=pattern)]
        if keys:
            await self.redis.delete(*keys)

# Configuration cache (in-process memory cache)
@lru_cache(maxsize=1000)
def get_config(key: str) -> dict:
    """Cached configuration lookup"""
    # Note: lru_cache returns the same dict object on every hit,
    # so callers should treat the result as read-only
    configs = {
        "feature_flags": {"new_ui": True},
        "rate_limits": {"api": 100}
    }
    return configs.get(key, {})

@app.get("/config/{key}")
async def get_configuration(key: str):
    return get_config(key)
```
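
`CacheService` is defined above but never called from an endpoint. A common way to use such a service is the cache-aside pattern: read the cache first, fall back to the source on a miss, then populate the cache. The sketch below assumes an in-memory stand-in with the same `get`/`set` shape so it runs without Redis. `InMemoryCache`, `load_item_from_db`, and `get_item` are hypothetical names for illustration.

```python
import asyncio
import json
import time
from typing import Optional

class InMemoryCache:
    """Hypothetical stand-in for CacheService (same get/set shape, no Redis)."""

    def __init__(self):
        self._store: dict[str, tuple[float, str]] = {}

    async def get(self, key: str) -> Optional[dict]:
        entry = self._store.get(key)
        if entry is None or entry[0] < time.monotonic():
            return None  # missing or expired
        return json.loads(entry[1])

    async def set(self, key: str, value: dict, ttl: int = 300):
        self._store[key] = (time.monotonic() + ttl, json.dumps(value))

db_calls = 0

async def load_item_from_db(item_id: int) -> dict:
    """Hypothetical slow lookup; counts calls so cache hits are visible."""
    global db_calls
    db_calls += 1
    await asyncio.sleep(0.01)
    return {"id": item_id, "name": f"Item {item_id}"}

async def get_item(cache: InMemoryCache, item_id: int) -> dict:
    key = f"item:{item_id}"
    cached = await cache.get(key)
    if cached is not None:
        return cached                          # cache hit
    item = await load_item_from_db(item_id)    # cache miss: read the source
    await cache.set(key, item, ttl=60)         # populate for later requests
    return item

async def main() -> list:
    cache = InMemoryCache()
    return [await get_item(cache, 1) for _ in range(3)]

items = asyncio.run(main())
print(db_calls)
```

Three reads of the same item reach the simulated database only once. With real data, writes would also need to call `invalidate` (or overwrite the key) so readers do not see stale values until the TTL expires.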

### Pagination

```python
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import List, Generic, TypeVar, Optional
from sqlalchemy import Column, Integer, String, select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import declarative_base

T = TypeVar("T")

Base = declarative_base()

class Item(Base):
    """Minimal example model (an assumption; the original uses Item without defining it)"""
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String)

# Pydantic v2 syntax; Pydantic v1 needs pydantic.generics.GenericModel instead
class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    page_size: int
    has_next: bool
    has_prev: bool

async def paginate(
    db: AsyncSession,
    query,
    page: int = 1,
    page_size: int = 20
) -> dict:
    """Generic offset-based pagination"""
    # Count in the database instead of loading every row and calling len()
    count_query = select(func.count()).select_from(query.subquery())
    total = await db.scalar(count_query)

    # Fetch one page (the query should have a stable ORDER BY)
    offset = (page - 1) * page_size
    items_query = query.offset(offset).limit(page_size)
    result = await db.execute(items_query)
    items = result.scalars().all()

    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_next": offset + len(items) < total,
        "has_prev": page > 1
    }

# Cursor pagination (for large data sets)
async def cursor_paginate(
    db: AsyncSession,
    query,
    cursor: Optional[int] = None,
    limit: int = 20
) -> dict:
    """Cursor pagination (avoids scanning skipped rows as OFFSET does)"""
    if cursor is not None:
        query = query.where(Item.id > cursor)

    # Fetch one extra row to learn whether another page exists
    query = query.order_by(Item.id).limit(limit + 1)
    result = await db.execute(query)
    items = result.scalars().all()

    has_next = len(items) > limit
    if has_next:
        items = items[:limit]

    return {
        "items": items,
        "next_cursor": items[-1].id if has_next else None,
        "has_next": has_next
    }
```
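
The cursor approach is easier to follow when traced over a small data set. This pure-Python sketch replaces the database with a list of seven made-up rows and walks through every page. `ROWS` and `cursor_page` are illustrative names, and the list slice stands in for `ORDER BY id LIMIT limit + 1`.

```python
from typing import Optional

ROWS = [{"id": i, "name": f"Item {i}"} for i in range(1, 8)]  # ids 1..7, sorted by id

def cursor_page(cursor: Optional[int] = None, limit: int = 3) -> dict:
    """Return rows with id > cursor, reading limit + 1 rows to detect a next page."""
    matching = [row for row in ROWS if cursor is None or row["id"] > cursor]
    window = matching[: limit + 1]   # stands in for ORDER BY id LIMIT limit + 1
    has_next = len(window) > limit
    items = window[:limit]
    return {
        "items": items,
        "next_cursor": items[-1]["id"] if has_next else None,
        "has_next": has_next,
    }

# Walk through all pages by feeding next_cursor back in
cursor, pages = None, []
while True:
    page = cursor_page(cursor)
    pages.append([row["id"] for row in page["items"]])
    if not page["has_next"]:
        break
    cursor = page["next_cursor"]

print(pages)
```

Each request filters on the indexed `id` column rather than skipping rows, so the cost of a page does not grow with how deep the client has paged.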

---

## 📝 Structured Logging

```python
from fastapi import FastAPI, Request
import contextvars
import logging
import json
from datetime import datetime, timezone
import sys

app = FastAPI()

# Holds the current request ID; a ContextVar stays isolated between
# requests that are being handled concurrently
request_id_var = contextvars.ContextVar("request_id", default=None)

class JSONFormatter(logging.Formatter):
    """Format log records as JSON"""

    def format(self, record):
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        request_id = request_id_var.get()
        if request_id is not None:
            log_data["request_id"] = request_id

        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_data)

# Configure logging
def setup_logging():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JSONFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)

setup_logging()
logger = logging.getLogger(__name__)

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    request_id = request.headers.get("X-Request-ID", "unknown")

    # Bind the request ID to the current context. Swapping the global
    # log record factory per request is not safe: concurrent requests
    # overwrite each other's factory and log the wrong ID
    token = request_id_var.set(request_id)
    try:
        logger.info(f"Request started: {request.method} {request.url.path}")

        response = await call_next(request)

        logger.info(f"Request completed: {response.status_code}")
        return response
    finally:
        request_id_var.reset(token)
```
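
A request ID attached to log records has to stay isolated when several requests are in flight at once. One way to check that property is a small experiment with the standard-library `contextvars` module, using asyncio tasks as stand-ins for requests. The `handle` coroutine, the IDs, and the delays are made up for the demonstration.

```python
import asyncio
import contextvars

request_id_var = contextvars.ContextVar("request_id", default="unknown")
seen = []

async def handle(request_id: str, delay: float):
    request_id_var.set(request_id)   # visible only inside this task's context
    await asyncio.sleep(delay)       # the other "request" runs in the meantime
    seen.append((request_id, request_id_var.get()))

async def main():
    # gather() wraps each coroutine in a task with its own copy of the context
    await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))

asyncio.run(main())
print(sorted(seen))
```

Each task reads back the ID it set, even though the two overlap in time, and the value outside the tasks remains the default.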

---

## 🔄 Load Testing

### Using locust

```python
# locustfile.py
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def get_items(self):
        self.client.get("/items")

    @task(1)
    def create_item(self):
        self.client.post("/items", json={
            "name": "Test Item",
            "price": 99.99
        })

    @task(2)
    def get_item(self):
        self.client.get("/items/1")

# Run with: locust -f locustfile.py --host=http://localhost:8000
```

---

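## ✅ Key Takeaways

### Monitoring metrics

| Metric | Description |
|------|------|
| **Latency** | P50, P95, P99 |
| **Throughput** | RPS |
| **Error rate** | Share of 4xx/5xx responses |
| **Resources** | CPU, memory |

### Optimization areas

| Area | Methods |
|------|------|
| **Network** | Compression, CDN, connection pooling |
| **Database** | Indexes, caching, pagination |
| **Code** | Async I/O, batching |
| **Architecture** | Horizontal scaling, load balancing |

### Best practices

1. Monitor first, then optimize
2. Identify the bottleneck and optimize it specifically
3. Avoid premature optimization
4. Keep monitoring and iterating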

---

## 🎤 How to Answer in an Interview

### Q: How do you optimize the performance of a FastAPI application?

**Answer:**

> **Common optimization areas:**
>
> 1. **Async optimization**
>    - Use async/await
>    - Run I/O operations concurrently
>
> 2. **Caching strategy**
>    - Cache hot data in Redis
>    - HTTP cache headers
>
> 3. **Database optimization**
>    - Connection pooling
>    - Index tuning
>    - Query optimization (avoid N+1)
>
> 4. **Network optimization**
>    - GZip compression
>    - Connection reuse through pooling
>
> 5. **Monitoring and tuning**
>    - Prometheus metrics
>    - Identify slow endpoints
>    - Load testing (locust)
>
> ```python
> # Connection pool example
> engine = create_async_engine(
>     DATABASE_URL,
>     pool_size=20,
>     max_overflow=10
> )
> ```

---

**Previous:** [05-7. Background Tasks](./05-7)
**Next:** [06-1. Dependency Injection Basics](./06-1)

---

Last updated: 2025-12-18


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/05-8/  

