04-2. JWT 認證

⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

JWT(JSON Web Token)是一種自包含的 Token,包含了使用者資訊,伺服器不需要查詢資料庫就能驗證身份。


🔍 JWT 結構

JWT = Header.Payload.Signature

┌─────────────────────────────────────────────────────────┐
│  eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9                   │  ← Header
│  .                                                       │
│  eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ        │  ← Payload
│  .                                                       │
│  SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c           │  ← Signature
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│  Header(標頭)                                          │
│  {                                                      │
│    "alg": "HS256",    // 簽名演算法                     │
│    "typ": "JWT"       // Token 類型                     │
│  }                                                      │
├─────────────────────────────────────────────────────────┤
│  Payload(載荷)                                         │
│  {                                                      │
│    "sub": "1234567890",  // Subject(使用者 ID)        │
│    "name": "John",        // 自訂資料                   │
│    "exp": 1516242622,     // 過期時間                   │
│    "iat": 1516239022      // 簽發時間                   │
│  }                                                      │
├─────────────────────────────────────────────────────────┤
│  Signature(簽名)                                       │
│  HMACSHA256(                                            │
│    base64UrlEncode(header) + "." +                      │
│    base64UrlEncode(payload),                            │
│    secret                                               │
│  )                                                      │
└─────────────────────────────────────────────────────────┘

📦 安裝

pip install python-jose[cryptography]
pip install passlib[bcrypt]

🔧 JWT 工具類別

# app/core/security.py
from datetime import datetime, timedelta
from typing import Any
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

# Settings
# NOTE(review): the signing key is hard-coded here for demonstration only;
# anyone who learns it can forge tokens. Load it from configuration instead.
SECRET_KEY = "your-super-secret-key-keep-it-safe"
ALGORITHM = "HS256"  # signing algorithm handed to jwt.encode / jwt.decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # default access-token lifetime, in minutes
REFRESH_TOKEN_EXPIRE_DAYS = 7  # default refresh-token lifetime, in days

# Password hashing context (bcrypt scheme)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class TokenPayload(BaseModel):
    """Claims carried by a decoded token."""
    sub: str  # subject: the user ID, stored as a string
    exp: datetime  # expiry time
    type: str  # token kind: "access" or "refresh"


def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def _create_token(
    subject: str | int,
    token_type: str,
    lifetime: timedelta,
) -> str:
    """Encode a signed JWT of kind ``token_type`` that expires after ``lifetime``."""
    from datetime import timezone

    # Read the clock once so "iat" and "exp" derive from the same instant.
    # A timezone-aware UTC value replaces the deprecated datetime.utcnow().
    issued_at = datetime.now(timezone.utc)
    claims = {
        "sub": str(subject),
        "exp": issued_at + lifetime,
        "type": token_type,
        "iat": issued_at,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def create_access_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create a signed access token for ``subject``.

    Args:
        subject: User identifier; stored as a string in the "sub" claim.
        expires_delta: Token lifetime. ``None`` selects
            ``ACCESS_TOKEN_EXPIRE_MINUTES``. Any other value, including a
            zero ``timedelta``, is used as given.

    Returns:
        The encoded JWT with "type" set to "access".
    """
    # Compare against None: a zero timedelta is falsy but is still an
    # explicit request and must not fall back to the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    return _create_token(subject, "access", expires_delta)


def create_refresh_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create a signed refresh token for ``subject``.

    Args:
        subject: User identifier; stored as a string in the "sub" claim.
        expires_delta: Token lifetime. ``None`` selects
            ``REFRESH_TOKEN_EXPIRE_DAYS``. Any other value, including a
            zero ``timedelta``, is used as given.

    Returns:
        The encoded JWT with "type" set to "refresh".
    """
    if expires_delta is None:
        expires_delta = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    return _create_token(subject, "refresh", expires_delta)


def decode_token(token: str) -> TokenPayload | None:
    """Decode ``token`` and return its claims as a ``TokenPayload``.

    Decoding and signature checking are delegated to ``jwt.decode`` with the
    module's ``SECRET_KEY`` and ``ALGORITHM``.

    Returns:
        The parsed payload, or ``None`` when the token cannot be decoded or
        its claims do not fit ``TokenPayload``.
    """
    from pydantic import ValidationError

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return TokenPayload(**payload)
    except (JWTError, ValidationError):
        # ValidationError covers a correctly signed token whose claims are
        # missing "sub", "exp" or "type" (or carry the wrong types); treat it
        # like any other invalid token instead of letting it propagate.
        return None


def verify_token(token: str, token_type: str = "access") -> TokenPayload | None:
    """Validate ``token`` and confirm it is of the expected kind.

    Args:
        token: The encoded JWT.
        token_type: Required value of the payload's ``type`` field.

    Returns:
        The decoded payload, or ``None`` when the token cannot be decoded,
        has a different ``type``, or has expired.
    """
    from datetime import timezone

    payload = decode_token(token)
    if not payload:
        return None

    if payload.type != token_type:
        return None

    # The "exp" claim may arrive as a timezone-aware datetime after model
    # parsing; comparing that with a naive datetime raises TypeError. Normalise
    # to aware UTC (a naive value is taken to be UTC, matching how the tokens
    # are issued) before comparing.
    expires_at = payload.exp
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    if expires_at < datetime.now(timezone.utc):
        return None

    return payload

🔐 FastAPI 整合

依賴項設定

# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import verify_token, TokenPayload
from app.database import get_db
from app.models import User

# OAuth2 bearer scheme; tokenUrl names the endpoint that issues tokens
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Resolve the bearer token to the corresponding ``User`` row.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            fails ``verify_token``, its subject is not an integer ID, or no
            user with that ID exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Validate the token; only access tokens are accepted here.
    payload = verify_token(token, token_type="access")
    if not payload:
        raise credentials_exception

    # A non-numeric subject is a credential problem, not a server error.
    try:
        user_id = int(payload.sub)
    except ValueError:
        raise credentials_exception from None

    # Load the user from the database.
    user = await db.get(User, user_id)
    if not user:
        raise credentials_exception

    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the authenticated user, rejecting deactivated accounts.

    Raises:
        HTTPException: 403 "Inactive user" when ``current_user.is_active``
            is falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Inactive user"
    )


