# 

# 04-7. Email 驗證

&gt; ⏱️ **閱讀時間：** 15 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐ (進階)

---

## 🤔 一句話解釋

**Email 驗證確保使用者提供的 Email 地址是真實有效的，防止假帳號和垃圾註冊。**

---

## 🔄 驗證流程

```
┌─────────────────────────────────────────────────────────────┐
│                    Email 驗證流程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 使用者註冊                                               │
│     ┌──────┐                     ┌──────┐                  │
│     │ User │ ── 註冊資料 ───────▶│Server│                  │
│     └──────┘                     └──┬───┘                  │
│                                     │                       │
│  2. 建立帳號（未驗證）               │                       │
│                                     │                       │
│  3. 產生驗證 Token，發送 Email       │                       │
│     ┌──────────────────────────────┘                       │
│     │                                                       │
│     ▼                                                       │
│  4. ┌────────────────────┐                                 │
│     │   Email Service    │                                 │
│     │  寄送驗證連結       │                                 │
│     └────────┬───────────┘                                 │
│              │                                              │
│              ▼                                              │
│  5. ┌──────┐                                               │
│     │ User │ 點擊驗證連結                                   │
│     └──┬───┘                                               │
│        │                                                    │
│        ▼                                                    │
│  6. ┌──────┐                     ┌──────┐                  │
│     │ User │ ── 驗證 Token ────▶│Server│                  │
│     │      │ ◀── 驗證成功 ──────│      │                  │
│     └──────┘                     └──────┘                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```

---

## 🔧 Token 管理

```python
# app/core/email_verification.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
import redis.asyncio as redis
from jose import jwt

from app.core.config import settings

redis_client = redis.from_url(settings.redis_url)

# 設定
VERIFICATION_TOKEN_EXPIRE_HOURS = 24
TOKEN_PREFIX = &#34;email_verify:&#34;


class EmailVerificationToken:
    &#34;&#34;&#34;Email 驗證 Token 管理&#34;&#34;&#34;

    @staticmethod
    def generate_token() -&gt; str:
        &#34;&#34;&#34;產生安全的隨機 Token&#34;&#34;&#34;
        return secrets.token_urlsafe(32)

    @staticmethod
    async def create(user_id: int, email: str) -&gt; str:
        &#34;&#34;&#34;建立驗證 Token&#34;&#34;&#34;
        token = EmailVerificationToken.generate_token()

        key = f&#34;{TOKEN_PREFIX}{token}&#34;
        data = {
            &#34;user_id&#34;: str(user_id),
            &#34;email&#34;: email,
            &#34;created_at&#34;: datetime.utcnow().isoformat()
        }

        await redis_client.hset(key, mapping=data)
        await redis_client.expire(key, VERIFICATION_TOKEN_EXPIRE_HOURS * 3600)

        return token

    @staticmethod
    async def verify(token: str) -&gt; Optional[dict]:
        &#34;&#34;&#34;驗證 Token&#34;&#34;&#34;
        key = f&#34;{TOKEN_PREFIX}{token}&#34;
        data = await redis_client.hgetall(key)

        if not data:
            return None

        return {
            &#34;user_id&#34;: int(data[b&#34;user_id&#34;]),
            &#34;email&#34;: data[b&#34;email&#34;].decode(),
            &#34;created_at&#34;: data[b&#34;created_at&#34;].decode()
        }

    @staticmethod
    async def invalidate(token: str) -&gt; bool:
        &#34;&#34;&#34;使 Token 失效&#34;&#34;&#34;
        key = f&#34;{TOKEN_PREFIX}{token}&#34;
        result = await redis_client.delete(key)
        return result &gt; 0

    @staticmethod
    async def invalidate_all_for_user(user_id: int) -&gt; int:
        &#34;&#34;&#34;使該使用者所有驗證 Token 失效&#34;&#34;&#34;
        pattern = f&#34;{TOKEN_PREFIX}*&#34;
        count = 0

        async for key in redis_client.scan_iter(match=pattern):
            data = await redis_client.hgetall(key)
            if data and int(data[b&#34;user_id&#34;]) == user_id:
                await redis_client.delete(key)
                count &#43;= 1

        return count
```

---

## 📧 驗證 Email 模板

