04-7. Email 驗證

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

Email 驗證確保使用者提供的 Email 地址是真實有效的,防止假帳號和垃圾註冊。


🔄 驗證流程

┌─────────────────────────────────────────────────────────────┐
│                    Email 驗證流程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 使用者註冊                                               │
│     ┌──────┐                     ┌──────┐                  │
│     │ User │ ── 註冊資料 ───────▶│Server│                  │
│     └──────┘                     └──┬───┘                  │
│                                     │                       │
│  2. 建立帳號(未驗證)               │                       │
│                                     │                       │
│  3. 產生驗證 Token,發送 Email       │                       │
│     ┌──────────────────────────────┘                       │
│     │                                                       │
│     ▼                                                       │
│  4. ┌────────────────────┐                                 │
│     │   Email Service    │                                 │
│     │  寄送驗證連結       │                                 │
│     └────────┬───────────┘                                 │
│              │                                              │
│              ▼                                              │
│  5. ┌──────┐                                               │
│     │ User │ 點擊驗證連結                                   │
│     └──┬───┘                                               │
│        │                                                    │
│        ▼                                                    │
│  6. ┌──────┐                     ┌──────┐                  │
│     │ User │ ── 驗證 Token ────▶│Server│                  │
│     │      │ ◀── 驗證成功 ──────│      │                  │
│     └──────┘                     └──────┘                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

🔧 Token 管理

# app/core/email_verification.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
import redis.asyncio as redis
from jose import jwt

from app.core.config import settings

# Module-level async Redis client, created from the URL in the app settings.
redis_client = redis.from_url(settings.redis_url)

# Settings
# Token lifetime in hours; it is multiplied by 3600 where the Redis TTL is set.
VERIFICATION_TOKEN_EXPIRE_HOURS = 24
# Prefix prepended to a token to form its Redis key.
TOKEN_PREFIX = "email_verify:"


