04-5. 安全最佳實踐

⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)


🤔 一句話解釋

安全最佳實踐是一系列防護措施,保護你的 API 免受常見攻擊,包括注入、XSS、CSRF 等。


🛡️ OWASP Top 10

┌─────────────────────────────────────────────────────────┐
│              OWASP API Security Top 10                  │
├─────────────────────────────────────────────────────────┤
│  1. 失效的物件級授權(BOLA)                              │
│  2. 失效的身份認證                                       │
│  3. 過度的資料暴露                                       │
│  4. 缺乏資源和速率限制                                    │
│  5. 失效的功能級授權                                     │
│  6. 大量指派                                             │
│  7. 安全配置錯誤                                         │
│  8. 注入攻擊                                             │
│  9. 不當的資產管理                                       │
│  10. 不足的日誌和監控                                    │
└─────────────────────────────────────────────────────────┘

1️⃣ 防止 BOLA(物件級授權失效)

問題

# ❌ 危險:任何人都可以存取任何使用者的資料
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, user_id)

解決方案

# ✅ 安全:檢查資源擁有者
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    # 只能存取自己的資料,或有權限
    if user_id != current_user.id and not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Permission denied")

    return await db.get(User, user_id)


# ✅ 更好的方式:依賴項
def get_owned_user(
    user_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> User:
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if user.id != current_user.id and not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Permission denied")

    return user

2️⃣ 防止注入攻擊

SQL 注入

# ❌ 危險:SQL 注入
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # 絕對不要這樣做!
    result = await db.execute(
        text(f"SELECT * FROM users WHERE username = '{username}'")
    )
    return result.fetchall()


# ✅ 安全:使用參數化查詢
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # SQLAlchemy ORM 自動防止 SQL 注入
    stmt = select(User).where(User.username == username)
    result = await db.execute(stmt)
    return result.scalars().all()


# ✅ 如果必須用原生 SQL
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
    # 使用參數綁定
    result = await db.execute(
        text("SELECT * FROM users WHERE username = :username"),
        {"username": username}
    )
    return result.fetchall()

命令注入

import subprocess
import shlex

# ❌ 危險:命令注入
@app.post("/convert")
async def convert_file(filename: str):
    # 攻擊者可以傳入 "file.txt; rm -rf /"
    subprocess.run(f"convert {filename}", shell=True)


# ✅ 安全:不使用 shell
@app.post("/convert")
async def convert_file(filename: str):
    # 驗證檔名
    if not re.match(r'^[\w\-\.]+$', filename):
        raise HTTPException(status_code=400, detail="Invalid filename")

    # 不使用 shell=True
    subprocess.run(["convert", filename], shell=False)

3️⃣ 輸入驗證

使用 Pydantic 驗證

from pydantic import BaseModel, EmailStr, Field, field_validator
import re


class UserCreate(BaseModel):
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        pattern=r'^[a-zA-Z0-9_]+$'  # 只允許字母數字底線
    )
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)

    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
        # 防止保留字
        reserved = ['admin', 'root', 'system', 'null', 'undefined']
        if v.lower() in reserved:
            raise ValueError('This username is reserved')
        return v

    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain digit')
        return v


class PostCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1, max_length=50000)

    @field_validator('content')
    @classmethod
    def sanitize_content(cls, v: str) -> str:
        # 清理 HTML(如果允許 HTML)
        import bleach
        allowed_tags = ['p', 'br', 'strong', 'em', 'a', 'ul', 'ol', 'li']
        return bleach.clean(v, tags=allowed_tags, strip=True)

4️⃣ 速率限制

安裝

pip install slowapi

實作

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# 建立 Limiter
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# 全域限制
@app.get("/")
@limiter.limit("100/minute")
async def home(request: Request):
    return {"message": "Hello"}


# 登入端點更嚴格
@app.post("/auth/login")
@limiter.limit("5/minute")  # 每分鐘最多 5 次
async def login(request: Request):
    pass


# 根據使用者限制
def get_user_identifier(request: Request) -> str:
    # 已登入使用者用 user_id,未登入用 IP
    user = getattr(request.state, 'user', None)
    if user:
        return f"user:{user.id}"
    return get_remote_address(request)


@app.get("/api/expensive")
@limiter.limit("10/hour", key_func=get_user_identifier)
async def expensive_operation(request: Request):
    pass

自訂速率限制

from datetime import datetime, timedelta
import redis.asyncio as redis

redis_client = redis.from_url("redis://localhost:6379")


