# 

# 04-5. 安全最佳實踐

&gt; ⏱️ **閱讀時間：** 20 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐⭐ (高階)

---

## 🤔 一句話解釋

**安全最佳實踐是一系列防護措施，保護你的 API 免受常見攻擊，包括注入、XSS、CSRF 等。**

---

## 🛡️ OWASP Top 10

```
┌─────────────────────────────────────────────────────────┐
│              OWASP API Security Top 10                  │
├─────────────────────────────────────────────────────────┤
│  1. 失效的物件級授權（BOLA）                              │
│  2. 失效的身份認證                                       │
│  3. 過度的資料暴露                                       │
│  4. 缺乏資源和速率限制                                    │
│  5. 失效的功能級授權                                     │
│  6. 大量指派                                             │
│  7. 安全配置錯誤                                         │
│  8. 注入攻擊                                             │
│  9. 不當的資產管理                                       │
│  10. 不足的日誌和監控                                    │
└─────────────────────────────────────────────────────────┘
```

---

## 1️⃣ 防止 BOLA（物件級授權失效）

### 問題

```python
# ❌ 危險：任何人都可以存取任何使用者的資料
@app.get(&#34;/users/{user_id}&#34;)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, user_id)
```

### 解決方案

```python
# ✅ 安全：檢查資源擁有者
@app.get(&#34;/users/{user_id}&#34;)
async def get_user(
    user_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    # 只能存取自己的資料，或有權限
    if user_id != current_user.id and not current_user.is_superuser:
        raise HTTPException(status_code=403, detail=&#34;Permission denied&#34;)

    return await db.get(User, user_id)


# ✅ 更好的方式：依賴項
def get_owned_user(
    user_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -&gt; User:
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail=&#34;User not found&#34;)

    if user.id != current_user.id and not current_user.is_superuser:
        raise HTTPException(status_code=403, detail=&#34;Permission denied&#34;)

    return user
```

---

## 2️⃣ 防止注入攻擊

### SQL 注入

```python
# ❌ 危險：SQL 注入
@app.get(&#34;/users&#34;)
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # 絕對不要這樣做！
    result = await db.execute(
        text(f&#34;SELECT * FROM users WHERE username = &#39;{username}&#39;&#34;)
    )
    return result.fetchall()


# ✅ 安全：使用參數化查詢
@app.get(&#34;/users&#34;)
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # SQLAlchemy ORM 自動防止 SQL 注入
    stmt = select(User).where(User.username == username)
    result = await db.execute(stmt)
    return result.scalars().all()


# ✅ 如果必須用原生 SQL
@app.get(&#34;/users&#34;)
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # 使用參數綁定
    result = await db.execute(
        text(&#34;SELECT * FROM users WHERE username = :username&#34;),
        {&#34;username&#34;: username}
    )
    return result.fetchall()
```

### 命令注入

```python
import subprocess
import shlex

# ❌ 危險：命令注入
@app.post(&#34;/convert&#34;)
async def convert_file(filename: str):
    # 攻擊者可以傳入 &#34;file.txt; rm -rf /&#34;
    subprocess.run(f&#34;convert {filename}&#34;, shell=True)


# ✅ 安全：不使用 shell
@app.post(&#34;/convert&#34;)
async def convert_file(filename: str):
    # 驗證檔名
    if not re.match(r&#39;^[\w\-\.]&#43;$&#39;, filename):
        raise HTTPException(status_code=400, detail=&#34;Invalid filename&#34;)

    # 不使用 shell=True
    subprocess.run([&#34;convert&#34;, filename], shell=False)
```

---

## 3️⃣ 輸入驗證

### 使用 Pydantic 驗證

