04-6. 密碼重設流程

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

安全的密碼重設流程使用一次性 Token,透過 Email 驗證使用者身份後允許重設密碼。


🔄 完整流程

┌─────────────────────────────────────────────────────────────┐
│                    密碼重設流程                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 使用者請求重設密碼                                       │
│     ┌──────┐                     ┌──────┐                  │
│     │ User │ ── 輸入 Email ────▶ │Server│                  │
│     └──────┘                     └──┬───┘                  │
│                                     │                       │
│  2. 產生 Token 並寄送 Email          │                       │
│     ┌──────────────────────────────┘                       │
│     │                                                       │
│     ▼                                                       │
│  3. ┌────────────────────┐                                 │
│     │   Email Service    │                                 │
│     │  寄送重設連結       │                                 │
│     └────────┬───────────┘                                 │
│              │                                              │
│              ▼                                              │
│  4. ┌──────┐                                               │
│     │ User │ 點擊連結,輸入新密碼                           │
│     └──┬───┘                                               │
│        │                                                    │
│        ▼                                                    │
│  5. ┌──────┐                     ┌──────┐                  │
│     │ User │ ── Token + 新密碼 ─▶│Server│                  │
│     │      │ ◀── 密碼已更新 ────│      │                  │
│     └──────┘                     └──────┘                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

📦 安裝

pip install python-jose[cryptography]
pip install redis           # Token 儲存
pip install aiosmtplib      # 非同步 Email

🔧 Token 管理

Token 工具

# app/core/password_reset.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
import redis.asyncio as redis
from jose import jwt

from app.core.config import settings

redis_client = redis.from_url(settings.redis_url)

# Token 設定
RESET_TOKEN_EXPIRE_MINUTES = 30
TOKEN_PREFIX = "password_reset:"


class PasswordResetToken:
    """密碼重設 Token 管理"""

    @staticmethod
    def generate_token() -> str:
        """產生安全的隨機 Token"""
        return secrets.token_urlsafe(32)

    @staticmethod
    async def create(user_id: int, email: str) -> str:
        """建立密碼重設 Token"""
        token = PasswordResetToken.generate_token()

        # 儲存到 Redis
        key = f"{TOKEN_PREFIX}{token}"
        data = {
            "user_id": str(user_id),
            "email": email,
            "created_at": datetime.utcnow().isoformat()
        }

        await redis_client.hset(key, mapping=data)
        await redis_client.expire(key, RESET_TOKEN_EXPIRE_MINUTES * 60)

        return token

    @staticmethod
    async def verify(token: str) -> Optional[dict]:
        """驗證 Token"""
        key = f"{TOKEN_PREFIX}{token}"
        data = await redis_client.hgetall(key)

        if not data:
            return None

        return {
            "user_id": int(data[b"user_id"]),
            "email": data[b"email"].decode(),
            "created_at": data[b"created_at"].decode()
        }

    @staticmethod
    async def invalidate(token: str) -> bool:
        """使 Token 失效"""
        key = f"{TOKEN_PREFIX}{token}"
        result = await redis_client.delete(key)
        return result > 0

    @staticmethod
    async def invalidate_all_for_user(user_id: int) -> int:
        """使該使用者所有 Token 失效"""
        pattern = f"{TOKEN_PREFIX}*"
        count = 0

        async for key in redis_client.scan_iter(match=pattern):
            data = await redis_client.hgetall(key)
            if data and int(data[b"user_id"]) == user_id:
                await redis_client.delete(key)
                count += 1

        return count

使用 JWT Token(替代方案)

from jose import jwt, JWTError
from datetime import datetime, timedelta

SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"