class RateLimiter:
    """自訂速率限制器"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def is_allowed(
        self,
        key: str,
        limit: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """檢查是否允許請求"""
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=window_seconds)

        pipe = self.redis.pipeline()

        # 使用 sorted set 儲存請求時間
        redis_key = f"ratelimit:{key}"

        # 移除過期的請求
        pipe.zremrangebyscore(redis_key, 0, window_start.timestamp())

        # 計算當前請求數
        pipe.zcard(redis_key)

        # 執行
        _, current_count = await pipe.execute()

        if current_count >= limit:
            # 計算重試時間
            oldest = await self.redis.zrange(redis_key, 0, 0, withscores=True)
            if oldest:
                retry_after = int(oldest[0][1] + window_seconds - now.timestamp())
                return False, retry_after
            return False, window_seconds

        # 允許請求,記錄這次請求
        await self.redis.zadd(redis_key, {str(now.timestamp()): now.timestamp()})
        await self.redis.expire(redis_key, window_seconds)

        return True, 0


# 依賴項
async def rate_limit(
    request: Request,
    limit: int = 100,
    window: int = 60
):
    """速率限制依賴項"""
    limiter = RateLimiter(redis_client)
    key = get_remote_address(request)

    allowed, retry_after = await limiter.is_allowed(key, limit, window)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Too many requests",
            headers={"Retry-After": str(retry_after)}
        )

5️⃣ CORS 設定

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# ❌ 危險:允許所有來源
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 不要在生產環境這樣做!
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ✅ 安全:只允許特定來源
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://myapp.com",
        "https://admin.myapp.com",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

6️⃣ 安全 Headers

from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """安全 Headers 中間件"""

    async def dispatch(self, request, call_next):
        response = await call_next(request)

        # 防止 XSS
        response.headers["X-XSS-Protection"] = "1; mode=block"

        # 防止 MIME 類型嗅探
        response.headers["X-Content-Type-Options"] = "nosniff"

        # 防止 Clickjacking
        response.headers["X-Frame-Options"] = "DENY"

        # 內容安全政策
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
        )

        # 強制 HTTPS
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains"
        )

        # 隱藏伺服器資訊
        response.headers["Server"] = "WebServer"

        return response


app.add_middleware(SecurityHeadersMiddleware)

7️⃣ 敏感資料處理

不要暴露敏感資料

from pydantic import BaseModel, SecretStr


class UserInDB(BaseModel):
    """資料庫中的使用者(內部使用)"""
    id: int
    username: str
    email: str
    hashed_password: str
    is_active: bool


class UserResponse(BaseModel):
    """API 回應的使用者(對外)"""
    id: int
    username: str
    email: str
    is_active: bool

    # 不包含 hashed_password!

    class Config:
        from_attributes = True


# ✅ 只返回需要的欄位
@app.get("/users/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    return current_user


# ❌ 危險:可能暴露敏感欄位
@app.get("/users/me")
async def get_me_bad(current_user: User = Depends(get_current_user)):
    return current_user  # 可能包含密碼雜湊等敏感資料

日誌處理

import logging

logger = logging.getLogger(__name__)


class SensitiveDataFilter(logging.Filter):
    """過濾敏感資料"""

    SENSITIVE_FIELDS = ['password', 'token', 'secret', 'credit_card']

    def filter(self, record):
        if hasattr(record, 'msg'):
            for field in self.SENSITIVE_FIELDS:
                if field in str(record.msg).lower():
                    record.msg = "[FILTERED]"
        return True


# 設定 logger
logger.addFilter(SensitiveDataFilter())


# ❌ 不要記錄密碼
logger.info(f"User login attempt: {username}, password: {password}")

# ✅ 只記錄必要資訊
logger.info(f"User login attempt: {username}")

8️⃣ 安全設定

# app/core/config.py
from pydantic_settings import BaseSettings
from pydantic import SecretStr


class Settings(BaseSettings):
    """應用程式設定"""

    # 基本設定
    environment: str = "development"
    debug: bool = False

    # 安全設定
    secret_key: SecretStr
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # 資料庫(使用 SecretStr)
    database_url: SecretStr

    # CORS
    allowed_origins: list[str] = []

    # 速率限制
    rate_limit_per_minute: int = 100

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


settings = Settings()

# 生產環境檢查
if settings.environment == "production":
    assert not settings.debug, "Debug must be False in production"
    assert len(settings.secret_key.get_secret_value()) >= 32, "Secret key too short"
    assert settings.allowed_origins, "CORS origins must be configured"

📝 安全檢查清單

認證安全
☐ 使用強密碼雜湊(bcrypt/argon2)
☐ 實作帳號鎖定(登入失敗次數限制)
☐ JWT 使用短期過期時間
☐ 實作 Token 撤銷機制

授權安全
☐ 檢查資源擁有者(BOLA 防護)
☐ 實作 RBAC 或 ABAC
☐ 最小權限原則

輸入驗證
☐ 使用 Pydantic 驗證所有輸入
☐ 限制字串長度和格式
☐ 防止 SQL/命令注入

API 安全
☐ 實作速率限制
☐ 設定正確的 CORS
☐ 添加安全 Headers
☐ 使用 HTTPS

資料保護
☐ 不暴露敏感資料
☐ 使用 response_model 過濾輸出
☐ 日誌過濾敏感資訊

監控
☐ 記錄所有認證事件
☐ 監控異常行為
☐ 設定告警機制

✅ 重點總結

常見攻擊與防護

攻擊防護方式
SQL 注入參數化查詢、ORM
XSS輸入驗證、CSP Header
CSRFSameSite Cookie、CSRF Token
BOLA資源擁有者檢查
暴力破解速率限制、帳號鎖定

安全原則

  1. 最小權限:只給必要的權限
  2. 深度防禦:多層防護
  3. 輸入驗證:永遠不信任使用者輸入
  4. 輸出過濾:只返回需要的資料

🎤 面試這樣答

Q: 如何防止 SQL 注入?

答案:

  1. 使用 ORM:SQLAlchemy 等 ORM 自動處理參數化
  2. 參數化查詢:使用 :param 綁定,而非字串拼接
  3. 輸入驗證:用 Pydantic 驗證和限制輸入
# ❌ 危險
f"SELECT * FROM users WHERE username = '{username}'"

# ✅ 安全
select(User).where(User.username == username)

上一篇: 04-4. 權限控制 下一篇: 04-6. 密碼重設流程


最後更新:2025-12-17

0%