目錄
04-2. JWT 認證
⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
JWT(JSON Web Token)是一種自包含的 Token,包含了使用者資訊,伺服器不需要查詢資料庫就能驗證身份。
🔍 JWT 結構
JWT = Header.Payload.Signature
┌─────────────────────────────────────────────────────────┐
│ eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9 │ ← Header
│ . │
│ eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ │ ← Payload
│ . │
│ SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c │ ← Signature
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Header(標頭) │
│ { │
│ "alg": "HS256", // 簽名演算法 │
│ "typ": "JWT" // Token 類型 │
│ } │
├─────────────────────────────────────────────────────────┤
│ Payload(載荷) │
│ { │
│ "sub": "1234567890", // Subject(使用者 ID) │
│ "name": "John", // 自訂資料 │
│ "exp": 1516239022, // 過期時間 │
│ "iat": 1516239022 // 簽發時間 │
│ } │
├─────────────────────────────────────────────────────────┤
│ Signature(簽名) │
│ HMACSHA256( │
│ base64UrlEncode(header) + "." + │
│ base64UrlEncode(payload), │
│ secret │
│ ) │
└─────────────────────────────────────────────────────────┘

📦 安裝
pip install "python-jose[cryptography]"
pip install "passlib[bcrypt]"

🔧 JWT 工具類別
# app/core/security.py
import os
from datetime import datetime, timedelta, timezone
from typing import Any

from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
# Settings
# The signing key is taken from the JWT_SECRET_KEY environment variable so a
# real secret never has to be committed to source control. The literal is only
# a development placeholder (also used when the variable is set but empty) and
# must be overridden in production.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "your-super-secret-key-keep-it-safe"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# 密碼雜湊
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class TokenPayload(BaseModel):
    """Claims carried by the tokens issued in this module."""

    # decode_token() builds this model from the decoded JWT payload.
    sub: str  # Subject: the user ID, written as a string by the token builders
    exp: datetime  # Expiration time
    type: str  # Token kind: "access" or "refresh"
