目錄
04-6. 密碼重設流程
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
安全的密碼重設流程使用一次性 Token,透過 Email 驗證使用者身份後允許重設密碼。
🔄 完整流程
┌─────────────────────────────────────────────────────────────┐
│ 密碼重設流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 使用者請求重設密碼 │
│ ┌──────┐ ┌──────┐ │
│ │ User │ ── 輸入 Email ────▶ │Server│ │
│ └──────┘ └──┬───┘ │
│ │ │
│ 2. 產生 Token 並寄送 Email │ │
│ ┌──────────────────────────────┘ │
│ │ │
│ ▼ │
│ 3. ┌────────────────────┐ │
│ │ Email Service │ │
│ │ 寄送重設連結 │ │
│ └────────┬───────────┘ │
│ │ │
│ ▼ │
│ 4. ┌──────┐ │
│ │ User │ 點擊連結,輸入新密碼 │
│ └──┬───┘ │
│ │ │
│ ▼ │
│ 5. ┌──────┐ ┌──────┐ │
│ │ User │ ── Token + 新密碼 ─▶│Server│ │
│ │ │ ◀── 密碼已更新 ────│ │ │
│ └──────┘ └──────┘ │
│ │
└─────────────────────────────────────────────────────────────┘📦 安裝
pip install python-jose[cryptography]
pip install redis # Token 儲存
pip install aiosmtplib # 非同步 Email🔧 Token 管理
Token 工具
# app/core/password_reset.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
import redis.asyncio as redis
from jose import jwt
from app.core.config import settings
redis_client = redis.from_url(settings.redis_url)
# Token 設定
RESET_TOKEN_EXPIRE_MINUTES = 30
TOKEN_PREFIX = "password_reset:"
class PasswordResetToken:
"""密碼重設 Token 管理"""
@staticmethod
def generate_token() -> str:
"""產生安全的隨機 Token"""
return secrets.token_urlsafe(32)
@staticmethod
async def create(user_id: int, email: str) -> str:
"""建立密碼重設 Token"""
token = PasswordResetToken.generate_token()
# 儲存到 Redis
key = f"{TOKEN_PREFIX}{token}"
data = {
"user_id": str(user_id),
"email": email,
"created_at": datetime.utcnow().isoformat()
}
await redis_client.hset(key, mapping=data)
await redis_client.expire(key, RESET_TOKEN_EXPIRE_MINUTES * 60)
return token
@staticmethod
async def verify(token: str) -> Optional[dict]:
"""驗證 Token"""
key = f"{TOKEN_PREFIX}{token}"
data = await redis_client.hgetall(key)
if not data:
return None
return {
"user_id": int(data[b"user_id"]),
"email": data[b"email"].decode(),
"created_at": data[b"created_at"].decode()
}
@staticmethod
async def invalidate(token: str) -> bool:
"""使 Token 失效"""
key = f"{TOKEN_PREFIX}{token}"
result = await redis_client.delete(key)
return result > 0
@staticmethod
async def invalidate_all_for_user(user_id: int) -> int:
"""使該使用者所有 Token 失效"""
pattern = f"{TOKEN_PREFIX}*"
count = 0
async for key in redis_client.scan_iter(match=pattern):
data = await redis_client.hgetall(key)
if data and int(data[b"user_id"]) == user_id:
await redis_client.delete(key)
count += 1
return count使用 JWT Token(替代方案)
from jose import jwt, JWTError
from datetime import datetime, timedelta
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
def create_reset_token(user_id: int, email: str) -> str:
"""建立 JWT 格式的重設 Token"""
expire = datetime.utcnow() + timedelta(minutes=RESET_TOKEN_EXPIRE_MINUTES)
to_encode = {
"sub": str(user_id),
"email": email,
"exp": expire,
"type": "password_reset"
}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_reset_token(token: str) -> dict | None:
"""驗證 JWT Token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "password_reset":
return None
return {
"user_id": int(payload["sub"]),
"email": payload["email"]
}
except JWTError:
return None📧 Email 服務
# app/services/email.py
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Environment, FileSystemLoader
from app.core.config import settings
class EmailService:
"""Email 服務"""
def __init__(self):
self.smtp_host = settings.smtp_host
self.smtp_port = settings.smtp_port
self.smtp_user = settings.smtp_user
self.smtp_password = settings.smtp_password
self.from_email = settings.from_email
self.from_name = settings.from_name
# 模板引擎
self.template_env = Environment(
loader=FileSystemLoader("app/templates/email")
)
async def send_email(
self,
to_email: str,
subject: str,
html_content: str,
text_content: str | None = None
):
"""發送 Email"""
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = f"{self.from_name} <{self.from_email}>"
message["To"] = to_email
# 純文字版本
if text_content:
message.attach(MIMEText(text_content, "plain"))
# HTML 版本
message.attach(MIMEText(html_content, "html"))
# 發送
await aiosmtplib.send(
message,
hostname=self.smtp_host,
port=self.smtp_port,
username=self.smtp_user,
password=self.smtp_password,
use_tls=True,
)
async def send_password_reset_email(
self,
to_email: str,
username: str,
reset_url: str
):
"""發送密碼重設 Email"""
template = self.template_env.get_template("password_reset.html")
html_content = template.render(
username=username,
reset_url=reset_url,
expire_minutes=RESET_TOKEN_EXPIRE_MINUTES
)
text_content = f"""
Hi {username},
You requested to reset your password.
Click the link below to reset your password:
{reset_url}
This link will expire in {RESET_TOKEN_EXPIRE_MINUTES} minutes.
If you didn't request this, please ignore this email.
"""
await self.send_email(
to_email=to_email,
subject="Password Reset Request",
html_content=html_content,
text_content=text_content
)
email_service = EmailService()Email 模板
<!-- app/templates/email/password_reset.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Password Reset</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6;">
<div style="max-width: 600px; margin: 0 auto; padding: 20px;">
<h2>Password Reset Request</h2>
<p>Hi {{ username }},</p>
<p>You requested to reset your password. Click the button below to set a new password:</p>
<div style="text-align: center; margin: 30px 0;">
<a href="{{ reset_url }}"
style="background-color: #007bff;
color: white;
padding: 12px 24px;
text-decoration: none;
border-radius: 4px;
display: inline-block;">
Reset Password
</a>
</div>
<p style="color: #666;">
This link will expire in {{ expire_minutes }} minutes.
</p>
<p style="color: #666;">
If you didn't request this password reset, please ignore this email.
</p>
<hr style="border: none; border-top: 1px solid #eee; margin: 20px 0;">
<p style="color: #999; font-size: 12px;">
This is an automated email. Please do not reply.
</p>
</div>
</body>
</html>🔐 API 端點
# app/api/routes/password_reset.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr
from app.database import get_db
from app.models import User
from app.core.security import hash_password, verify_password
from app.core.password_reset import PasswordResetToken
from app.services.email import email_service
from app.core.config import settings
router = APIRouter(prefix="/auth", tags=["auth"])
class ForgotPasswordRequest(BaseModel):
email: EmailStr
class ResetPasswordRequest(BaseModel):
token: str
new_password: str
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
@router.post("/forgot-password")
async def forgot_password(
request: ForgotPasswordRequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
請求密碼重設
- 不論 email 是否存在,都回傳相同訊息(防止使用者枚舉)
"""
# 查詢使用者
stmt = select(User).where(User.email == request.email)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user:
# 產生 Token
token = await PasswordResetToken.create(user.id, user.email)
# 建立重設連結
reset_url = f"{settings.frontend_url}/reset-password?token={token}"
# 背景發送 Email
background_tasks.add_task(
email_service.send_password_reset_email,
to_email=user.email,
username=user.username,
reset_url=reset_url
)
# 不論是否找到使用者,都回傳相同訊息
return {
"message": "If the email exists, a password reset link has been sent."
}
@router.post("/reset-password")
async def reset_password(
request: ResetPasswordRequest,
db: AsyncSession = Depends(get_db)
):
"""
重設密碼
- 驗證 Token
- 更新密碼
- 使 Token 失效
"""
# 驗證 Token
token_data = await PasswordResetToken.verify(request.token)
if not token_data:
raise HTTPException(
status_code=400,
detail="Invalid or expired token"
)
# 查詢使用者
user = await db.get(User, token_data["user_id"])
if not user:
raise HTTPException(
status_code=400,
detail="User not found"
)
# 確認 email 一致
if user.email != token_data["email"]:
raise HTTPException(
status_code=400,
detail="Invalid token"
)
# 更新密碼
user.hashed_password = hash_password(request.new_password)
await db.commit()
# 使 Token 失效
await PasswordResetToken.invalidate(request.token)
# 使該使用者所有其他重設 Token 失效
await PasswordResetToken.invalidate_all_for_user(user.id)
return {"message": "Password has been reset successfully"}
@router.post("/change-password")
async def change_password(
request: ChangePasswordRequest,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""
變更密碼(已登入使用者)
- 驗證當前密碼
- 更新為新密碼
"""
# 驗證當前密碼
if not verify_password(request.current_password, current_user.hashed_password):
raise HTTPException(
status_code=400,
detail="Current password is incorrect"
)
# 檢查新密碼不能與舊密碼相同
if verify_password(request.new_password, current_user.hashed_password):
raise HTTPException(
status_code=400,
detail="New password must be different from current password"
)
# 更新密碼
current_user.hashed_password = hash_password(request.new_password)
await db.commit()
return {"message": "Password changed successfully"}
@router.get("/verify-reset-token")
async def verify_reset_token(token: str):
"""
驗證重設 Token 是否有效
- 前端可以用這個 API 檢查 Token 是否有效
"""
token_data = await PasswordResetToken.verify(token)
if not token_data:
raise HTTPException(
status_code=400,
detail="Invalid or expired token"
)
return {"valid": True, "email": token_data["email"]}🔒 安全注意事項
1. 防止使用者枚舉
# ❌ 危險:洩漏使用者是否存在
if not user:
raise HTTPException(status_code=404, detail="User not found")
# ✅ 安全:不論是否存在都回傳相同訊息
return {"message": "If the email exists, a reset link has been sent."}2. Token 使用後失效
# 重設密碼後立即使 Token 失效
await PasswordResetToken.invalidate(token)3. 速率限制
from slowapi import Limiter
@router.post("/forgot-password")
@limiter.limit("3/minute") # 每分鐘最多 3 次
async def forgot_password(request: Request, ...):
pass4. Token 過期時間
# 設定合理的過期時間(15-30 分鐘)
RESET_TOKEN_EXPIRE_MINUTES = 305. 密碼複雜度驗證
from pydantic import field_validator
class ResetPasswordRequest(BaseModel):
token: str
new_password: str
@field_validator('new_password')
@classmethod
def validate_password(cls, v: str) -> str:
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.islower() for c in v):
raise ValueError('Password must contain lowercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return v✅ 重點總結
流程步驟
| 步驟 | 說明 |
|---|---|
| 1 | 使用者輸入 Email |
| 2 | 產生一次性 Token |
| 3 | 發送 Email |
| 4 | 使用者點擊連結 |
| 5 | 驗證 Token 並更新密碼 |
| 6 | Token 失效 |
安全要點
- 不洩漏使用者是否存在
- Token 使用後立即失效
- 設定合理的過期時間
- 實作速率限制
- 背景發送 Email
🎤 面試這樣答
Q: 如何實作安全的密碼重設?
答案:
- 產生安全 Token:使用
secrets.token_urlsafe(32)- 設定過期時間:15-30 分鐘
- 發送 Email:包含重設連結
- 驗證後失效:Token 用過即刪除
- 防止枚舉:不論 email 是否存在都回傳相同訊息
token = secrets.token_urlsafe(32) await redis.setex(f"reset:{token}", 1800, user_id) # 發送包含 token 的連結
上一篇: 04-5. 安全最佳實踐 下一篇: 04-7. Email 驗證
最後更新:2025-12-17