class EmailVerificationToken:
    """Manage email-verification tokens stored as Redis hashes.

    Each token lives under the key ``TOKEN_PREFIX + token`` as a hash with
    the fields ``user_id``, ``email`` and ``created_at``. The key is given a
    TTL of ``VERIFICATION_TOKEN_EXPIRE_HOURS`` hours when it is created.
    """

    @staticmethod
    def generate_token() -> str:
        """Return a URL-safe random token built from 32 random bytes."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def _decode_hash(raw: dict) -> dict:
        """Return a copy of *raw* with every ``bytes`` key/value decoded.

        A Redis client replies with ``bytes`` by default and with ``str``
        when it was created with ``decode_responses=True``. Normalising here
        lets the rest of the class work with either configuration.
        """
        def as_text(value):
            return value.decode() if isinstance(value, bytes) else value

        return {as_text(field): as_text(value) for field, value in raw.items()}

    @staticmethod
    async def create(user_id: int, email: str) -> str:
        """Create and store a verification token for a user.

        Args:
            user_id: ID of the user the token belongs to.
            email: Address the token is meant to verify.

        Returns:
            The newly generated token string.
        """
        token = EmailVerificationToken.generate_token()

        key = f"{TOKEN_PREFIX}{token}"
        data = {
            "user_id": str(user_id),
            "email": email,
            "created_at": datetime.utcnow().isoformat()
        }

        # Queue HSET and EXPIRE in one MULTI/EXEC transaction so the hash can
        # never be left behind without a TTL if the second command fails.
        async with redis_client.pipeline(transaction=True) as pipe:
            pipe.hset(key, mapping=data)
            pipe.expire(key, VERIFICATION_TOKEN_EXPIRE_HOURS * 3600)
            await pipe.execute()

        return token

    @staticmethod
    async def verify(token: str) -> Optional[dict]:
        """Look up a token and return its stored data.

        Args:
            token: Token string taken from the verification link.

        Returns:
            A dict with ``user_id`` (int), ``email`` (str) and ``created_at``
            (ISO-format str), or ``None`` when the key does not exist or the
            stored record is incomplete or malformed.
        """
        key = f"{TOKEN_PREFIX}{token}"
        data = EmailVerificationToken._decode_hash(await redis_client.hgetall(key))

        if not data:
            return None

        try:
            return {
                "user_id": int(data["user_id"]),
                "email": data["email"],
                "created_at": data["created_at"]
            }
        except (KeyError, ValueError):
            # A record missing a field or holding a non-numeric user_id is
            # treated as an invalid token rather than crashing the request.
            return None

    @staticmethod
    async def invalidate(token: str) -> bool:
        """Delete a token.

        Returns:
            ``True`` if a key was actually removed, ``False`` otherwise.
        """
        key = f"{TOKEN_PREFIX}{token}"
        result = await redis_client.delete(key)
        return result > 0

    @staticmethod
    async def invalidate_all_for_user(user_id: int) -> int:
        """Delete every verification token that belongs to a user.

        This walks all keys matching ``TOKEN_PREFIX*`` with SCAN, so its cost
        grows with the total number of outstanding tokens, not with the
        number owned by this user.

        Returns:
            The number of keys that were actually deleted.
        """
        pattern = f"{TOKEN_PREFIX}*"
        wanted = str(user_id)  # user_id is stored as a string by create()
        count = 0

        async for key in redis_client.scan_iter(match=pattern):
            data = EmailVerificationToken._decode_hash(await redis_client.hgetall(key))
            if data.get("user_id") == wanted:
                # DELETE returns how many keys it removed; a key that expired
                # between the read and the delete is therefore not counted.
                count += await redis_client.delete(key)

        return count

📧 驗證 Email 模板

<!-- app/templates/email/email_verification.html -->
<!--
  Verification email body.
  Template variables referenced below:
    app_name          - shown in the welcome heading
    username          - shown in the greeting
    verification_url  - target of the "Verify Email" button
    expire_hours      - shown in the link-expiry notice
  All styling is written as inline style attributes on the elements.
-->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Verify Your Email</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2>Welcome to {{ app_name }}!</h2>

        <p>Hi {{ username }},</p>

        <p>Thank you for registering. Please verify your email address by clicking the button below:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{{ verification_url }}"
               style="background-color: #28a745;
                      color: white;
                      padding: 12px 24px;
                      text-decoration: none;
                      border-radius: 4px;
                      display: inline-block;">
                Verify Email
            </a>
        </div>

        <p style="color: #666;">
            This link will expire in {{ expire_hours }} hours.
        </p>

        <p style="color: #666;">
            If you didn't create an account, please ignore this email.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 20px 0;">

        <p style="color: #999; font-size: 12px;">
            This is an automated email. Please do not reply.
        </p>
    </div>
</body>
</html>

🔐 API 端點

# app/api/routes/email_verification.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr

from app.database import get_db
from app.models import User
from app.core.email_verification import EmailVerificationToken
from app.services.email import email_service
from app.core.config import settings
from app.api.deps import get_current_active_user

# Every route registered on this router gets the "/auth" prefix and "auth" tag.
router = APIRouter(prefix="/auth", tags=["auth"])


# Request model used by the resend-verification endpoint.
class ResendVerificationRequest(BaseModel):
    email: EmailStr  # address whose verification link should be re-sent


@router.post("/register")
async def register(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Register a new user.

    - Creates the account with ``is_verified = False``
    - Stores a verification token and emails the verification link in the
      background

    Raises:
        HTTPException(400): the email is already registered, or the insert
            violates a uniqueness constraint (for example a duplicate
            username, or a concurrent registration with the same email).
    """
    # Imported locally so this handler does not depend on the module's
    # top-of-file import list.
    from sqlalchemy.exc import IntegrityError

    # Reject an email that is already registered.
    stmt = select(User).where(User.email == user_data.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create the user in the unverified state.
    user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
        is_verified=False,  # not verified yet
        is_active=True
    )
    db.add(user)
    try:
        await db.commit()
    except IntegrityError:
        # The pre-check above only covers email and is not race-free, so a
        # duplicate username or a simultaneous signup lands here. Report it
        # as a client error instead of letting it surface as a 500.
        await db.rollback()
        raise HTTPException(
            status_code=400,
            detail="Username or email already registered"
        ) from None
    await db.refresh(user)

    # Generate the verification token.
    token = await EmailVerificationToken.create(user.id, user.email)

    # Build the verification link.
    verification_url = f"{settings.frontend_url}/verify-email?token={token}"

    # Send the verification email in the background.
    background_tasks.add_task(
        email_service.send_verification_email,
        to_email=user.email,
        username=user.username,
        verification_url=verification_url
    )

    return {
        "message": "Registration successful. Please check your email to verify your account.",
        "user": {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "is_verified": user.is_verified
        }
    }