def hash_password(password: str) -> str:
    """Hash a plain-text password with the module-level ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a previously stored hash.

    Args:
        plain_password: The password as typed by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def _create_token(
    subject: str | int,
    token_type: str,
    default_lifetime: timedelta,
    expires_delta: timedelta | None,
) -> str:
    """Build and sign a JWT of the given kind for ``subject``.

    Args:
        subject: User identifier; stored in the "sub" claim as a string.
        token_type: Value of the "type" claim ("access" or "refresh").
        default_lifetime: Lifetime used when ``expires_delta`` is ``None``.
        expires_delta: Explicit lifetime requested by the caller, if any.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and a
    # caller asking for a zero lifetime must not silently get the default one.
    lifetime = default_lifetime if expires_delta is None else expires_delta
    # Read the clock once so "iat" and "exp" derive from the same instant, and
    # use an aware UTC datetime instead of the deprecated datetime.utcnow().
    issued_at = datetime.now(timezone.utc)
    claims = {
        "sub": str(subject),
        "exp": issued_at + lifetime,
        "type": token_type,
        "iat": issued_at,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def create_access_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create a signed access token for ``subject``.

    Args:
        subject: User identifier to place in the "sub" claim.
        expires_delta: Token lifetime; defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes when ``None``.

    Returns:
        The encoded JWT with ``"type": "access"``.
    """
    return _create_token(
        subject,
        "access",
        timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        expires_delta,
    )


def create_refresh_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create a signed refresh token for ``subject``.

    Args:
        subject: User identifier to place in the "sub" claim.
        expires_delta: Token lifetime; defaults to
            ``REFRESH_TOKEN_EXPIRE_DAYS`` days when ``None``.

    Returns:
        The encoded JWT with ``"type": "refresh"``.
    """
    return _create_token(
        subject,
        "refresh",
        timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        expires_delta,
    )
def decode_token(token: str) -> TokenPayload | None:
    """Decode a JWT and return its claims, or ``None`` if it is unusable.

    Args:
        token: The encoded JWT string.

    Returns:
        A ``TokenPayload`` built from the decoded claims, or ``None`` when the
        token cannot be decoded with ``SECRET_KEY``/``ALGORITHM`` or its claims
        do not fit ``TokenPayload``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return TokenPayload(**claims)
    except (JWTError, ValueError):
        # ValueError covers pydantic's ValidationError (a ValueError subclass):
        # a correctly signed token that lacks "sub"/"exp"/"type" is reported as
        # invalid instead of surfacing as an unhandled exception.
        return None
def verify_token(token: str, token_type: str = "access") -> TokenPayload | None:
"""驗證 Token"""
payload = decode_token(token)
if not payload:
return None
if payload.type != token_type:
return None
if payload.exp < datetime.utcnow():
return None
return payload🔐 FastAPI 整合
依賴項設定
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import verify_token, TokenPayload
from app.database import get_db
from app.models import User
# OAuth2 設定(指定 Token URL)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Resolve the user identified by the request's access token.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session provided by ``get_db``.

    Returns:
        The ``User`` row whose primary key matches the token's "sub" claim.

    Raises:
        HTTPException: 401 when the token is not a valid access token, its
            subject is not an integer ID, or no such user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Validate the token
    payload = verify_token(token, token_type="access")
    if not payload:
        raise credentials_exception
    # "sub" is an arbitrary string as far as the token format is concerned; a
    # non-numeric value must be rejected as bad credentials, not crash int().
    try:
        user_id = int(payload.sub)
    except ValueError:
        raise credentials_exception from None
    # Load the user from the database
    user = await db.get(User, user_id)
    if not user:
        raise credentials_exception
    return user
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the authenticated user, provided the account is active.

    Raises:
        HTTPException: 403 with detail "Inactive user" when
            ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Inactive user",
    )
async def get_current_superuser(
    current_user: User = Depends(get_current_active_user)
) -> User:
    """Return the authenticated active user, provided it is a superuser.

    Raises:
        HTTPException: 403 with detail "Not enough permissions" when
            ``current_user.is_superuser`` is falsy.
    """
    if current_user.is_superuser:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )

認證端點
# app/api/routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel, EmailStr
from datetime import timedelta
from app.database import get_db
from app.models import User
from app.core.security import (
hash_password,
verify_password,
create_access_token,
create_refresh_token,
verify_token,
ACCESS_TOKEN_EXPIRE_MINUTES,
)
router = APIRouter(prefix="/auth", tags=["auth"])
class Token(BaseModel):
    """Token 回應"""

    # Response body of the /login and /refresh endpoints: a freshly issued
    # access/refresh token pair.
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
class TokenRefresh(BaseModel):
    """Token 更新請求"""

    # Request body of the /refresh endpoint.
    refresh_token: str
class UserCreate(BaseModel):
    """使用者註冊請求"""

    # Request body of the /register endpoint.
    username: str
    email: EmailStr
    password: str  # Plain text; register() hashes it before storing
class UserResponse(BaseModel):
    """使用者回應"""

    # Response model of the /register endpoint; note that it has no password
    # or hash field.
    id: int
    username: str
    email: str
    is_active: bool

    class Config:
        # NOTE(review): presumably lets the model be populated from the ORM
        # User object that register() returns - confirm against the pydantic
        # version in use.
        from_attributes = True
@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    """使用者註冊"""
    # (Docstring kept verbatim: FastAPI surfaces it in the generated API docs.)
    from sqlalchemy import select

    # Uniqueness checks, run in order: e-mail first, then username. The first
    # existing match aborts the request with a 400 and its message.
    uniqueness_checks = (
        (User.email == user_data.email, "Email already registered"),
        (User.username == user_data.username, "Username already taken"),
    )
    for condition, message in uniqueness_checks:
        existing = await db.execute(select(User).where(condition))
        if existing.scalar_one_or_none():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=message,
            )

    # Create the user; only the password hash is stored.
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
    )
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user)
    return new_user
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """使用者登入"""
    # (Docstring kept verbatim: FastAPI surfaces it in the generated API docs.)
    from sqlalchemy import select

    # The form's "username" field may hold either a username or an e-mail.
    identifier = form_data.username
    matches_identifier = (User.username == identifier) | (User.email == identifier)
    lookup = await db.execute(select(User).where(matches_identifier))
    user = lookup.scalar_one_or_none()

    # Unknown user and wrong password get the same 401 response.
    if not (user and verify_password(form_data.password, user.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user",
        )

    # Issue a new access/refresh token pair for the user.
    return Token(
        access_token=create_access_token(subject=user.id),
        refresh_token=create_refresh_token(subject=user.id),
    )
@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    db: AsyncSession = Depends(get_db)
):
    """更新 Token"""
    # (Docstring kept verbatim: FastAPI surfaces it in the generated API docs.)
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token"
    )
    # Validate the refresh token
    payload = verify_token(token_data.refresh_token, token_type="refresh")
    if not payload:
        raise invalid_token
    # A non-numeric "sub" is an invalid token, not a server error: without this
    # guard int() raises ValueError and the request fails with a 500.
    try:
        user_id = int(payload.sub)
    except ValueError:
        raise invalid_token from None
    # The user must still exist and be active
    user = await db.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive"
        )
    # Issue a new token pair. The values are passed straight to Token so no
    # local variable shadows this function's own name.
    return Token(
        access_token=create_access_token(subject=user.id),
        refresh_token=create_refresh_token(subject=user.id),
    )

保護端點
# app/api/routes/users.py
from fastapi import APIRouter, Depends
from app.api.deps import get_current_active_user, get_current_superuser
from app.models import User
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/me")
async def get_me(current_user: User = Depends(get_current_active_user)):
    """取得當前使用者資訊"""
    # Expose only these three attributes of the authenticated user.
    exposed_fields = ("id", "username", "email")
    return {field: getattr(current_user, field) for field in exposed_fields}
@router.get("/admin-only")
async def admin_only(current_user: User = Depends(get_current_superuser)):
    """只有管理員可以存取"""
    # Access control happens entirely in the get_current_superuser dependency;
    # the handler body itself never reads current_user.
    return {"message": "Welcome, admin!"}

🔄 Token 更新流程
┌─────────────────────────────────────────────────────────┐
│ Token 更新流程 │
├─────────────────────────────────────────────────────────┤
│ │
│ 1. 使用者登入 │
│ ┌──────┐ ┌──────┐ │
│ │Client│ ──── 帳號密碼 ────▶ │Server│ │
│ │ │ ◀── Access Token ── │ │ │
│ │ │ ◀── Refresh Token ─ │ │ │
│ └──────┘ └──────┘ │
│ │
│ 2. 存取 API │
│ ┌──────┐ ┌──────┐ │
│ │Client│ ── Access Token ──▶ │Server│ │
│ │ │ ◀──── 資料 ─────── │ │ │
│ └──────┘ └──────┘ │
│ │
│ 3. Access Token 過期 │
│ ┌──────┐ ┌──────┐ │
│ │Client│ ── Refresh Token ─▶ │Server│ │
│ │ │ ◀── 新 Access Token │ │ │
│ │ │ ◀── 新 Refresh Token│ │ │
│ └──────┘ └──────┘ │
│ │
└─────────────────────────────────────────────────────────┘

前端處理範例
// 攔截器處理 Token 更新
axios.interceptors.response.use(
response => response,
async error => {
const originalRequest = error.config;
// 如果是 401 且沒有重試過
if (error.response.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
try {
// 使用 Refresh Token 取得新的 Access Token
const refreshToken = localStorage.getItem('refresh_token');
const response = await axios.post('/auth/refresh', {
refresh_token: refreshToken
});
// 儲存新的 Token
localStorage.setItem('access_token', response.data.access_token);
localStorage.setItem('refresh_token', response.data.refresh_token);
// 重新發送原本的請求
originalRequest.headers.Authorization =
`Bearer ${response.data.access_token}`;
return axios(originalRequest);
} catch (refreshError) {
// Refresh Token 也過期,需要重新登入
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
window.location.href = '/login';
return Promise.reject(refreshError);
}
}
return Promise.reject(error);
}
);🔒 Token 黑名單
實作登出功能
# app/core/token_blacklist.py
import redis.asyncio as redis
from datetime import timedelta
redis_client = redis.from_url("redis://localhost:6379")
async def blacklist_token(token: str, expires_in: int):
    """Record ``token`` as revoked for the next ``expires_in`` seconds.

    Args:
        token: The encoded token; it becomes part of the Redis key.
        expires_in: Lifetime of the blacklist entry, in seconds.
    """
    key = f"blacklist:{token}"
    ttl = timedelta(seconds=expires_in)
    # The stored value is a placeholder; only the key's existence is checked.
    await redis_client.setex(key, ttl, "1")
async def is_token_blacklisted(token: str) -> bool:
    """Report whether a blacklist entry exists for ``token``."""
    matches = await redis_client.exists(f"blacklist:{token}")
    return matches > 0
# 更新驗證邏輯
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Resolve the current user, rejecting blacklisted tokens first.

    Raises:
        HTTPException: 401 with detail "Token has been revoked" when the token
            is on the blacklist.
    """
    # Check the blacklist before any other validation
    if await is_token_blacklisted(token):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked"
        )
    # Remaining validation logic (elided in this snippet)...
