目錄
04-7. Email 驗證
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
Email 驗證確保使用者提供的 Email 地址是真實有效的,防止假帳號和垃圾註冊。
🔄 驗證流程
┌─────────────────────────────────────────────────────────────┐
│ Email 驗證流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 使用者註冊 │
│ ┌──────┐ ┌──────┐ │
│ │ User │ ── 註冊資料 ───────▶│Server│ │
│ └──────┘ └──┬───┘ │
│ │ │
│ 2. 建立帳號(未驗證) │ │
│ │ │
│ 3. 產生驗證 Token,發送 Email │ │
│ ┌──────────────────────────────┘ │
│ │ │
│ ▼ │
│ 4. ┌────────────────────┐ │
│ │ Email Service │ │
│ │ 寄送驗證連結 │ │
│ └────────┬───────────┘ │
│ │ │
│ ▼ │
│ 5. ┌──────┐ │
│ │ User │ 點擊驗證連結 │
│ └──┬───┘ │
│ │ │
│ ▼ │
│ 6. ┌──────┐ ┌──────┐ │
│ │ User │ ── 驗證 Token ────▶│Server│ │
│ │ │ ◀── 驗證成功 ──────│ │ │
│ └──────┘ └──────┘ │
│ │
└─────────────────────────────────────────────────────────────┘🔧 Token 管理
# app/core/email_verification.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
import redis.asyncio as redis
from jose import jwt
from app.core.config import settings
redis_client = redis.from_url(settings.redis_url)
# 設定
VERIFICATION_TOKEN_EXPIRE_HOURS = 24
TOKEN_PREFIX = "email_verify:"
class EmailVerificationToken:
"""Email 驗證 Token 管理"""
@staticmethod
def generate_token() -> str:
"""產生安全的隨機 Token"""
return secrets.token_urlsafe(32)
@staticmethod
async def create(user_id: int, email: str) -> str:
"""建立驗證 Token"""
token = EmailVerificationToken.generate_token()
key = f"{TOKEN_PREFIX}{token}"
data = {
"user_id": str(user_id),
"email": email,
"created_at": datetime.utcnow().isoformat()
}
await redis_client.hset(key, mapping=data)
await redis_client.expire(key, VERIFICATION_TOKEN_EXPIRE_HOURS * 3600)
return token
@staticmethod
async def verify(token: str) -> Optional[dict]:
"""驗證 Token"""
key = f"{TOKEN_PREFIX}{token}"
data = await redis_client.hgetall(key)
if not data:
return None
return {
"user_id": int(data[b"user_id"]),
"email": data[b"email"].decode(),
"created_at": data[b"created_at"].decode()
}
@staticmethod
async def invalidate(token: str) -> bool:
"""使 Token 失效"""
key = f"{TOKEN_PREFIX}{token}"
result = await redis_client.delete(key)
return result > 0
@staticmethod
async def invalidate_all_for_user(user_id: int) -> int:
"""使該使用者所有驗證 Token 失效"""
pattern = f"{TOKEN_PREFIX}*"
count = 0
async for key in redis_client.scan_iter(match=pattern):
data = await redis_client.hgetall(key)
if data and int(data[b"user_id"]) == user_id:
await redis_client.delete(key)
count += 1
return count📧 驗證 Email 模板
<!-- app/templates/email/email_verification.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Verify Your Email</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6;">
<div style="max-width: 600px; margin: 0 auto; padding: 20px;">
<h2>Welcome to {{ app_name }}!</h2>
<p>Hi {{ username }},</p>
<p>Thank you for registering. Please verify your email address by clicking the button below:</p>
<div style="text-align: center; margin: 30px 0;">
<a href="{{ verification_url }}"
style="background-color: #28a745;
color: white;
padding: 12px 24px;
text-decoration: none;
border-radius: 4px;
display: inline-block;">
Verify Email
</a>
</div>
<p style="color: #666;">
This link will expire in {{ expire_hours }} hours.
</p>
<p style="color: #666;">
If you didn't create an account, please ignore this email.
</p>
<hr style="border: none; border-top: 1px solid #eee; margin: 20px 0;">
<p style="color: #999; font-size: 12px;">
This is an automated email. Please do not reply.
</p>
</div>
</body>
</html>🔐 API 端點
# app/api/routes/email_verification.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr
from app.database import get_db
from app.models import User
from app.core.email_verification import EmailVerificationToken
from app.services.email import email_service
from app.core.config import settings
from app.api.deps import get_current_active_user
router = APIRouter(prefix="/auth", tags=["auth"])
class ResendVerificationRequest(BaseModel):
email: EmailStr
@router.post("/register")
async def register(
user_data: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
使用者註冊
- 建立帳號(is_verified = False)
- 發送驗證 Email
"""
# 檢查 email 是否已存在
stmt = select(User).where(User.email == user_data.email)
result = await db.execute(stmt)
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already registered")
# 建立使用者(未驗證)
user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hash_password(user_data.password),
is_verified=False, # 未驗證
is_active=True
)
db.add(user)
await db.commit()
await db.refresh(user)
# 產生驗證 Token
token = await EmailVerificationToken.create(user.id, user.email)
# 建立驗證連結
verification_url = f"{settings.frontend_url}/verify-email?token={token}"
# 背景發送驗證 Email
background_tasks.add_task(
email_service.send_verification_email,
to_email=user.email,
username=user.username,
verification_url=verification_url
)
return {
"message": "Registration successful. Please check your email to verify your account.",
"user": {
"id": user.id,
"username": user.username,
"email": user.email,
"is_verified": user.is_verified
}
}
@router.get("/verify-email")
async def verify_email(
token: str,
db: AsyncSession = Depends(get_db)
):
"""
驗證 Email
- 驗證 Token
- 更新使用者狀態
- 使 Token 失效
"""
# 驗證 Token
token_data = await EmailVerificationToken.verify(token)
if not token_data:
raise HTTPException(
status_code=400,
detail="Invalid or expired verification token"
)
# 查詢使用者
user = await db.get(User, token_data["user_id"])
if not user:
raise HTTPException(status_code=400, detail="User not found")
# 確認 email 一致
if user.email != token_data["email"]:
raise HTTPException(status_code=400, detail="Invalid token")
# 檢查是否已驗證
if user.is_verified:
return {"message": "Email already verified"}
# 更新為已驗證
user.is_verified = True
user.verified_at = datetime.utcnow()
await db.commit()
# 使 Token 失效
await EmailVerificationToken.invalidate(token)
return {"message": "Email verified successfully"}
@router.post("/resend-verification")
async def resend_verification(
request: ResendVerificationRequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
重新發送驗證 Email
- 檢查使用者是否存在且未驗證
- 產生新的 Token
- 發送驗證 Email
"""
# 查詢使用者
stmt = select(User).where(User.email == request.email)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user and not user.is_verified:
# 使舊的 Token 失效
await EmailVerificationToken.invalidate_all_for_user(user.id)
# 產生新的 Token
token = await EmailVerificationToken.create(user.id, user.email)
verification_url = f"{settings.frontend_url}/verify-email?token={token}"
# 發送 Email
background_tasks.add_task(
email_service.send_verification_email,
to_email=user.email,
username=user.username,
verification_url=verification_url
)
# 不論是否找到使用者,都回傳相同訊息(防止枚舉)
return {
"message": "If the email exists and is not verified, a verification link has been sent."
}
@router.post("/change-email")
async def change_email(
new_email: EmailStr,
current_user: User = Depends(get_current_active_user),
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
變更 Email
- 需要重新驗證新的 Email
"""
# 檢查新 email 是否已被使用
stmt = select(User).where(User.email == new_email)
result = await db.execute(stmt)
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already in use")
# 儲存新 email(但標記為未驗證)
current_user.pending_email = new_email
await db.commit()
# 產生驗證 Token
token = await EmailVerificationToken.create(current_user.id, new_email)
verification_url = f"{settings.frontend_url}/verify-new-email?token={token}"
# 發送驗證 Email
background_tasks.add_task(
email_service.send_email_change_verification,
to_email=new_email,
username=current_user.username,
verification_url=verification_url
)
return {"message": "Verification email sent to your new email address"}
@router.get("/verify-new-email")
async def verify_new_email(
token: str,
db: AsyncSession = Depends(get_db)
):
"""
驗證新的 Email
"""
token_data = await EmailVerificationToken.verify(token)
if not token_data:
raise HTTPException(status_code=400, detail="Invalid or expired token")
user = await db.get(User, token_data["user_id"])
if not user:
raise HTTPException(status_code=400, detail="User not found")
# 確認是正確的 pending email
if user.pending_email != token_data["email"]:
raise HTTPException(status_code=400, detail="Invalid token")
# 更新 email
user.email = user.pending_email
user.pending_email = None
await db.commit()
# 使 Token 失效
await EmailVerificationToken.invalidate(token)
return {"message": "Email changed successfully"}🔒 要求已驗證使用者
# app/api/deps.py
async def get_verified_user(
current_user: User = Depends(get_current_active_user)
) -> User:
"""要求已驗證 Email 的使用者"""
if not current_user.is_verified:
raise HTTPException(
status_code=403,
detail="Please verify your email first"
)
return current_user
# 使用方式
@router.post("/posts")
async def create_post(
post_data: PostCreate,
current_user: User = Depends(get_verified_user) # 必須已驗證
):
"""建立文章(需要已驗證的使用者)"""
pass📊 User Model 擴充
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from datetime import datetime
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
# 驗證狀態
is_verified = Column(Boolean, default=False)
verified_at = Column(DateTime, nullable=True)
# Email 變更
pending_email = Column(String(100), nullable=True)
# 帳號狀態
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)✅ 重點總結
驗證流程
| 步驟 | 說明 |
|---|---|
| 1 | 使用者註冊(is_verified=False) |
| 2 | 產生驗證 Token |
| 3 | 發送驗證 Email |
| 4 | 使用者點擊連結 |
| 5 | 驗證 Token 並更新狀態 |
安全要點
- Token 一次性使用
- 設定過期時間(24 小時)
- 防止使用者枚舉
- 變更 Email 也要重新驗證
使用情境
# 註冊時
user.is_verified = False
# 發送驗證 Email
# 驗證後
user.is_verified = True
user.verified_at = datetime.utcnow()
# 需要驗證才能操作的功能
@router.post("/posts")
async def create_post(user: User = Depends(get_verified_user)):
pass🎤 面試這樣答
Q: 為什麼需要 Email 驗證?
答案:
Email 驗證的目的:
- 確認身份:確保使用者擁有該 Email
- 防止假帳號:減少垃圾註冊
- 通訊管道:確保能聯繫到使用者
- 密碼找回:確保密碼重設發送到正確的 Email
實作方式:
# 產生一次性 Token token = secrets.token_urlsafe(32) # 發送包含 Token 的連結 # 使用者點擊後驗證並使 Token 失效
上一篇: 04-6. 密碼重設流程 下一篇: 04-8. 雙因素認證
最後更新:2025-12-17