# 04-2. JWT Authentication

> ⏱️ **Reading time:** 20 minutes
> 🎯 **Difficulty:** ⭐⭐⭐ (Advanced)

---

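## 🤔 In One Sentence

**A JWT (JSON Web Token) is a self-contained token that carries user claims, so the server can verify the caller's identity by checking the signature, without looking up session state in a database.**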

---

## 🔍 JWT Structure

```
JWT = Header.Payload.Signature

┌─────────────────────────────────────────────────────────┐
│  eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9                   │  ← Header
│  .                                                      │
│  eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ         │  ← Payload
│  .                                                      │
│  SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c            │  ← Signature
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│  Header                                                 │
│  {                                                      │
│    "alg": "HS256",    // signing algorithm              │
│    "typ": "JWT"       // token type                     │
│  }                                                      │
├─────────────────────────────────────────────────────────┤
│  Payload                                                │
│  {                                                      │
│    "sub": "1234567890",  // subject (user ID)           │
│    "name": "John",       // custom claim                │
│    "exp": 1516240822,    // expiration (iat + 30 min)   │
│    "iat": 1516239022     // issued-at time              │
│  }                                                      │
├─────────────────────────────────────────────────────────┤
│  Signature                                              │
│  HMACSHA256(                                            │
│    base64UrlEncode(header) + "." +                      │
│    base64UrlEncode(payload),                            │
│    secret                                               │
│  )                                                      │
└─────────────────────────────────────────────────────────┘
```
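
To make the three-part structure concrete, here is a minimal standard-library sketch that builds and checks an HS256 token by hand. The helper names (`b64url`, `sign_hs256`, `verify_hs256`) and the secret are illustrative assumptions; real applications should use a maintained JWT library rather than this code.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode and drop the '=' padding, as JWT segments do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Build Header.Payload.Signature for an HS256 token."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    digest = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(digest)}"


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature over Header.Payload and compare in constant time."""
    signing_input, sep, signature = token.rpartition(".")
    if not sep:
        return False
    digest = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(digest).encode("utf-8"), signature.encode("utf-8"))


token = sign_hs256(
    {"alg": "HS256", "typ": "JWT"},
    {"sub": "1234567890", "name": "John"},
    b"demo-secret",  # hypothetical secret, for illustration only
)
header_segment, payload_segment, signature_segment = token.split(".")
```

The header and payload segments produced here match the first two segments in the diagram, because they are only base64url-encoded JSON. Only the signature depends on the secret, which is why changing any character of the token makes `verify_hs256` return `False`.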

---

## 📦 Installation

```bash
# Quote the extras so shells such as zsh do not expand the brackets
pip install "python-jose[cryptography]"
pip install "passlib[bcrypt]"
```

---

## 🔧 JWT Utilities

```python
# app/core/security.py
from datetime import datetime, timedelta, timezone

from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# Settings (hard-coded for the demo; load SECRET_KEY from the environment in production)
SECRET_KEY = "your-super-secret-key-keep-it-safe"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class TokenPayload(BaseModel):
    """Token payload."""
    sub: str  # Subject (user ID)
    exp: datetime  # Expiration time (Pydantic parses the Unix timestamp as an aware UTC datetime)
    type: str  # Token type (access/refresh)


def hash_password(password: str) -> str:
    """Hash a password."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against its hash."""
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create an access token."""
    # Use an aware UTC datetime; datetime.utcnow() is naive and deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = {
        "sub": str(subject),
        "exp": expire,
        "type": "access",
        "iat": now,
    }

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(
    subject: str | int,
    expires_delta: timedelta | None = None
) -> str:
    """Create a refresh token."""
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    to_encode = {
        "sub": str(subject),
        "exp": expire,
        "type": "refresh",
        "iat": now,
    }

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> TokenPayload | None:
    """Decode a token; return None if the signature, expiry, or claims are invalid."""
    try:
        # jwt.decode verifies the signature and rejects expired tokens
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return TokenPayload(**payload)
    except (JWTError, ValidationError):
        return None


def verify_token(token: str, token_type: str = "access") -> TokenPayload | None:
    """Validate a token and check that it is of the expected type."""
    payload = decode_token(token)

    if not payload:
        return None

    if payload.type != token_type:
        return None

    # payload.exp is timezone-aware, so compare it with an aware "now";
    # comparing it with the naive datetime.utcnow() raises TypeError
    if payload.exp < datetime.now(timezone.utc):
        return None

    return payload
```

---

## 🔐 FastAPI Integration

### Dependency Setup

```python
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import verify_token
from app.database import get_db
from app.models import User

# OAuth2 scheme (tokenUrl points at the login endpoint)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Return the user identified by the access token."""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Validate the token
    payload = verify_token(token, token_type="access")
    if not payload:
        raise credentials_exception

    # Load the user from the database
    user = await db.get(User, int(payload.sub))
    if not user:
        raise credentials_exception

    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the current user only if the account is active."""
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    return current_user


async def get_current_superuser(
    current_user: User = Depends(get_current_active_user)
) -> User:
    """Return the current user only if they are a superuser."""
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )
    return current_user
```

### Authentication Endpoints

```python
# app/api/routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models import User
from app.core.security import (
    hash_password,
    verify_password,
    create_access_token,
    create_refresh_token,
    verify_token,
)

router = APIRouter(prefix="/auth", tags=["auth"])


class Token(BaseModel):
    """Token response."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class TokenRefresh(BaseModel):
    """Token refresh request."""
    refresh_token: str


class UserCreate(BaseModel):
    """User registration request."""
    username: str
    email: EmailStr
    password: str


class UserResponse(BaseModel):
    """User response."""
    # Pydantic v2 style; allows building the response from an ORM object
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: str
    is_active: bool


@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    """Register a new user."""
    # Check whether the email is already registered
    stmt = select(User).where(User.email == user_data.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Check whether the username is already taken
    stmt = select(User).where(User.username == user_data.username)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken"
        )

    # Create the user (only the password hash is stored)
    user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password)
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)

    return user


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """Log a user in."""
    # Look the user up by username or email
    stmt = select(User).where(
        (User.username == form_data.username) |
        (User.email == form_data.username)
    )
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    # Verify the credentials
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )

    # Issue the tokens
    return Token(
        access_token=create_access_token(subject=user.id),
        refresh_token=create_refresh_token(subject=user.id)
    )


@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    db: AsyncSession = Depends(get_db)
):
    """Exchange a refresh token for a new token pair."""
    # Validate the refresh token
    payload = verify_token(token_data.refresh_token, token_type="refresh")
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )

    # Check that the user still exists and is active
    user = await db.get(User, int(payload.sub))
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive"
        )

    # Issue a new token pair (locals renamed so they do not shadow this function)
    new_access_token = create_access_token(subject=user.id)
    new_refresh_token = create_refresh_token(subject=user.id)

    return Token(
        access_token=new_access_token,
        refresh_token=new_refresh_token
    )
```
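
One detail that is easy to miss: `/auth/login` uses `OAuth2PasswordRequestForm`, so it expects form-encoded `username` and `password` fields, while `/auth/refresh` takes a JSON body. The sketch below builds (but does not send) a login request with the standard library. `build_login_request` and the base URL are assumptions for illustration.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_login_request(base_url: str, username: str, password: str) -> Request:
    """Build a form-encoded POST for /auth/login without sending it."""
    # urlencode percent-encodes special characters, so the body is plain ASCII
    body = urlencode({"username": username, "password": password}).encode("ascii")
    return Request(
        url=f"{base_url}/auth/login",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


# Hypothetical local development server
req = build_login_request("http://localhost:8000", "alice", "s3cret!")
```

Sending the same credentials as JSON would fail validation on the login endpoint, because the form dependency reads form fields rather than a JSON body.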

### Protecting Endpoints

```python
# app/api/routes/users.py
from fastapi import APIRouter, Depends
from app.api.deps import get_current_active_user, get_current_superuser
from app.models import User

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me")
async def get_me(current_user: User = Depends(get_current_active_user)):
    """Return the current user's profile."""
    return {
        "id": current_user.id,
        "username": current_user.username,
        "email": current_user.email
    }


@router.get("/admin-only")
async def admin_only(current_user: User = Depends(get_current_superuser)):
    """Accessible to superusers only."""
    return {"message": "Welcome, admin!"}
```

---

## 🔄 Token Refresh Flow

```
┌─────────────────────────────────────────────────────────┐
│                   Token Refresh Flow                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  1. User logs in                                        │
│     ┌──────┐                     ┌──────┐               │
│     │Client│ ─── credentials ──▶ │Server│               │
│     │      │ ◀── Access Token ── │      │               │
│     │      │ ◀── Refresh Token ─ │      │               │
│     └──────┘                     └──────┘               │
│                                                         │
│  2. Call the API                                        │
│     ┌──────┐                     ┌──────┐               │
│     │Client│ ── Access Token ──▶ │Server│               │
│     │      │ ◀────── data ────── │      │               │
│     └──────┘                     └──────┘               │
│                                                         │
│  3. Access token expires                                │
│     ┌──────┐                     ┌──────┐               │
│     │Client│ ── Refresh Token ─▶ │Server│               │
│     │      │ ◀─ New Access Token │      │               │
│     │      │ ◀ New Refresh Token │      │               │
│     └──────┘                     └──────┘               │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

### Frontend Handling Example

```javascript
// Interceptor that refreshes the tokens when a request returns 401
axios.interceptors.response.use(
  response => response,
  async error => {
    const originalRequest = error.config;
    // Never retry the refresh call itself, or a rejected refresh would loop forever
    const isRefreshCall = originalRequest?.url?.includes('/auth/refresh');

    // Handle a 401 only once per request (network errors have no response)
    if (
      error.response?.status === 401 &&
      originalRequest &&
      !originalRequest._retry &&
      !isRefreshCall
    ) {
      originalRequest._retry = true;

      try {
        // Use the refresh token to obtain a new access token
        const refreshToken = localStorage.getItem('refresh_token');
        const response = await axios.post('/auth/refresh', {
          refresh_token: refreshToken
        });

        // Store the new tokens
        localStorage.setItem('access_token', response.data.access_token);
        localStorage.setItem('refresh_token', response.data.refresh_token);

        // Replay the original request with the new access token
        originalRequest.headers.Authorization =
          `Bearer ${response.data.access_token}`;
        return axios(originalRequest);

      } catch (refreshError) {
        // The refresh token is expired or invalid too: the user must log in again
        localStorage.removeItem('access_token');
        localStorage.removeItem('refresh_token');
        window.location.href = '/login';
        return Promise.reject(refreshError);
      }
    }

    return Promise.reject(error);
  }
);
```

---

## 🔒 Token Blacklist

### Implementing Logout

```python
# app/core/token_blacklist.py
from datetime import datetime, timedelta, timezone

import redis.asyncio as redis
from fastapi import Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import decode_token
from app.database import get_db
from app.models import User

redis_client = redis.from_url("redis://localhost:6379")


async def blacklist_token(token: str, expires_in: int):
    """Add a token to the blacklist until it would have expired anyway."""
    await redis_client.setex(
        f"blacklist:{token}",
        timedelta(seconds=expires_in),
        "1"
    )


async def is_token_blacklisted(token: str) -> bool:
    """Check whether a token is on the blacklist."""
    return await redis_client.exists(f"blacklist:{token}") > 0


# Updated validation logic: this version replaces get_current_user in
# app/api/deps.py, where oauth2_scheme is defined
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Return the current user (with a blacklist check)."""
    # Reject revoked tokens first
    if await is_token_blacklisted(token):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked"
        )

    # Remaining validation logic...


# Logout endpoint: register it on the auth router, which also needs
# oauth2_scheme and get_current_active_user from app/api/deps.py
@router.post("/logout")
async def logout(
    token: str = Depends(oauth2_scheme),
    current_user: User = Depends(get_current_active_user)
):
    """Log out by adding the token to the blacklist."""
    payload = decode_token(token)
    if payload:
        # Remaining lifetime in seconds; payload.exp is timezone-aware,
        # so subtract an aware "now" rather than datetime.utcnow()
        remaining = int((payload.exp - datetime.now(timezone.utc)).total_seconds())
        if remaining > 0:
            await blacklist_token(token, remaining)

    return {"message": "Successfully logged out"}
```
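
The key idea of the blacklist is that an entry only needs to live as long as the token's remaining lifetime, after which the token is rejected on expiry anyway. The sketch below shows that behavior with an in-memory dictionary instead of Redis, which makes it easy to unit test. `InMemoryBlacklist` is a hypothetical class, and hashing the token to get a shorter key is a design choice of this sketch, not something the Redis version above does.

```python
import hashlib
import time
from typing import Callable


class InMemoryBlacklist:
    """Revoked-token store whose entries expire with the token itself."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._expires_at: dict[str, float] = {}

    @staticmethod
    def _key(token: str) -> str:
        # Store a fixed-length digest instead of the full token
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def add(self, token: str, expires_in: int) -> None:
        """Blacklist a token for expires_in seconds (ignored if already expired)."""
        if expires_in > 0:
            self._expires_at[self._key(token)] = self._clock() + expires_in

    def contains(self, token: str) -> bool:
        """Return True while the token is blacklisted and not yet expired."""
        key = self._key(token)
        expires_at = self._expires_at.get(key)
        if expires_at is None:
            return False
        if expires_at <= self._clock():
            # Entry outlived the token: drop it, mirroring a Redis TTL
            del self._expires_at[key]
            return False
        return True
```

An in-process dictionary is only suitable for a single worker. With several processes or servers, a shared store such as the Redis client above is needed so that every instance sees the same revocations.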

---

## ⚠️ JWT Security Considerations

### 1. Key Management

```python
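# ❌ Don't hard-code the key
SECRET_KEY = "my-secret-key"

# ✅ Read it from an environment variable (raises KeyError if it is unset,
# instead of silently signing tokens with None)
import os
SECRET_KEY = os.environ["JWT_SECRET_KEY"]

# ✅ Use a strong key: generate it once and store it in JWT_SECRET_KEY.
# Generating a new key at every startup would invalidate all issued tokens.
import secrets
print(secrets.token_urlsafe(32))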
```
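
A small loader can enforce both rules at startup. The sketch below reads the key from the environment and refuses to start with a missing or short value. `load_secret_key` is a hypothetical helper, and the 32-character minimum is an assumption chosen to roughly match a 256-bit key for HS256.

```python
import os


def load_secret_key(env_var: str = "JWT_SECRET_KEY", min_length: int = 32) -> str:
    """Read the signing key from the environment and fail fast if it is weak."""
    secret = os.environ.get(env_var, "")
    if len(secret) < min_length:
        raise RuntimeError(
            f"{env_var} must be set to at least {min_length} characters"
        )
    return secret
```

Calling this once when the application starts turns a misconfigured deployment into an immediate, visible error rather than a service that signs tokens with a guessable key.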

### 2. Token Lifetime

```python
# Access token: short-lived (15-30 minutes)
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Refresh token: longer-lived (7-30 days)
REFRESH_TOKEN_EXPIRE_DAYS = 7
```

### 3. Use HTTPS

```python
# Force HTTPS in production
# (app is the FastAPI instance; settings is the project's config object)
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

if settings.ENVIRONMENT == "production":
    app.add_middleware(HTTPSRedirectMiddleware)
```

### 4. Keep Sensitive Data Out of the JWT

```python
# ❌ Don't store sensitive data (the payload is encoded, not encrypted)
{
    "sub": "123",
    "email": "user@example.com",
    "password": "hashed_password",  # Never!
    "credit_card": "1234-5678"      # Never!
}

# ✅ Store only what is needed
{
    "sub": "123",
    "type": "access",
    "exp": 1234567890
}
```
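
The reason for this rule is that the payload is only base64url-encoded, so anyone holding the token can read it without the secret. The sketch below decodes the payload of the sample token from the structure diagram using only the standard library. `peek_payload` is a hypothetical helper, and it deliberately skips signature verification, so it must not be used for authentication.

```python
import base64
import json


def peek_payload(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature."""
    payload_segment = token.split(".")[1]
    # JWT strips '=' padding; restore it so the decoder accepts the input
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


sample_token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ"
    ".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
claims = peek_payload(sample_token)
# claims == {"sub": "1234567890", "name": "John"}
```

The signature protects the token against tampering, not against reading, so a password hash or card number placed in the payload is exposed to every party that sees the token.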

---

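## ✅ Key Takeaways

### JWT Structure

| Part | Contents |
|------|----------|
| Header | Algorithm, token type |
| Payload | User claims, expiration time |
| Signature | Signature (verifies integrity) |

### Token Strategy

| Token | Purpose | Lifetime |
|-------|---------|----------|
| Access Token | API access | 15-30 minutes |
| Refresh Token | Obtaining new tokens | 7-30 days |

### Security Checklist

1. Use a strong key and read it from an environment variable
2. Give access tokens a short lifetime
3. Use HTTPS
4. Implement a token blacklist (for logout)
5. Keep sensitive data out of the JWT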

---

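## 🎤 How to Answer in an Interview

### Q: What is the difference between JWT and sessions?

**Answer:**

> **Session:**
> - Stateful: session data is stored on the server
> - Requires a lookup: every request looks up the session data
> - Revocable: deleting the session on logout is enough
>
> **JWT:**
> - Stateless: the token itself carries the user's claims
> - No lookup: the server only needs to verify the signature
> - Hard to revoke: requires an extra mechanism such as a blacklist
>
> **Recommendation:**
> - Traditional web applications → Session
> - APIs, SPAs, mobile apps → JWT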

---

**Previous:** [04-1. Authentication Basics](./04-1)
**Next:** [04-3. OAuth 2.0](./04-3)

---

Last updated: 2025-12-17


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/04-2/  