def create_reset_token(user_id: int, email: str) -> str:
    """建立 JWT 格式的重設 Token"""
    expire = datetime.utcnow() + timedelta(minutes=RESET_TOKEN_EXPIRE_MINUTES)

    to_encode = {
        "sub": str(user_id),
        "email": email,
        "exp": expire,
        "type": "password_reset"
    }

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_reset_token(token: str) -> dict | None:
    """驗證 JWT Token"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

        if payload.get("type") != "password_reset":
            return None

        return {
            "user_id": int(payload["sub"]),
            "email": payload["email"]
        }
    except JWTError:
        return None

📧 Email 服務

# app/services/email.py
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Environment, FileSystemLoader

from app.core.config import settings


class EmailService:
    """Email 服務"""

    def __init__(self):
        self.smtp_host = settings.smtp_host
        self.smtp_port = settings.smtp_port
        self.smtp_user = settings.smtp_user
        self.smtp_password = settings.smtp_password
        self.from_email = settings.from_email
        self.from_name = settings.from_name

        # 模板引擎
        self.template_env = Environment(
            loader=FileSystemLoader("app/templates/email")
        )

    async def send_email(
        self,
        to_email: str,
        subject: str,
        html_content: str,
        text_content: str | None = None
    ):
        """發送 Email"""
        message = MIMEMultipart("alternative")
        message["Subject"] = subject
        message["From"] = f"{self.from_name} <{self.from_email}>"
        message["To"] = to_email

        # 純文字版本
        if text_content:
            message.attach(MIMEText(text_content, "plain"))

        # HTML 版本
        message.attach(MIMEText(html_content, "html"))

        # 發送
        await aiosmtplib.send(
            message,
            hostname=self.smtp_host,
            port=self.smtp_port,
            username=self.smtp_user,
            password=self.smtp_password,
            use_tls=True,
        )

    async def send_password_reset_email(
        self,
        to_email: str,
        username: str,
        reset_url: str
    ):
        """發送密碼重設 Email"""
        template = self.template_env.get_template("password_reset.html")
        html_content = template.render(
            username=username,
            reset_url=reset_url,
            expire_minutes=RESET_TOKEN_EXPIRE_MINUTES
        )

        text_content = f"""
Hi {username},

You requested to reset your password.

Click the link below to reset your password:
{reset_url}

This link will expire in {RESET_TOKEN_EXPIRE_MINUTES} minutes.

If you didn't request this, please ignore this email.
        """

        await self.send_email(
            to_email=to_email,
            subject="Password Reset Request",
            html_content=html_content,
            text_content=text_content
        )


email_service = EmailService()

Email 模板

<!-- app/templates/email/password_reset.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Password Reset</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2>Password Reset Request</h2>

        <p>Hi {{ username }},</p>

        <p>You requested to reset your password. Click the button below to set a new password:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{{ reset_url }}"
               style="background-color: #007bff;
                      color: white;
                      padding: 12px 24px;
                      text-decoration: none;
                      border-radius: 4px;
                      display: inline-block;">
                Reset Password
            </a>
        </div>

        <p style="color: #666;">
            This link will expire in {{ expire_minutes }} minutes.
        </p>

        <p style="color: #666;">
            If you didn't request this password reset, please ignore this email.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 20px 0;">

        <p style="color: #999; font-size: 12px;">
            This is an automated email. Please do not reply.
        </p>
    </div>
</body>
</html>

🔐 API 端點

# app/api/routes/password_reset.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr

from app.database import get_db
from app.models import User
from app.core.security import hash_password, verify_password
from app.core.password_reset import PasswordResetToken
from app.services.email import email_service
from app.core.config import settings

router = APIRouter(prefix="/auth", tags=["auth"])


class ForgotPasswordRequest(BaseModel):
    email: EmailStr


class ResetPasswordRequest(BaseModel):
    token: str
    new_password: str


class ChangePasswordRequest(BaseModel):
    current_password: str
    new_password: str


@router.post("/forgot-password")
async def forgot_password(
    request: ForgotPasswordRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    請求密碼重設

    - 不論 email 是否存在,都回傳相同訊息(防止使用者枚舉)
    """
    # 查詢使用者
    stmt = select(User).where(User.email == request.email)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if user:
        # 產生 Token
        token = await PasswordResetToken.create(user.id, user.email)

        # 建立重設連結
        reset_url = f"{settings.frontend_url}/reset-password?token={token}"

        # 背景發送 Email
        background_tasks.add_task(
            email_service.send_password_reset_email,
            to_email=user.email,
            username=user.username,
            reset_url=reset_url
        )

    # 不論是否找到使用者,都回傳相同訊息
    return {
        "message": "If the email exists, a password reset link has been sent."
    }


