04-3. OAuth 2.0

⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)


🤔 一句話解釋

OAuth 2.0 是授權框架,讓第三方應用可以安全地存取使用者資源,而不需要知道使用者的密碼。


🔍 OAuth 2.0 角色

┌─────────────────────────────────────────────────────────┐
│                   OAuth 2.0 角色                         │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Resource Owner(資源擁有者)                            │
│  └─ 就是「使用者」,擁有資源的人                          │
│                                                         │
│  Client(客戶端)                                        │
│  └─ 第三方應用程式,想要存取使用者資源                    │
│                                                         │
│  Authorization Server(授權伺服器)                      │
│  └─ 負責驗證身份、核發 Token                             │
│                                                         │
│  Resource Server(資源伺服器)                           │
│  └─ 存放使用者資源的伺服器                               │
│                                                         │
└─────────────────────────────────────────────────────────┘

實際例子

你想用「某 App」登入 Google 帳號

Resource Owner   = 你(使用者)
Client           = 某 App
Authorization Server = Google OAuth
Resource Server  = Google API(取得你的 email、姓名等)

🔄 授權流程

Authorization Code Flow(最常用)

┌──────────────────────────────────────────────────────────────┐
│              Authorization Code Flow                          │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  1. 使用者點擊「用 Google 登入」                              │
│     ┌──────┐                                                │
│     │ User │ ────────────────────────▶ ┌──────┐            │
│     └──────┘                           │Client│            │
│                                        └──┬───┘            │
│  2. 導向 Google 授權頁面                    │                │
│     ┌──────────────────────────────────────┘                │
│     │                                                        │
│     ▼                                                        │
│  3. ┌─────────────────┐                                     │
│     │ Google 授權頁面  │  ← 使用者登入並同意授權              │
│     └────────┬────────┘                                     │
│              │                                               │
│  4. 回傳 Authorization Code                                  │
│     ┌────────┘                                               │
│     │                                                        │
│     ▼                                                        │
│  5. ┌──────┐                    ┌────────────┐              │
│     │Client│ ── Code + Secret ─▶│   Google   │              │
│     │      │ ◀── Access Token ──│Auth Server │              │
│     └──────┘                    └────────────┘              │
│                                                              │
│  6. 用 Access Token 存取 Google API                          │
│                                                              │
└──────────────────────────────────────────────────────────────┘

📦 安裝

pip install httpx          # HTTP 客戶端
pip install authlib        # OAuth 2.0 庫(可選)

🔧 Google OAuth 實作

1. 設定 Google Cloud Console

1. 前往 Google Cloud Console
2. 建立專案
3. 啟用 Google+ API
4. 建立 OAuth 2.0 憑證
5. 設定授權重導向 URI:http://localhost:8000/auth/google/callback
6. 記下 Client ID 和 Client Secret

2. 設定

# app/core/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Google OAuth
    google_client_id: str
    google_client_secret: str
    google_redirect_uri: str = "http://localhost:8000/auth/google/callback"

    # JWT
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"


settings = Settings()

3. OAuth 工具

# app/core/oauth.py
from urllib.parse import urlencode
import httpx
from app.core.config import settings