```python
from pydantic import BaseModel, EmailStr, Field, field_validator
import re


class UserCreate(BaseModel):
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        pattern=r&#39;^[a-zA-Z0-9_]&#43;$&#39;  # 只允許字母數字底線
    )
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)

    @field_validator(&#39;username&#39;)
    @classmethod
    def validate_username(cls, v: str) -&gt; str:
        # 防止保留字
        reserved = [&#39;admin&#39;, &#39;root&#39;, &#39;system&#39;, &#39;null&#39;, &#39;undefined&#39;]
        if v.lower() in reserved:
            raise ValueError(&#39;This username is reserved&#39;)
        return v

    @field_validator(&#39;password&#39;)
    @classmethod
    def validate_password(cls, v: str) -&gt; str:
        if not re.search(r&#39;[A-Z]&#39;, v):
            raise ValueError(&#39;Password must contain uppercase letter&#39;)
        if not re.search(r&#39;[a-z]&#39;, v):
            raise ValueError(&#39;Password must contain lowercase letter&#39;)
        if not re.search(r&#39;\d&#39;, v):
            raise ValueError(&#39;Password must contain digit&#39;)
        return v


class PostCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1, max_length=50000)

    @field_validator(&#39;content&#39;)
    @classmethod
    def sanitize_content(cls, v: str) -&gt; str:
        # 清理 HTML（如果允許 HTML）
        import bleach
        allowed_tags = [&#39;p&#39;, &#39;br&#39;, &#39;strong&#39;, &#39;em&#39;, &#39;a&#39;, &#39;ul&#39;, &#39;ol&#39;, &#39;li&#39;]
        return bleach.clean(v, tags=allowed_tags, strip=True)
```

---

## 4️⃣ 速率限制

### 安裝

```bash
pip install slowapi
```

### 實作

```python
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# 建立 Limiter
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# 全域限制
@app.get(&#34;/&#34;)
@limiter.limit(&#34;100/minute&#34;)
async def home(request: Request):
    return {&#34;message&#34;: &#34;Hello&#34;}


# 登入端點更嚴格
@app.post(&#34;/auth/login&#34;)
@limiter.limit(&#34;5/minute&#34;)  # 每分鐘最多 5 次
async def login(request: Request):
    pass


# 根據使用者限制
def get_user_identifier(request: Request) -&gt; str:
    # 已登入使用者用 user_id，未登入用 IP
    user = getattr(request.state, &#39;user&#39;, None)
    if user:
        return f&#34;user:{user.id}&#34;
    return get_remote_address(request)


@app.get(&#34;/api/expensive&#34;)
@limiter.limit(&#34;10/hour&#34;, key_func=get_user_identifier)
async def expensive_operation(request: Request):
    pass
```

### 自訂速率限制

```python
from datetime import datetime, timedelta
import redis.asyncio as redis

redis_client = redis.from_url(&#34;redis://localhost:6379&#34;)


class RateLimiter:
    &#34;&#34;&#34;自訂速率限制器&#34;&#34;&#34;

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def is_allowed(
        self,
        key: str,
        limit: int,
        window_seconds: int
    ) -&gt; tuple[bool, int]:
        &#34;&#34;&#34;檢查是否允許請求&#34;&#34;&#34;
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=window_seconds)

        pipe = self.redis.pipeline()

        # 使用 sorted set 儲存請求時間
        redis_key = f&#34;ratelimit:{key}&#34;

        # 移除過期的請求
        pipe.zremrangebyscore(redis_key, 0, window_start.timestamp())

        # 計算當前請求數
        pipe.zcard(redis_key)

        # 執行
        _, current_count = await pipe.execute()

        if current_count &gt;= limit:
            # 計算重試時間
            oldest = await self.redis.zrange(redis_key, 0, 0, withscores=True)
            if oldest:
                retry_after = int(oldest[0][1] &#43; window_seconds - now.timestamp())
                return False, retry_after
            return False, window_seconds

        # 允許請求，記錄這次請求
        await self.redis.zadd(redis_key, {str(now.timestamp()): now.timestamp()})
        await self.redis.expire(redis_key, window_seconds)

        return True, 0


# 依賴項
async def rate_limit(
    request: Request,
    limit: int = 100,
    window: int = 60
):
    &#34;&#34;&#34;速率限制依賴項&#34;&#34;&#34;
    limiter = RateLimiter(redis_client)
    key = get_remote_address(request)

    allowed, retry_after = await limiter.is_allowed(key, limit, window)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=&#34;Too many requests&#34;,
            headers={&#34;Retry-After&#34;: str(retry_after)}
        )
```