@router.get("/verify-email")
async def verify_email(
    token: str,
    db: AsyncSession = Depends(get_db)
):
    """
    Verify a user's email address.

    - Looks up the token
    - Marks the user as verified
    - Invalidates the token

    Raises:
        HTTPException(400): the token is unknown or expired, the user no
            longer exists, or the token's email differs from the user's
            current email.
    """
    # Imported locally: the import list of this route module does not bring
    # ``datetime`` into scope, yet it is needed to stamp ``verified_at``.
    from datetime import datetime

    # Look up the token.
    token_data = await EmailVerificationToken.verify(token)

    if not token_data:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired verification token"
        )

    # Load the user.
    user = await db.get(User, token_data["user_id"])

    if not user:
        raise HTTPException(status_code=400, detail="User not found")

    # The token must have been issued for the user's current email.
    if user.email != token_data["email"]:
        raise HTTPException(status_code=400, detail="Invalid token")

    # Already verified: nothing to update, but still consume the token so it
    # cannot be presented again (tokens are meant to be single-use).
    if user.is_verified:
        await EmailVerificationToken.invalidate(token)
        return {"message": "Email already verified"}

    # Mark as verified.
    user.is_verified = True
    user.verified_at = datetime.utcnow()
    await db.commit()

    # Invalidate the token.
    await EmailVerificationToken.invalidate(token)

    return {"message": "Email verified successfully"}


