目錄
04-5. 安全最佳實踐
⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)
🤔 一句話解釋
安全最佳實踐是一系列防護措施,保護你的 API 免受常見攻擊,包括注入、XSS、CSRF 等。
🛡️ OWASP Top 10
┌─────────────────────────────────────────────────────────┐
│ OWASP API Security Top 10 │
├─────────────────────────────────────────────────────────┤
│ 1. 失效的物件級授權(BOLA) │
│ 2. 失效的身份認證 │
│ 3. 過度的資料暴露 │
│ 4. 缺乏資源和速率限制 │
│ 5. 失效的功能級授權 │
│ 6. 大量指派 │
│ 7. 安全配置錯誤 │
│ 8. 注入攻擊 │
│ 9. 不當的資產管理 │
│ 10. 不足的日誌和監控 │
└─────────────────────────────────────────────────────────┘1️⃣ 防止 BOLA(物件級授權失效)
問題
# ❌ 危險:任何人都可以存取任何使用者的資料
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
return await db.get(User, user_id)解決方案
# ✅ 安全:檢查資源擁有者
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
# 只能存取自己的資料,或有權限
if user_id != current_user.id and not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Permission denied")
return await db.get(User, user_id)
# ✅ 更好的方式:依賴項
def get_owned_user(
user_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> User:
user = await db.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.id != current_user.id and not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Permission denied")
return user2️⃣ 防止注入攻擊
SQL 注入
# ❌ 危險:SQL 注入
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
# 絕對不要這樣做!
result = await db.execute(
text(f"SELECT * FROM users WHERE username = '{username}'")
)
return result.fetchall()
# ✅ 安全:使用參數化查詢
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
# SQLAlchemy ORM 自動防止 SQL 注入
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
return result.scalars().all()
# ✅ 如果必須用原生 SQL
@app.get("/users")
async def search_users(username: str, db: AsyncSession = Depends(get_db)):
# 使用參數綁定
result = await db.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": username}
)
return result.fetchall()命令注入
import subprocess
import shlex
# ❌ 危險:命令注入
@app.post("/convert")
async def convert_file(filename: str):
# 攻擊者可以傳入 "file.txt; rm -rf /"
subprocess.run(f"convert {filename}", shell=True)
# ✅ 安全:不使用 shell
@app.post("/convert")
async def convert_file(filename: str):
# 驗證檔名
if not re.match(r'^[\w\-\.]+$', filename):
raise HTTPException(status_code=400, detail="Invalid filename")
# 不使用 shell=True
subprocess.run(["convert", filename], shell=False)3️⃣ 輸入驗證
使用 Pydantic 驗證
from pydantic import BaseModel, EmailStr, Field, field_validator
import re
class UserCreate(BaseModel):
username: str = Field(
...,
min_length=3,
max_length=50,
pattern=r'^[a-zA-Z0-9_]+$' # 只允許字母數字底線
)
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
@field_validator('username')
@classmethod
def validate_username(cls, v: str) -> str:
# 防止保留字
reserved = ['admin', 'root', 'system', 'null', 'undefined']
if v.lower() in reserved:
raise ValueError('This username is reserved')
return v
@field_validator('password')
@classmethod
def validate_password(cls, v: str) -> str:
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
return v
class PostCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1, max_length=50000)
@field_validator('content')
@classmethod
def sanitize_content(cls, v: str) -> str:
# 清理 HTML(如果允許 HTML)
import bleach
allowed_tags = ['p', 'br', 'strong', 'em', 'a', 'ul', 'ol', 'li']
return bleach.clean(v, tags=allowed_tags, strip=True)4️⃣ 速率限制
安裝
pip install slowapi實作
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 建立 Limiter
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# 全域限制
@app.get("/")
@limiter.limit("100/minute")
async def home(request: Request):
return {"message": "Hello"}
# 登入端點更嚴格
@app.post("/auth/login")
@limiter.limit("5/minute") # 每分鐘最多 5 次
async def login(request: Request):
pass
# 根據使用者限制
def get_user_identifier(request: Request) -> str:
# 已登入使用者用 user_id,未登入用 IP
user = getattr(request.state, 'user', None)
if user:
return f"user:{user.id}"
return get_remote_address(request)
@app.get("/api/expensive")
@limiter.limit("10/hour", key_func=get_user_identifier)
async def expensive_operation(request: Request):
pass自訂速率限制
from datetime import datetime, timedelta
import redis.asyncio as redis
redis_client = redis.from_url("redis://localhost:6379")
class RateLimiter:
"""自訂速率限制器"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
limit: int,
window_seconds: int
) -> tuple[bool, int]:
"""檢查是否允許請求"""
now = datetime.utcnow()
window_start = now - timedelta(seconds=window_seconds)
pipe = self.redis.pipeline()
# 使用 sorted set 儲存請求時間
redis_key = f"ratelimit:{key}"
# 移除過期的請求
pipe.zremrangebyscore(redis_key, 0, window_start.timestamp())
# 計算當前請求數
pipe.zcard(redis_key)
# 執行
_, current_count = await pipe.execute()
if current_count >= limit:
# 計算重試時間
oldest = await self.redis.zrange(redis_key, 0, 0, withscores=True)
if oldest:
retry_after = int(oldest[0][1] + window_seconds - now.timestamp())
return False, retry_after
return False, window_seconds
# 允許請求,記錄這次請求
await self.redis.zadd(redis_key, {str(now.timestamp()): now.timestamp()})
await self.redis.expire(redis_key, window_seconds)
return True, 0
# 依賴項
async def rate_limit(
request: Request,
limit: int = 100,
window: int = 60
):
"""速率限制依賴項"""
limiter = RateLimiter(redis_client)
key = get_remote_address(request)
allowed, retry_after = await limiter.is_allowed(key, limit, window)
if not allowed:
raise HTTPException(
status_code=429,
detail="Too many requests",
headers={"Retry-After": str(retry_after)}
)5️⃣ CORS 設定
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# ❌ 危險:允許所有來源
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 不要在生產環境這樣做!
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ✅ 安全:只允許特定來源
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://myapp.com",
"https://admin.myapp.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)6️⃣ 安全 Headers
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""安全 Headers 中間件"""
async def dispatch(self, request, call_next):
response = await call_next(request)
# 防止 XSS
response.headers["X-XSS-Protection"] = "1; mode=block"
# 防止 MIME 類型嗅探
response.headers["X-Content-Type-Options"] = "nosniff"
# 防止 Clickjacking
response.headers["X-Frame-Options"] = "DENY"
# 內容安全政策
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
)
# 強制 HTTPS
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains"
)
# 隱藏伺服器資訊
response.headers["Server"] = "WebServer"
return response
app.add_middleware(SecurityHeadersMiddleware)7️⃣ 敏感資料處理
不要暴露敏感資料
from pydantic import BaseModel, SecretStr
class UserInDB(BaseModel):
"""資料庫中的使用者(內部使用)"""
id: int
username: str
email: str
hashed_password: str
is_active: bool
class UserResponse(BaseModel):
"""API 回應的使用者(對外)"""
id: int
username: str
email: str
is_active: bool
# 不包含 hashed_password!
class Config:
from_attributes = True
# ✅ 只返回需要的欄位
@app.get("/users/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
return current_user
# ❌ 危險:可能暴露敏感欄位
@app.get("/users/me")
async def get_me_bad(current_user: User = Depends(get_current_user)):
return current_user # 可能包含密碼雜湊等敏感資料日誌處理
import logging
logger = logging.getLogger(__name__)
class SensitiveDataFilter(logging.Filter):
"""過濾敏感資料"""
SENSITIVE_FIELDS = ['password', 'token', 'secret', 'credit_card']
def filter(self, record):
if hasattr(record, 'msg'):
for field in self.SENSITIVE_FIELDS:
if field in str(record.msg).lower():
record.msg = "[FILTERED]"
return True
# 設定 logger
logger.addFilter(SensitiveDataFilter())
# ❌ 不要記錄密碼
logger.info(f"User login attempt: {username}, password: {password}")
# ✅ 只記錄必要資訊
logger.info(f"User login attempt: {username}")8️⃣ 安全設定
# app/core/config.py
from pydantic_settings import BaseSettings
from pydantic import SecretStr
class Settings(BaseSettings):
"""應用程式設定"""
# 基本設定
environment: str = "development"
debug: bool = False
# 安全設定
secret_key: SecretStr
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# 資料庫(使用 SecretStr)
database_url: SecretStr
# CORS
allowed_origins: list[str] = []
# 速率限制
rate_limit_per_minute: int = 100
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
# 生產環境檢查
if settings.environment == "production":
assert not settings.debug, "Debug must be False in production"
assert len(settings.secret_key.get_secret_value()) >= 32, "Secret key too short"
assert settings.allowed_origins, "CORS origins must be configured"📝 安全檢查清單
認證安全
☐ 使用強密碼雜湊(bcrypt/argon2)
☐ 實作帳號鎖定(登入失敗次數限制)
☐ JWT 使用短期過期時間
☐ 實作 Token 撤銷機制
授權安全
☐ 檢查資源擁有者(BOLA 防護)
☐ 實作 RBAC 或 ABAC
☐ 最小權限原則
輸入驗證
☐ 使用 Pydantic 驗證所有輸入
☐ 限制字串長度和格式
☐ 防止 SQL/命令注入
API 安全
☐ 實作速率限制
☐ 設定正確的 CORS
☐ 添加安全 Headers
☐ 使用 HTTPS
資料保護
☐ 不暴露敏感資料
☐ 使用 response_model 過濾輸出
☐ 日誌過濾敏感資訊
監控
☐ 記錄所有認證事件
☐ 監控異常行為
☐ 設定告警機制✅ 重點總結
常見攻擊與防護
| 攻擊 | 防護方式 |
|---|---|
| SQL 注入 | 參數化查詢、ORM |
| XSS | 輸入驗證、CSP Header |
| CSRF | SameSite Cookie、CSRF Token |
| BOLA | 資源擁有者檢查 |
| 暴力破解 | 速率限制、帳號鎖定 |
安全原則
- 最小權限:只給必要的權限
- 深度防禦:多層防護
- 輸入驗證:永遠不信任使用者輸入
- 輸出過濾:只返回需要的資料
🎤 面試這樣答
Q: 如何防止 SQL 注入?
答案:
- 使用 ORM:SQLAlchemy 等 ORM 自動處理參數化
- 參數化查詢:使用
:param綁定,而非字串拼接- 輸入驗證:用 Pydantic 驗證和限制輸入
# ❌ 危險 f"SELECT * FROM users WHERE username = '{username}'" # ✅ 安全 select(User).where(User.username == username)
上一篇: 04-4. 權限控制 下一篇: 04-6. 密碼重設流程
最後更新:2025-12-17