class GoogleOAuth:
    """Google OAuth 2.0 工具"""

    AUTHORIZATION_URL = "https://accounts.google.com/o/oauth2/v2/auth"
    TOKEN_URL = "https://oauth2.googleapis.com/token"
    USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"

    def __init__(self):
        self.client_id = settings.google_client_id
        self.client_secret = settings.google_client_secret
        self.redirect_uri = settings.google_redirect_uri

    def get_authorization_url(self, state: str | None = None) -> str:
        """取得授權 URL"""
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "scope": "openid email profile",
            "access_type": "offline",  # 取得 refresh token
            "prompt": "consent",
        }

        if state:
            params["state"] = state

        return f"{self.AUTHORIZATION_URL}?{urlencode(params)}"

    async def exchange_code(self, code: str) -> dict:
        """用 Authorization Code 換取 Token"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.TOKEN_URL,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "code": code,
                    "grant_type": "authorization_code",
                    "redirect_uri": self.redirect_uri,
                },
            )
            response.raise_for_status()
            return response.json()

    async def get_user_info(self, access_token: str) -> dict:
        """取得使用者資訊"""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                self.USERINFO_URL,
                headers={"Authorization": f"Bearer {access_token}"},
            )
            response.raise_for_status()
            return response.json()


google_oauth = GoogleOAuth()

4. OAuth 端點

# app/api/routes/oauth.py
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import secrets

from app.core.oauth import google_oauth
from app.core.security import create_access_token, create_refresh_token
from app.database import get_db
from app.models import User

router = APIRouter(prefix="/auth", tags=["oauth"])


@router.get("/google/login")
async def google_login():
    """導向 Google 登入頁面"""
    # 產生 state 防止 CSRF
    state = secrets.token_urlsafe(32)

    # 實際專案中應該將 state 存到 Session 或 Redis
    # 這裡簡化處理

    authorization_url = google_oauth.get_authorization_url(state=state)
    return RedirectResponse(url=authorization_url)


@router.get("/google/callback")
async def google_callback(
    code: str,
    state: str | None = None,
    db: AsyncSession = Depends(get_db)
):
    """Google OAuth 回調"""
    try:
        # 1. 用 code 換取 token
        token_data = await google_oauth.exchange_code(code)
        access_token = token_data["access_token"]

        # 2. 取得使用者資訊
        user_info = await google_oauth.get_user_info(access_token)

        # 3. 查詢或建立使用者
        user = await get_or_create_user(db, user_info)

        # 4. 建立我們自己的 JWT Token
        jwt_access_token = create_access_token(subject=user.id)
        jwt_refresh_token = create_refresh_token(subject=user.id)

        # 5. 返回 Token(實際專案可能會重導向到前端並帶上 token)
        return {
            "access_token": jwt_access_token,
            "refresh_token": jwt_refresh_token,
            "token_type": "bearer",
            "user": {
                "id": user.id,
                "email": user.email,
                "name": user.username,
            }
        }

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


async def get_or_create_user(db: AsyncSession, user_info: dict) -> User:
    """根據 OAuth 資訊取得或建立使用者"""
    email = user_info["email"]

    # 查詢現有使用者
    stmt = select(User).where(User.email == email)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if user:
        return user

    # 建立新使用者
    user = User(
        email=email,
        username=user_info.get("name", email.split("@")[0]),
        hashed_password="",  # OAuth 使用者不需要密碼
        is_active=True,
        oauth_provider="google",
        oauth_id=user_info["id"],
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)

    return user

🔐 GitHub OAuth

GitHub OAuth 工具

# app/core/oauth.py
class GitHubOAuth:
    """GitHub OAuth 2.0 工具"""

    AUTHORIZATION_URL = "https://github.com/login/oauth/authorize"
    TOKEN_URL = "https://github.com/login/oauth/access_token"
    USERINFO_URL = "https://api.github.com/user"

    def __init__(self):
        self.client_id = settings.github_client_id
        self.client_secret = settings.github_client_secret
        self.redirect_uri = settings.github_redirect_uri

    def get_authorization_url(self, state: str | None = None) -> str:
        """取得授權 URL"""
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "read:user user:email",
        }

        if state:
            params["state"] = state

        return f"{self.AUTHORIZATION_URL}?{urlencode(params)}"

    async def exchange_code(self, code: str) -> dict:
        """用 Authorization Code 換取 Token"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.TOKEN_URL,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "code": code,
                },
                headers={"Accept": "application/json"},
            )
            response.raise_for_status()
            return response.json()

    async def get_user_info(self, access_token: str) -> dict:
        """取得使用者資訊"""
        async with httpx.AsyncClient() as client:
            # 取得基本資訊
            response = await client.get(
                self.USERINFO_URL,
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Accept": "application/json",
                },
            )
            response.raise_for_status()
            user_data = response.json()

            # 取得 email(可能需要額外請求)
            if not user_data.get("email"):
                email_response = await client.get(
                    "https://api.github.com/user/emails",
                    headers={
                        "Authorization": f"Bearer {access_token}",
                        "Accept": "application/json",
                    },
                )
                emails = email_response.json()
                primary_email = next(
                    (e["email"] for e in emails if e["primary"]),
                    None
                )
                user_data["email"] = primary_email

            return user_data


github_oauth = GitHubOAuth()

🔄 多 Provider 整合

統一介面

# app/core/oauth.py
from abc import ABC, abstractmethod


class OAuthProvider(ABC):
    """OAuth Provider 抽象基類"""

    @abstractmethod
    def get_authorization_url(self, state: str | None = None) -> str:
        pass

    @abstractmethod
    async def exchange_code(self, code: str) -> dict:
        pass

    @abstractmethod
    async def get_user_info(self, access_token: str) -> dict:
        pass


class OAuthManager:
    """OAuth 管理器"""

    providers: dict[str, OAuthProvider] = {}

    @classmethod
    def register(cls, name: str, provider: OAuthProvider):
        cls.providers[name] = provider

    @classmethod
    def get(cls, name: str) -> OAuthProvider:
        if name not in cls.providers:
            raise ValueError(f"Unknown OAuth provider: {name}")
        return cls.providers[name]