```html
&lt;!-- app/templates/email/email_verification.html --&gt;
&lt;!DOCTYPE html&gt;
&lt;html&gt;
&lt;head&gt;
    &lt;meta charset=&#34;utf-8&#34;&gt;
    &lt;title&gt;Verify Your Email&lt;/title&gt;
&lt;/head&gt;
&lt;body style=&#34;font-family: Arial, sans-serif; line-height: 1.6;&#34;&gt;
    &lt;div style=&#34;max-width: 600px; margin: 0 auto; padding: 20px;&#34;&gt;
        &lt;h2&gt;Welcome to {{ app_name }}!&lt;/h2&gt;

        &lt;p&gt;Hi {{ username }},&lt;/p&gt;

        &lt;p&gt;Thank you for registering. Please verify your email address by clicking the button below:&lt;/p&gt;

        &lt;div style=&#34;text-align: center; margin: 30px 0;&#34;&gt;
            &lt;a href=&#34;{{ verification_url }}&#34;
               style=&#34;background-color: #28a745;
                      color: white;
                      padding: 12px 24px;
                      text-decoration: none;
                      border-radius: 4px;
                      display: inline-block;&#34;&gt;
                Verify Email
            &lt;/a&gt;
        &lt;/div&gt;

        &lt;p style=&#34;color: #666;&#34;&gt;
            This link will expire in {{ expire_hours }} hours.
        &lt;/p&gt;

        &lt;p style=&#34;color: #666;&#34;&gt;
            If you didn&#39;t create an account, please ignore this email.
        &lt;/p&gt;

        &lt;hr style=&#34;border: none; border-top: 1px solid #eee; margin: 20px 0;&#34;&gt;

        &lt;p style=&#34;color: #999; font-size: 12px;&#34;&gt;
            This is an automated email. Please do not reply.
        &lt;/p&gt;
    &lt;/div&gt;
&lt;/body&gt;
&lt;/html&gt;
```

---

## 🔐 API 端點

```python
# app/api/routes/email_verification.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr

from app.database import get_db
from app.models import User
from app.core.email_verification import EmailVerificationToken
from app.services.email import email_service
from app.core.config import settings
from app.api.deps import get_current_active_user

router = APIRouter(prefix=&#34;/auth&#34;, tags=[&#34;auth&#34;])


class ResendVerificationRequest(BaseModel):
    email: EmailStr


@router.post(&#34;/register&#34;)
async def register(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    &#34;&#34;&#34;
    使用者註冊

    - 建立帳號（is_verified = False）
    - 發送驗證 Email
    &#34;&#34;&#34;
    # 檢查 email 是否已存在
    stmt = select(User).where(User.email == user_data.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail=&#34;Email already registered&#34;)

    # 建立使用者（未驗證）
    user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
        is_verified=False,  # 未驗證
        is_active=True
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)

    # 產生驗證 Token
    token = await EmailVerificationToken.create(user.id, user.email)

    # 建立驗證連結
    verification_url = f&#34;{settings.frontend_url}/verify-email?token={token}&#34;

    # 背景發送驗證 Email
    background_tasks.add_task(
        email_service.send_verification_email,
        to_email=user.email,
        username=user.username,
        verification_url=verification_url
    )

    return {
        &#34;message&#34;: &#34;Registration successful. Please check your email to verify your account.&#34;,
        &#34;user&#34;: {
            &#34;id&#34;: user.id,
            &#34;username&#34;: user.username,
            &#34;email&#34;: user.email,
            &#34;is_verified&#34;: user.is_verified
        }
    }


@router.get(&#34;/verify-email&#34;)
async def verify_email(
    token: str,
    db: AsyncSession = Depends(get_db)
):
    &#34;&#34;&#34;
    驗證 Email

    - 驗證 Token
    - 更新使用者狀態
    - 使 Token 失效
    &#34;&#34;&#34;
    # 驗證 Token
    token_data = await EmailVerificationToken.verify(token)

    if not token_data:
        raise HTTPException(
            status_code=400,
            detail=&#34;Invalid or expired verification token&#34;
        )

    # 查詢使用者
    user = await db.get(User, token_data[&#34;user_id&#34;])

    if not user:
        raise HTTPException(status_code=400, detail=&#34;User not found&#34;)

    # 確認 email 一致
    if user.email != token_data[&#34;email&#34;]:
        raise HTTPException(status_code=400, detail=&#34;Invalid token&#34;)

    # 檢查是否已驗證
    if user.is_verified:
        return {&#34;message&#34;: &#34;Email already verified&#34;}

    # 更新為已驗證
    user.is_verified = True
    user.verified_at = datetime.utcnow()
    await db.commit()

    # 使 Token 失效
    await EmailVerificationToken.invalidate(token)

    return {&#34;message&#34;: &#34;Email verified successfully&#34;}


@router.post(&#34;/resend-verification&#34;)
async def resend_verification(
    request: ResendVerificationRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    &#34;&#34;&#34;
    重新發送驗證 Email

    - 檢查使用者是否存在且未驗證
    - 產生新的 Token
    - 發送驗證 Email
    &#34;&#34;&#34;
    # 查詢使用者
    stmt = select(User).where(User.email == request.email)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if user and not user.is_verified:
        # 使舊的 Token 失效
        await EmailVerificationToken.invalidate_all_for_user(user.id)

        # 產生新的 Token
        token = await EmailVerificationToken.create(user.id, user.email)
        verification_url = f&#34;{settings.frontend_url}/verify-email?token={token}&#34;

        # 發送 Email
        background_tasks.add_task(
            email_service.send_verification_email,
            to_email=user.email,
            username=user.username,
            verification_url=verification_url
        )

    # 不論是否找到使用者，都回傳相同訊息（防止枚舉）
    return {
        &#34;message&#34;: &#34;If the email exists and is not verified, a verification link has been sent.&#34;
    }


@router.post(&#34;/change-email&#34;)
async def change_email(
    new_email: EmailStr,
    current_user: User = Depends(get_current_active_user),
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    &#34;&#34;&#34;
    變更 Email

    - 需要重新驗證新的 Email
    &#34;&#34;&#34;
    # 檢查新 email 是否已被使用
    stmt = select(User).where(User.email == new_email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail=&#34;Email already in use&#34;)

    # 儲存新 email（但標記為未驗證）
    current_user.pending_email = new_email
    await db.commit()

    # 產生驗證 Token
    token = await EmailVerificationToken.create(current_user.id, new_email)
    verification_url = f&#34;{settings.frontend_url}/verify-new-email?token={token}&#34;

    # 發送驗證 Email
    background_tasks.add_task(
        email_service.send_email_change_verification,
        to_email=new_email,
        username=current_user.username,
        verification_url=verification_url
    )

    return {&#34;message&#34;: &#34;Verification email sent to your new email address&#34;}


@router.get(&#34;/verify-new-email&#34;)
async def verify_new_email(
    token: str,
    db: AsyncSession = Depends(get_db)
):
    &#34;&#34;&#34;
    驗證新的 Email
    &#34;&#34;&#34;
    token_data = await EmailVerificationToken.verify(token)

    if not token_data:
        raise HTTPException(status_code=400, detail=&#34;Invalid or expired token&#34;)

    user = await db.get(User, token_data[&#34;user_id&#34;])

    if not user:
        raise HTTPException(status_code=400, detail=&#34;User not found&#34;)

    # 確認是正確的 pending email
    if user.pending_email != token_data[&#34;email&#34;]:
        raise HTTPException(status_code=400, detail=&#34;Invalid token&#34;)

    # 更新 email
    user.email = user.pending_email
    user.pending_email = None
    await db.commit()

    # 使 Token 失效
    await EmailVerificationToken.invalidate(token)

    return {&#34;message&#34;: &#34;Email changed successfully&#34;}
```