---

## 5️⃣ CORS 設定

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# ❌ 危險：允許所有來源
app.add_middleware(
    CORSMiddleware,
    allow_origins=[&#34;*&#34;],  # 不要在生產環境這樣做！
    allow_credentials=True,
    allow_methods=[&#34;*&#34;],
    allow_headers=[&#34;*&#34;],
)

# ✅ 安全：只允許特定來源
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        &#34;https://myapp.com&#34;,
        &#34;https://admin.myapp.com&#34;,
    ],
    allow_credentials=True,
    allow_methods=[&#34;GET&#34;, &#34;POST&#34;, &#34;PUT&#34;, &#34;DELETE&#34;],
    allow_headers=[&#34;Authorization&#34;, &#34;Content-Type&#34;],
)
```

---

## 6️⃣ 安全 Headers

```python
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    &#34;&#34;&#34;安全 Headers 中間件&#34;&#34;&#34;

    async def dispatch(self, request, call_next):
        response = await call_next(request)

        # 防止 XSS
        response.headers[&#34;X-XSS-Protection&#34;] = &#34;1; mode=block&#34;

        # 防止 MIME 類型嗅探
        response.headers[&#34;X-Content-Type-Options&#34;] = &#34;nosniff&#34;

        # 防止 Clickjacking
        response.headers[&#34;X-Frame-Options&#34;] = &#34;DENY&#34;

        # 內容安全政策
        response.headers[&#34;Content-Security-Policy&#34;] = (
            &#34;default-src &#39;self&#39;; &#34;
            &#34;script-src &#39;self&#39;; &#34;
            &#34;style-src &#39;self&#39; &#39;unsafe-inline&#39;; &#34;
            &#34;img-src &#39;self&#39; data: https:; &#34;
        )

        # 強制 HTTPS
        response.headers[&#34;Strict-Transport-Security&#34;] = (
            &#34;max-age=31536000; includeSubDomains&#34;
        )

        # 隱藏伺服器資訊
        response.headers[&#34;Server&#34;] = &#34;WebServer&#34;

        return response


app.add_middleware(SecurityHeadersMiddleware)
```

---

## 7️⃣ 敏感資料處理

### 不要暴露敏感資料

```python
from pydantic import BaseModel, SecretStr


class UserInDB(BaseModel):
    &#34;&#34;&#34;資料庫中的使用者（內部使用）&#34;&#34;&#34;
    id: int
    username: str
    email: str
    hashed_password: str
    is_active: bool


class UserResponse(BaseModel):
    &#34;&#34;&#34;API 回應的使用者（對外）&#34;&#34;&#34;
    id: int
    username: str
    email: str
    is_active: bool

    # 不包含 hashed_password！

    class Config:
        from_attributes = True


# ✅ 只返回需要的欄位
@app.get(&#34;/users/me&#34;, response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    return current_user


# ❌ 危險：可能暴露敏感欄位
@app.get(&#34;/users/me&#34;)
async def get_me_bad(current_user: User = Depends(get_current_user)):
    return current_user  # 可能包含密碼雜湊等敏感資料
```

### 日誌處理

```python
import logging

logger = logging.getLogger(__name__)


class SensitiveDataFilter(logging.Filter):
    &#34;&#34;&#34;過濾敏感資料&#34;&#34;&#34;

    SENSITIVE_FIELDS = [&#39;password&#39;, &#39;token&#39;, &#39;secret&#39;, &#39;credit_card&#39;]

    def filter(self, record):
        if hasattr(record, &#39;msg&#39;):
            for field in self.SENSITIVE_FIELDS:
                if field in str(record.msg).lower():
                    record.msg = &#34;[FILTERED]&#34;
        return True


# 設定 logger
logger.addFilter(SensitiveDataFilter())


# ❌ 不要記錄密碼
logger.info(f&#34;User login attempt: {username}, password: {password}&#34;)

# ✅ 只記錄必要資訊
logger.info(f&#34;User login attempt: {username}&#34;)
```

---

## 8️⃣ 安全設定

```python
# app/core/config.py
from pydantic_settings import BaseSettings
from pydantic import SecretStr


class Settings(BaseSettings):
    &#34;&#34;&#34;應用程式設定&#34;&#34;&#34;

    # 基本設定
    environment: str = &#34;development&#34;
    debug: bool = False

    # 安全設定
    secret_key: SecretStr
    algorithm: str = &#34;HS256&#34;
    access_token_expire_minutes: int = 30

    # 資料庫（使用 SecretStr）
    database_url: SecretStr

    # CORS
    allowed_origins: list[str] = []

    # 速率限制
    rate_limit_per_minute: int = 100

    class Config:
        env_file = &#34;.env&#34;
        env_file_encoding = &#34;utf-8&#34;


settings = Settings()

# 生產環境檢查
if settings.environment == &#34;production&#34;:
    assert not settings.debug, &#34;Debug must be False in production&#34;
    assert len(settings.secret_key.get_secret_value()) &gt;= 32, &#34;Secret key too short&#34;
    assert settings.allowed_origins, &#34;CORS origins must be configured&#34;
```

---

## 📝 安全檢查清單

```
認證安全
☐ 使用強密碼雜湊（bcrypt/argon2）
☐ 實作帳號鎖定（登入失敗次數限制）
☐ JWT 使用短期過期時間
☐ 實作 Token 撤銷機制

授權安全
☐ 檢查資源擁有者（BOLA 防護）
☐ 實作 RBAC 或 ABAC
☐ 最小權限原則

輸入驗證
☐ 使用 Pydantic 驗證所有輸入
☐ 限制字串長度和格式
☐ 防止 SQL/命令注入

API 安全
☐ 實作速率限制
☐ 設定正確的 CORS
☐ 添加安全 Headers
☐ 使用 HTTPS

資料保護
☐ 不暴露敏感資料
☐ 使用 response_model 過濾輸出
☐ 日誌過濾敏感資訊

監控
☐ 記錄所有認證事件
☐ 監控異常行為
☐ 設定告警機制
```

---

## ✅ 重點總結

### 常見攻擊與防護

| 攻擊 | 防護方式 |
|------|----------|
| SQL 注入 | 參數化查詢、ORM |
| XSS | 輸入驗證、CSP Header |
| CSRF | SameSite Cookie、CSRF Token |
| BOLA | 資源擁有者檢查 |
| 暴力破解 | 速率限制、帳號鎖定 |

### 安全原則

1. **最小權限**：只給必要的權限
2. **深度防禦**：多層防護
3. **輸入驗證**：永遠不信任使用者輸入
4. **輸出過濾**：只返回需要的資料

---

## 🎤 面試這樣答

### Q: 如何防止 SQL 注入？

**答案：**

&gt; 1. **使用 ORM**：SQLAlchemy 等 ORM 自動處理參數化
&gt; 2. **參數化查詢**：使用 `:param` 綁定，而非字串拼接
&gt; 3. **輸入驗證**：用 Pydantic 驗證和限制輸入
&gt;
&gt; ```python
&gt; # ❌ 危險
&gt; f&#34;SELECT * FROM users WHERE username = &#39;{username}&#39;&#34;
&gt;
&gt; # ✅ 安全
&gt; select(User).where(User.username == username)
&gt; ```

---

**上一篇：** [04-4. 權限控制](./04-4)
**下一篇：** [04-6. 密碼重設流程](./04-6)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/04-5/  