# 註冊 providers
OAuthManager.register("google", GoogleOAuth())
OAuthManager.register("github", GitHubOAuth())

通用端點

# app/api/routes/oauth.py
@router.get("/{provider}/login")
async def oauth_login(provider: str):
    """導向 OAuth 登入頁面"""
    try:
        oauth_provider = OAuthManager.get(provider)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")

    state = secrets.token_urlsafe(32)
    authorization_url = oauth_provider.get_authorization_url(state=state)
    return RedirectResponse(url=authorization_url)


@router.get("/{provider}/callback")
async def oauth_callback(
    provider: str,
    code: str,
    state: str | None = None,
    db: AsyncSession = Depends(get_db)
):
    """OAuth 回調"""
    try:
        oauth_provider = OAuthManager.get(provider)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")

    # 1. 換取 token
    token_data = await oauth_provider.exchange_code(code)
    access_token = token_data["access_token"]

    # 2. 取得使用者資訊
    user_info = await oauth_provider.get_user_info(access_token)

    # 3. 取得或建立使用者
    user = await get_or_create_oauth_user(db, provider, user_info)

    # 4. 建立 JWT
    jwt_access_token = create_access_token(subject=user.id)
    jwt_refresh_token = create_refresh_token(subject=user.id)

    return {
        "access_token": jwt_access_token,
        "refresh_token": jwt_refresh_token,
        "token_type": "bearer"
    }

🔗 帳號連結

User Model 擴充

from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship


class User(Base):
    """使用者"""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(100), unique=True, nullable=False)
    username = Column(String(50), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=True)  # OAuth 使用者可為空
    is_active = Column(Boolean, default=True)

    # OAuth 連結
    oauth_accounts = relationship("OAuthAccount", back_populates="user")


class OAuthAccount(Base):
    """OAuth 帳號連結"""
    __tablename__ = "oauth_accounts"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    provider = Column(String(20), nullable=False)  # google, github, etc.
    provider_user_id = Column(String(100), nullable=False)
    access_token = Column(String(500), nullable=True)
    refresh_token = Column(String(500), nullable=True)

    user = relationship("User", back_populates="oauth_accounts")

    __table_args__ = (
        # 每個 provider 只能連結一次
        UniqueConstraint("provider", "provider_user_id"),
    )

連結現有帳號

@router.post("/link/{provider}")
async def link_oauth_account(
    provider: str,
    code: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """連結 OAuth 帳號到現有使用者"""
    oauth_provider = OAuthManager.get(provider)

    # 取得 OAuth 資訊
    token_data = await oauth_provider.exchange_code(code)
    user_info = await oauth_provider.get_user_info(token_data["access_token"])

    # 檢查是否已被其他使用者連結
    stmt = select(OAuthAccount).where(
        OAuthAccount.provider == provider,
        OAuthAccount.provider_user_id == user_info["id"]
    )
    existing = await db.execute(stmt)
    if existing.scalar_one_or_none():
        raise HTTPException(
            status_code=400,
            detail="This account is already linked to another user"
        )

    # 建立連結
    oauth_account = OAuthAccount(
        user_id=current_user.id,
        provider=provider,
        provider_user_id=user_info["id"],
        access_token=token_data.get("access_token"),
        refresh_token=token_data.get("refresh_token"),
    )
    db.add(oauth_account)
    await db.commit()

    return {"message": f"Successfully linked {provider} account"}

✅ 重點總結

OAuth 2.0 角色

角色說明
Resource Owner使用者
Client你的應用程式
Authorization ServerGoogle/GitHub OAuth
Resource ServerGoogle/GitHub API

常見 Flow

Flow適用場景
Authorization CodeWeb 應用(後端)
PKCESPA、行動 App
Client Credentials服務對服務

實作要點

  1. 使用 state 防止 CSRF
  2. 安全儲存 Client Secret
  3. 用 OAuth Token 取得使用者資訊
  4. 建立自己的 JWT Token

🎤 面試這樣答

Q: OAuth 2.0 的授權流程是什麼?

答案:

Authorization Code Flow(最常用):

  1. 使用者點擊登入 → 導向授權伺服器
  2. 使用者同意授權 → 授權伺服器回傳 Authorization Code
  3. 後端用 Code 換 Token → 發送 Code + Client Secret
  4. 取得 Access Token → 用來呼叫 API
  5. 呼叫資源伺服器 API → 取得使用者資料
# 步驟 3-5 的簡化程式碼
token = await oauth.exchange_code(code)
user_info = await oauth.get_user_info(token["access_token"])

上一篇: 04-2. JWT 認證 下一篇: 04-4. 權限控制


最後更新:2025-12-17

0%