async def get_current_superuser(
    current_user: User = Depends(get_current_active_user)
) -> User:
    """Return the active user only if it is flagged as a superuser.

    Raises:
        HTTPException: 403 "Not enough permissions" when
            ``current_user.is_superuser`` is falsy.
    """
    if current_user.is_superuser:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions"
    )

認證端點

# app/api/routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel, EmailStr
from datetime import timedelta

from app.database import get_db
from app.models import User
from app.core.security import (
    hash_password,
    verify_password,
    create_access_token,
    create_refresh_token,
    verify_token,
    ACCESS_TOKEN_EXPIRE_MINUTES,
)

# Router for the authentication endpoints; every route is served under /auth
router = APIRouter(prefix="/auth", tags=["auth"])


class Token(BaseModel):
    """Response body holding an access/refresh token pair."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class TokenRefresh(BaseModel):
    """Request body for exchanging a refresh token for a new token pair."""
    refresh_token: str


class UserCreate(BaseModel):
    """Request body for user registration."""
    username: str
    email: EmailStr
    password: str  # plain-text password as submitted; hashed before storage


class UserResponse(BaseModel):
    """Response body describing a user."""
    id: int
    username: str
    email: str
    is_active: bool

    class Config:
        # Allows the model to be built from an object's attributes; the
        # register endpoint returns a User instance under this response model.
        from_attributes = True


@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    """Register a new user account.

    The email is checked for an existing row first, then the username. If
    both are free, a ``User`` with the hashed password is stored and returned.

    Raises:
        HTTPException: 400 "Email already registered" or
            "Username already taken" when a matching row exists.
    """
    from sqlalchemy import select

    # Each entry pairs a lookup condition with the error reported on a match.
    # The order (email, then username) decides which error wins.
    uniqueness_checks = (
        (User.email == user_data.email, "Email already registered"),
        (User.username == user_data.username, "Username already taken"),
    )
    for condition, message in uniqueness_checks:
        existing = await db.execute(select(User).where(condition))
        if existing.scalar_one_or_none():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=message
            )

    # Persist the new account; only the password hash is stored.
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password)
    )
    db.add(new_user)
    await db.commit()
    # Reload the row after the commit before returning it.
    await db.refresh(new_user)

    return new_user


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """Authenticate a user and issue an access/refresh token pair.

    Raises:
        HTTPException: 401 "Incorrect username or password" when no user
            matches or the password check fails; 403 "Inactive user" when
            the account is not active.
    """
    from sqlalchemy import select

    # The form's "username" field is matched against username or email.
    # NOTE(review): scalar_one_or_none() raises if more than one row matches
    # (e.g. one user's username equals another's email) - confirm the data
    # model rules this out.
    identifier = form_data.username
    lookup = select(User).where(
        (User.username == identifier) |
        (User.email == identifier)
    )
    user = (await db.execute(lookup)).scalar_one_or_none()

    # One error covers both "unknown user" and "wrong password".
    if not (user and verify_password(form_data.password, user.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )

    # Issue both tokens for the user's ID.
    return Token(
        access_token=create_access_token(subject=user.id),
        refresh_token=create_refresh_token(subject=user.id)
    )


@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    db: AsyncSession = Depends(get_db)
):
    """Exchange a valid refresh token for a new access/refresh token pair.

    Raises:
        HTTPException: 401 "Invalid refresh token" when the token fails
            ``verify_token`` as a refresh token or its subject is not an
            integer ID; 401 "User not found or inactive" when the user is
            missing or not active.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token"
    )

    # Validate the refresh token.
    payload = verify_token(token_data.refresh_token, token_type="refresh")
    if not payload:
        raise invalid_token

    # A non-numeric subject means the token is unusable, not a server error.
    try:
        user_id = int(payload.sub)
    except ValueError:
        raise invalid_token from None

    # The user must still exist and be active.
    user = await db.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive"
        )

    # Issue a fresh pair; local names avoid shadowing this function's name.
    new_access_token = create_access_token(subject=user.id)
    new_refresh_token = create_refresh_token(subject=user.id)

    return Token(
        access_token=new_access_token,
        refresh_token=new_refresh_token
    )