# 登出端點
@router.post("/logout")
async def logout(
token: str = Depends(oauth2_scheme),
current_user: User = Depends(get_current_active_user)
):
"""登出(將 Token 加入黑名單)"""
payload = decode_token(token)
if payload:
# 計算剩餘時間
remaining = int((payload.exp - datetime.utcnow()).total_seconds())
if remaining > 0:
await blacklist_token(token, remaining)
return {"message": "Successfully logged out"}⚠️ JWT 安全注意事項
1. 密鑰管理
# ❌ 不要硬編碼
SECRET_KEY = "my-secret-key"
# ✅ 從環境變數讀取
import os
SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
# ✅ 使用強密鑰
import secrets
SECRET_KEY = secrets.token_urlsafe(32)

2. Token 過期時間
# Access Token:短時間(15-30 分鐘)
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Refresh Token:較長(7-30 天)
REFRESH_TOKEN_EXPIRE_DAYS = 7

3. 使用 HTTPS
# 生產環境強制 HTTPS
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
if settings.ENVIRONMENT == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

4. 避免在 JWT 中存放敏感資料
# ❌ 不要存放敏感資料
{
"sub": "123",
"email": "user@example.com",
"password": "hashed_password", # 絕對不要!
"credit_card": "1234-5678" # 絕對不要!
}
# ✅ 只存放必要的資訊
{
"sub": "123",
"type": "access",
"exp": 1234567890
}

✅ 重點總結
JWT 結構
| 部分 | 內容 |
|---|---|
| Header | 演算法、類型 |
| Payload | 使用者資訊、過期時間 |
| Signature | 簽名(驗證完整性) |
Token 策略
| Token | 用途 | 過期時間 |
|---|---|---|
| Access Token | API 存取 | 15-30 分鐘 |
| Refresh Token | 更新 Token | 7-30 天 |
安全要點
- 使用強密鑰,從環境變數讀取
- Access Token 設定短過期時間
- 使用 HTTPS
- 實作 Token 黑名單(登出功能)
- 不在 JWT 中存放敏感資料
🎤 面試這樣答
Q: JWT 和 Session 的差別是什麼?
答案:
Session:
- 有狀態:Session 資料存在伺服器
- 需要查詢:每次請求都要查詢 Session 資料
- 可撤銷:登出時刪除 Session 即可
JWT:
- 無狀態:Token 自包含使用者資訊
- 不需查詢:伺服器只需驗證簽名
- 撤銷困難:需要額外實作黑名單機制
選擇建議:
- 傳統 Web 應用 → Session
- API、SPA、行動 App → JWT
上一篇: 04-1. 認證基礎 下一篇: 04-3. OAuth 2.0
最後更新:2025-12-17