---

## 🔒 要求已驗證使用者

```python
# app/api/deps.py
async def get_verified_user(
    current_user: User = Depends(get_current_active_user)
) -&gt; User:
    &#34;&#34;&#34;要求已驗證 Email 的使用者&#34;&#34;&#34;
    if not current_user.is_verified:
        raise HTTPException(
            status_code=403,
            detail=&#34;Please verify your email first&#34;
        )
    return current_user


# 使用方式
@router.post(&#34;/posts&#34;)
async def create_post(
    post_data: PostCreate,
    current_user: User = Depends(get_verified_user)  # 必須已驗證
):
    &#34;&#34;&#34;建立文章（需要已驗證的使用者）&#34;&#34;&#34;
    pass
```

---

## 📊 User Model 擴充

```python
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from datetime import datetime


class User(Base):
    __tablename__ = &#34;users&#34;

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)

    # 驗證狀態
    is_verified = Column(Boolean, default=False)
    verified_at = Column(DateTime, nullable=True)

    # Email 變更
    pending_email = Column(String(100), nullable=True)

    # 帳號狀態
    is_active = Column(Boolean, default=True)

    created_at = Column(DateTime, default=datetime.utcnow)
```

---

## ✅ 重點總結

### 驗證流程

| 步驟 | 說明 |
|------|------|
| 1 | 使用者註冊（is_verified=False）|
| 2 | 產生驗證 Token |
| 3 | 發送驗證 Email |
| 4 | 使用者點擊連結 |
| 5 | 驗證 Token 並更新狀態 |

### 安全要點

1. **Token 一次性使用**
2. **設定過期時間**（24 小時）
3. **防止使用者枚舉**
4. **變更 Email 也要重新驗證**

### 使用情境

```python
# 註冊時
user.is_verified = False
# 發送驗證 Email

# 驗證後
user.is_verified = True
user.verified_at = datetime.utcnow()

# 需要驗證才能操作的功能
@router.post(&#34;/posts&#34;)
async def create_post(user: User = Depends(get_verified_user)):
    pass
```

---

## 🎤 面試這樣答

### Q: 為什麼需要 Email 驗證？

**答案：**

&gt; Email 驗證的目的：
&gt;
&gt; 1. **確認身份**：確保使用者擁有該 Email
&gt; 2. **防止假帳號**：減少垃圾註冊
&gt; 3. **通訊管道**：確保能聯繫到使用者
&gt; 4. **密碼找回**：確保密碼重設發送到正確的 Email
&gt;
&gt; 實作方式：
&gt; ```python
&gt; # 產生一次性 Token
&gt; token = secrets.token_urlsafe(32)
&gt; # 發送包含 Token 的連結
&gt; # 使用者點擊後驗證並使 Token 失效
&gt; ```

---

**上一篇：** [04-6. 密碼重設流程](./04-6)
**下一篇：** [04-8. 雙因素認證](./04-8)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/04-7/  