保護端點

# app/api/routes/users.py
from fastapi import APIRouter, Depends
from app.api.deps import get_current_active_user, get_current_superuser
from app.models import User

# Router for the user endpoints; every route is served under /users
router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me")
async def get_me(current_user: User = Depends(get_current_active_user)):
    """Return the id, username and email of the authenticated active user."""
    exposed_fields = ("id", "username", "email")
    return {field: getattr(current_user, field) for field in exposed_fields}


@router.get("/admin-only")
async def admin_only(current_user: User = Depends(get_current_superuser)):
    """Return a greeting; access is gated by the superuser dependency."""
    greeting = dict(message="Welcome, admin!")
    return greeting

🔄 Token 更新流程

┌─────────────────────────────────────────────────────────┐
│                   Token 更新流程                         │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  1. 使用者登入                                           │
│     ┌──────┐                     ┌──────┐              │
│     │Client│ ──── 帳號密碼 ────▶ │Server│              │
│     │      │ ◀── Access Token ── │      │              │
│     │      │ ◀── Refresh Token ─ │      │              │
│     └──────┘                     └──────┘              │
│                                                         │
│  2. 存取 API                                            │
│     ┌──────┐                     ┌──────┐              │
│     │Client│ ── Access Token ──▶ │Server│              │
│     │      │ ◀──── 資料 ─────── │      │              │
│     └──────┘                     └──────┘              │
│                                                         │
│  3. Access Token 過期                                   │
│     ┌──────┐                     ┌──────┐              │
│     │Client│ ── Refresh Token ─▶ │Server│              │
│     │      │ ◀── 新 Access Token │      │              │
│     │      │ ◀── 新 Refresh Token│      │              │
│     └──────┘                     └──────┘              │
│                                                         │
└─────────────────────────────────────────────────────────┘

前端處理範例

// Response interceptor: on a 401, refresh the token pair once and replay the request
axios.interceptors.response.use(
  response => response,
  async error => {
    const originalRequest = error.config;

    // Network failures and timeouts carry no `response`; reading
    // `error.response.status` directly would throw in that case.
    const status = error.response ? error.response.status : undefined;

    // A 401 from the refresh call itself must not start another refresh,
    // otherwise the interceptor would re-enter itself indefinitely.
    const isRefreshCall =
      Boolean(originalRequest) &&
      typeof originalRequest.url === 'string' &&
      originalRequest.url.includes('/auth/refresh');

    // Only a first-time 401 on a regular request is eligible for a retry.
    if (status !== 401 || !originalRequest || originalRequest._retry || isRefreshCall) {
      return Promise.reject(error);
    }

    originalRequest._retry = true;

    try {
      // Use the refresh token to obtain a new token pair
      const refreshToken = localStorage.getItem('refresh_token');
      const response = await axios.post('/auth/refresh', {
        refresh_token: refreshToken
      });

      // Store the new tokens
      localStorage.setItem('access_token', response.data.access_token);
      localStorage.setItem('refresh_token', response.data.refresh_token);

      // Replay the original request with the new access token
      originalRequest.headers.Authorization =
        `Bearer ${response.data.access_token}`;
      return axios(originalRequest);

    } catch (refreshError) {
      // The refresh failed as well (e.g. refresh token expired): force a new login
      localStorage.removeItem('access_token');
      localStorage.removeItem('refresh_token');
      window.location.href = '/login';
      return Promise.reject(refreshError);
    }
  }
);

🔒 Token 黑名單

實作登出功能

# app/core/token_blacklist.py
import redis.asyncio as redis
from datetime import timedelta