@router.post("/reset-password")
async def reset_password(
    request: ResetPasswordRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    重設密碼

    - 驗證 Token
    - 更新密碼
    - 使 Token 失效
    """
    # 驗證 Token
    token_data = await PasswordResetToken.verify(request.token)

    if not token_data:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired token"
        )

    # 查詢使用者
    user = await db.get(User, token_data["user_id"])

    if not user:
        raise HTTPException(
            status_code=400,
            detail="User not found"
        )

    # 確認 email 一致
    if user.email != token_data["email"]:
        raise HTTPException(
            status_code=400,
            detail="Invalid token"
        )

    # 更新密碼
    user.hashed_password = hash_password(request.new_password)
    await db.commit()

    # 使 Token 失效
    await PasswordResetToken.invalidate(request.token)

    # 使該使用者所有其他重設 Token 失效
    await PasswordResetToken.invalidate_all_for_user(user.id)

    return {"message": "Password has been reset successfully"}


@router.post("/change-password")
async def change_password(
    request: ChangePasswordRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """
    變更密碼(已登入使用者)

    - 驗證當前密碼
    - 更新為新密碼
    """
    # 驗證當前密碼
    if not verify_password(request.current_password, current_user.hashed_password):
        raise HTTPException(
            status_code=400,
            detail="Current password is incorrect"
        )

    # 檢查新密碼不能與舊密碼相同
    if verify_password(request.new_password, current_user.hashed_password):
        raise HTTPException(
            status_code=400,
            detail="New password must be different from current password"
        )

    # 更新密碼
    current_user.hashed_password = hash_password(request.new_password)
    await db.commit()

    return {"message": "Password changed successfully"}


@router.get("/verify-reset-token")
async def verify_reset_token(token: str):
    """
    驗證重設 Token 是否有效

    - 前端可以用這個 API 檢查 Token 是否有效
    """
    token_data = await PasswordResetToken.verify(token)

    if not token_data:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired token"
        )

    return {"valid": True, "email": token_data["email"]}

🔒 安全注意事項

1. 防止使用者枚舉

# ❌ 危險:洩漏使用者是否存在
if not user:
    raise HTTPException(status_code=404, detail="User not found")

# ✅ 安全:不論是否存在都回傳相同訊息
return {"message": "If the email exists, a reset link has been sent."}

2. Token 使用後失效

# 重設密碼後立即使 Token 失效
await PasswordResetToken.invalidate(token)

3. 速率限制

from slowapi import Limiter

@router.post("/forgot-password")
@limiter.limit("3/minute")  # 每分鐘最多 3 次
async def forgot_password(request: Request, ...):
    pass

4. Token 過期時間

# 設定合理的過期時間(15-30 分鐘)
RESET_TOKEN_EXPIRE_MINUTES = 30

5. 密碼複雜度驗證

from pydantic import field_validator

class ResetPasswordRequest(BaseModel):
    token: str
    new_password: str

    @field_validator('new_password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase letter')
        if not any(c.islower() for c in v):
            raise ValueError('Password must contain lowercase letter')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain digit')
        return v

✅ 重點總結

流程步驟

步驟說明
1使用者輸入 Email
2產生一次性 Token
3發送 Email
4使用者點擊連結
5驗證 Token 並更新密碼
6Token 失效

安全要點

  1. 不洩漏使用者是否存在
  2. Token 使用後立即失效
  3. 設定合理的過期時間
  4. 實作速率限制
  5. 背景發送 Email

🎤 面試這樣答

Q: 如何實作安全的密碼重設?

答案:

  1. 產生安全 Token:使用 secrets.token_urlsafe(32)
  2. 設定過期時間:15-30 分鐘
  3. 發送 Email:包含重設連結
  4. 驗證後失效:Token 用過即刪除
  5. 防止枚舉:不論 email 是否存在都回傳相同訊息
token = secrets.token_urlsafe(32)
await redis.setex(f"reset:{token}", 1800, user_id)
# 發送包含 token 的連結

上一篇: 04-5. 安全最佳實踐 下一篇: 04-7. Email 驗證


最後更新:2025-12-17

0%