@router.post("/resend-verification")
async def resend_verification(
    request: ResendVerificationRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Re-send the verification email.

    When the address belongs to a user who is not yet verified, that user's
    existing tokens are invalidated, a new token is created and the email is
    queued as a background task. The response body is identical on every
    path so callers cannot tell whether the address is registered.
    """
    # One reply for all outcomes (prevents user enumeration).
    generic_reply = {
        "message": "If the email exists and is not verified, a verification link has been sent."
    }

    lookup = await db.execute(select(User).where(User.email == request.email))
    account = lookup.scalar_one_or_none()

    # Unknown address or already verified: nothing to send.
    if not account or account.is_verified:
        return generic_reply

    # Drop the old tokens before issuing a replacement.
    await EmailVerificationToken.invalidate_all_for_user(account.id)
    fresh_token = await EmailVerificationToken.create(account.id, account.email)
    link = f"{settings.frontend_url}/verify-email?token={fresh_token}"

    background_tasks.add_task(
        email_service.send_verification_email,
        to_email=account.email,
        username=account.username,
        verification_url=link
    )

    return generic_reply


@router.post("/change-email")
async def change_email(
    new_email: EmailStr,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """
    Request an email change for the authenticated user.

    The new address is stored in ``pending_email`` and only becomes the
    user's email after the link sent to that address has been confirmed.

    Raises:
        HTTPException(400): the new address already belongs to a user.
    """
    # NOTE: ``background_tasks`` has no default value, so it must be declared
    # before the parameters that use ``Depends(...)`` defaults; Python rejects
    # a parameter without a default after one with a default.

    # Reject an address that is already in use.
    stmt = select(User).where(User.email == new_email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="Email already in use")

    # Store the new address as pending (not verified yet).
    current_user.pending_email = new_email
    await db.commit()

    # Generate the verification token for the new address.
    token = await EmailVerificationToken.create(current_user.id, new_email)
    verification_url = f"{settings.frontend_url}/verify-new-email?token={token}"

    # Send the verification email in the background.
    background_tasks.add_task(
        email_service.send_email_change_verification,
        to_email=new_email,
        username=current_user.username,
        verification_url=verification_url
    )

    return {"message": "Verification email sent to your new email address"}


@router.get("/verify-new-email")
async def verify_new_email(
    token: str,
    db: AsyncSession = Depends(get_db)
):
    """
    Confirm a pending email change.

    Raises:
        HTTPException(400): the token is unknown or expired, the user no
            longer exists, the token does not match the user's pending
            email, or the address was taken by another account before the
            change could be applied.
    """
    # Imported locally so this handler does not depend on the module's
    # top-of-file import list.
    from sqlalchemy.exc import IntegrityError

    token_data = await EmailVerificationToken.verify(token)

    if not token_data:
        raise HTTPException(status_code=400, detail="Invalid or expired token")

    user = await db.get(User, token_data["user_id"])

    if not user:
        raise HTTPException(status_code=400, detail="User not found")

    # The token must match the address currently awaiting confirmation.
    if user.pending_email != token_data["email"]:
        raise HTTPException(status_code=400, detail="Invalid token")

    # Promote the pending address to the user's email.
    user.email = user.pending_email
    user.pending_email = None
    try:
        await db.commit()
    except IntegrityError:
        # Availability was only checked when the change was requested; if
        # the address has since been taken, the unique constraint fails here.
        # Report it as a client error rather than an unhandled 500.
        await db.rollback()
        raise HTTPException(status_code=400, detail="Email already in use") from None

    # Invalidate the token.
    await EmailVerificationToken.invalidate(token)

    return {"message": "Email changed successfully"}

🔒 要求已驗證使用者

# app/api/deps.py
async def get_verified_user(
    current_user: User = Depends(get_current_active_user)
) -> User:
    """Dependency that only lets users with a verified email through.

    Returns the user unchanged when ``is_verified`` is truthy; otherwise
    raises ``HTTPException`` with status 403.
    """
    if current_user.is_verified:
        return current_user

    raise HTTPException(
        status_code=403,
        detail="Please verify your email first"
    )


# Usage
@router.post("/posts")
async def create_post(
    post_data: PostCreate,
    current_user: User = Depends(get_verified_user)  # must be verified
):
    """Create a post (requires a verified user)."""
    pass

📊 User Model 擴充

from sqlalchemy import Column, Integer, String, Boolean, DateTime
from datetime import datetime


class User(Base):
    """User model for the ``users`` table, with email-verification fields."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)

    # Verification state
    is_verified = Column(Boolean, default=False)
    verified_at = Column(DateTime, nullable=True)  # stays NULL until a value is assigned

    # Email change
    pending_email = Column(String(100), nullable=True)  # new address awaiting confirmation

    # Account state
    is_active = Column(Boolean, default=True)

    # The callable itself is passed as the default, not the result of calling it.
    created_at = Column(DateTime, default=datetime.utcnow)

✅ 重點總結

驗證流程

| 步驟 | 說明 |
|------|------|
| 1 | 使用者註冊(is_verified=False) |
| 2 | 產生驗證 Token |
| 3 | 發送驗證 Email |
| 4 | 使用者點擊連結 |
| 5 | 驗證 Token 並更新狀態 |

安全要點

  1. Token 一次性使用
  2. 設定過期時間(24 小時)
  3. 防止使用者枚舉
  4. 變更 Email 也要重新驗證

使用情境

# At registration
user.is_verified = False
# Send the verification email

# After verification
user.is_verified = True
user.verified_at = datetime.utcnow()

# Features that require a verified user
@router.post("/posts")
async def create_post(user: User = Depends(get_verified_user)):
    pass

🎤 面試這樣答

Q: 為什麼需要 Email 驗證?

答案:

Email 驗證的目的:

  1. 確認身份:確保使用者擁有該 Email
  2. 防止假帳號:減少垃圾註冊
  3. 通訊管道:確保能聯繫到使用者
  4. 密碼找回:確保密碼重設發送到正確的 Email

實作方式:

# Generate a one-time token
token = secrets.token_urlsafe(32)
# Send a link that contains the token
# When the user clicks it, verify the token and invalidate it

上一篇: 04-6. 密碼重設流程 | 下一篇: 04-8. 雙因素認證


最後更新:2025-12-17

0%