# Shared async Redis client used to store blacklisted tokens
redis_client = redis.from_url("redis://localhost:6379")


async def blacklist_token(token: str, expires_in: int):
    """Record ``token`` as revoked for ``expires_in`` seconds.

    The entry is written under the key ``blacklist:<token>`` with the value
    "1" and a time-to-live of ``expires_in`` seconds.
    """
    key = f"blacklist:{token}"
    ttl = timedelta(seconds=expires_in)
    await redis_client.setex(key, ttl, "1")


async def is_token_blacklisted(token: str) -> bool:
    """Report whether a ``blacklist:<token>`` entry currently exists in Redis."""
    hits = await redis_client.exists(f"blacklist:{token}")
    return hits > 0


# Updated authentication dependency
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Return the current user, rejecting blacklisted tokens first."""
    # Reject tokens that have been revoked
    if await is_token_blacklisted(token):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked"
        )

    # Remaining validation logic (elided in this excerpt)...


# Logout endpoint
@router.post("/logout")
async def logout(
    token: str = Depends(oauth2_scheme),
    current_user: User = Depends(get_current_active_user)
):
    """Log out by blacklisting the presented token for its remaining lifetime.

    Returns:
        ``{"message": "Successfully logged out"}``, whether or not the token
        still had time left.
    """
    # Imported locally so this snippet does not rely on names that its own
    # import header does not provide.
    from datetime import datetime, timezone

    from app.core.security import decode_token

    payload = decode_token(token)
    if payload:
        # The "exp" claim may be a timezone-aware datetime; subtracting a naive
        # datetime from it raises TypeError. Normalise to aware UTC first
        # (a naive value is taken to be UTC).
        expires_at = payload.exp
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)

        # Seconds until the token would have expired on its own
        remaining = int((expires_at - datetime.now(timezone.utc)).total_seconds())
        if remaining > 0:
            await blacklist_token(token, remaining)

    return {"message": "Successfully logged out"}

⚠️ JWT 安全注意事項

1. 密鑰管理

# ❌ Do not hard-code the key
SECRET_KEY = "my-secret-key"

# ✅ Read it from an environment variable
# NOTE: os.environ.get returns None when JWT_SECRET_KEY is not set.
import os
SECRET_KEY = os.environ.get("JWT_SECRET_KEY")

# ✅ Use a strong key
# NOTE: this produces a new random key every time the line runs, so generate
# it once and store it rather than calling it at every application start.
import secrets
SECRET_KEY = secrets.token_urlsafe(32)

2. Token 過期時間

# Access token: short-lived (15-30 minutes)
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Refresh token: longer-lived (7-30 days)
REFRESH_TOKEN_EXPIRE_DAYS = 7

3. 使用 HTTPS

# Enforce HTTPS in production
# NOTE(review): `settings` and `app` are not defined in this excerpt;
# presumably the project's settings object and the FastAPI application.
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

if settings.ENVIRONMENT == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

4. 避免在 JWT 中存放敏感資料

# ❌ Do not store sensitive data
{
    "sub": "123",
    "email": "user@example.com",
    "password": "hashed_password",  # never!
    "credit_card": "1234-5678"      # never!
}

# ✅ Store only the information that is needed
{
    "sub": "123",
    "type": "access",
    "exp": 1234567890
}

✅ 重點總結

JWT 結構

| 部分 | 內容 |
|------|------|
| Header | 演算法、類型 |
| Payload | 使用者資訊、過期時間 |
| Signature | 簽名(驗證完整性) |

Token 策略

| Token | 用途 | 過期時間 |
|-------|------|----------|
| Access Token | API 存取 | 15-30 分鐘 |
| Refresh Token | 更新 Token | 7-30 天 |

安全要點

  1. 使用強密鑰,從環境變數讀取
  2. Access Token 設定短過期時間
  3. 使用 HTTPS
  4. 實作 Token 黑名單(登出功能)
  5. 不在 JWT 中存放敏感資料

🎤 面試這樣答

Q: JWT 和 Session 的差別是什麼?

答案:

Session:

  • 有狀態:Session 資料存在伺服器
  • 需要查詢:每次請求都要查詢 Session 資料
  • 可撤銷:登出時刪除 Session 即可

JWT:

  • 無狀態:Token 自包含使用者資訊
  • 不需查詢:伺服器只需驗證簽名
  • 撤銷困難:需要額外實作黑名單機制

選擇建議:

  • 傳統 Web 應用 → Session
  • API、SPA、行動 App → JWT

上一篇: 04-1. 認證基礎 下一篇: 04-3. OAuth 2.0


最後更新:2025-12-17

0%