04-4. 權限控制

⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

權限控制(Authorization)決定已認證的使用者可以存取哪些資源、執行哪些操作。


🔐 權限模型

常見模型比較

┌─────────────────────────────────────────────────────────┐
│  1. RBAC(Role-Based Access Control)                   │
│     └─ 基於角色的存取控制                                │
│     └─ 使用者 → 角色 → 權限                             │
│     └─ 最常見,易於管理                                  │
├─────────────────────────────────────────────────────────┤
│  2. ABAC(Attribute-Based Access Control)              │
│     └─ 基於屬性的存取控制                                │
│     └─ 根據使用者屬性、資源屬性、環境條件判斷              │
│     └─ 更靈活,但較複雜                                  │
├─────────────────────────────────────────────────────────┤
│  3. ACL(Access Control List)                          │
│     └─ 存取控制列表                                      │
│     └─ 直接定義「誰」可以對「什麼」做「什麼」              │
│     └─ 細粒度控制,但難以擴展                             │
└─────────────────────────────────────────────────────────┘

🎭 RBAC 實作

Model 設計

from sqlalchemy import Column, Integer, String, Table, ForeignKey, Boolean
from sqlalchemy.orm import relationship, Mapped, mapped_column
from typing import List

# 使用者-角色 關聯表
user_roles = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)

# 角色-權限 關聯表
role_permissions = Table(
    "role_permissions",
    Base.metadata,
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)


class Permission(Base):
    """權限"""
    __tablename__ = "permissions"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)  # e.g., "users:read"
    description: Mapped[str] = mapped_column(String(200), nullable=True)

    roles: Mapped[List["Role"]] = relationship(
        "Role",
        secondary=role_permissions,
        back_populates="permissions"
    )


class Role(Base):
    """角色"""
    __tablename__ = "roles"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)  # e.g., "admin"
    description: Mapped[str] = mapped_column(String(200), nullable=True)

    permissions: Mapped[List["Permission"]] = relationship(
        "Permission",
        secondary=role_permissions,
        back_populates="roles"
    )
    users: Mapped[List["User"]] = relationship(
        "User",
        secondary=user_roles,
        back_populates="roles"
    )


class User(Base):
    """使用者"""
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    is_superuser: Mapped[bool] = mapped_column(default=False)

    roles: Mapped[List["Role"]] = relationship(
        "Role",
        secondary=user_roles,
        back_populates="users"
    )

    def has_permission(self, permission_name: str) -> bool:
        """檢查是否有特定權限"""
        if self.is_superuser:
            return True

        for role in self.roles:
            for permission in role.permissions:
                if permission.name == permission_name:
                    return True
        return False

    def has_role(self, role_name: str) -> bool:
        """檢查是否有特定角色"""
        if self.is_superuser:
            return True

        return any(role.name == role_name for role in self.roles)

    @property
    def permissions(self) -> set[str]:
        """取得所有權限"""
        perms = set()
        for role in self.roles:
            for permission in role.permissions:
                perms.add(permission.name)
        return perms

權限檢查依賴項

# app/api/deps.py
from fastapi import Depends, HTTPException, status
from typing import Callable, List

from app.models import User
from app.api.deps import get_current_active_user


def require_permissions(*required_permissions: str) -> Callable:
    """要求特定權限的依賴項"""
    async def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        # 超級使用者擁有所有權限
        if current_user.is_superuser:
            return current_user

        # 檢查使用者是否有所有必要權限
        user_permissions = current_user.permissions
        missing = set(required_permissions) - user_permissions

        if missing:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permissions: {', '.join(missing)}"
            )

        return current_user

    return permission_checker


def require_roles(*required_roles: str) -> Callable:
    """要求特定角色的依賴項"""
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if current_user.is_superuser:
            return current_user

        user_roles = {role.name for role in current_user.roles}
        if not user_roles.intersection(required_roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires one of these roles: {', '.join(required_roles)}"
            )

        return current_user

    return role_checker

使用方式

# app/api/routes/users.py
from fastapi import APIRouter, Depends
from app.api.deps import require_permissions, require_roles
from app.models import User

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/")
async def list_users(
    current_user: User = Depends(require_permissions("users:read"))
):
    """列出使用者(需要 users:read 權限)"""
    pass


@router.post("/")
async def create_user(
    current_user: User = Depends(require_permissions("users:create"))
):
    """建立使用者(需要 users:create 權限)"""
    pass


@router.delete("/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(require_permissions("users:delete"))
):
    """刪除使用者(需要 users:delete 權限)"""
    pass


@router.get("/admin-panel")
async def admin_panel(
    current_user: User = Depends(require_roles("admin", "moderator"))
):
    """管理面板(需要 admin 或 moderator 角色)"""
    pass

🏷️ 權限命名慣例

資源:操作 格式

# 權限命名慣例
PERMISSIONS = {
    # 使用者管理
    "users:read": "查看使用者",
    "users:create": "建立使用者",
    "users:update": "更新使用者",
    "users:delete": "刪除使用者",

    # 文章管理
    "posts:read": "查看文章",
    "posts:create": "建立文章",
    "posts:update": "更新文章",
    "posts:delete": "刪除文章",
    "posts:publish": "發布文章",

    # 評論管理
    "comments:read": "查看評論",
    "comments:create": "建立評論",
    "comments:delete": "刪除評論",
    "comments:moderate": "審核評論",

    # 系統管理
    "admin:access": "存取管理後台",
    "admin:settings": "修改系統設定",
}


# 預設角色
DEFAULT_ROLES = {
    "user": ["posts:read", "comments:read", "comments:create"],
    "author": ["posts:read", "posts:create", "posts:update", "comments:*"],
    "moderator": ["posts:*", "comments:*", "users:read"],
    "admin": ["*"],  # 所有權限
}

🔒 資源擁有者檢查

只能存取自己的資源

from fastapi import HTTPException, status

async def check_resource_owner(
    resource_user_id: int,
    current_user: User
) -> bool:
    """檢查是否為資源擁有者"""
    if current_user.is_superuser:
        return True

    if resource_user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to access this resource"
        )

    return True


# 使用範例
@router.get("/posts/{post_id}")
async def get_post(
    post_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """取得文章(自己的文章或有權限)"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # 公開文章任何人都可以看
    if post.is_published:
        return post

    # 未發布的文章只有擁有者或有權限的人可以看
    if post.author_id != current_user.id:
        if not current_user.has_permission("posts:read_all"):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to view this post"
            )

    return post


@router.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """更新文章(只有擁有者或管理員可以更新)"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # 檢查是否為擁有者或有 posts:update_all 權限
    is_owner = post.author_id == current_user.id
    can_update_all = current_user.has_permission("posts:update_all")

    if not is_owner and not can_update_all:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only update your own posts"
        )

    # 更新文章
    for key, value in post_data.dict(exclude_unset=True).items():
        setattr(post, key, value)

    await db.commit()
    return post

🎨 裝飾器方式

權限裝飾器

from functools import wraps
from typing import Callable, List


def permission_required(*permissions: str):
    """權限檢查裝飾器"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 從 kwargs 取得 current_user
            current_user = kwargs.get("current_user")

            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated"
                )

            if current_user.is_superuser:
                return await func(*args, **kwargs)

            user_perms = current_user.permissions
            missing = set(permissions) - user_perms

            if missing:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Missing permissions: {', '.join(missing)}"
                )

            return await func(*args, **kwargs)

        return wrapper
    return decorator


# 使用方式(不推薦,因為 FastAPI 的依賴注入更好用)
@router.get("/users")
@permission_required("users:read")
async def list_users(current_user: User = Depends(get_current_active_user)):
    pass

📝 完整範例:部落格權限系統

# app/api/routes/blog.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.database import get_db
from app.models import User, Post
from app.api.deps import get_current_active_user, require_permissions

router = APIRouter(prefix="/blog", tags=["blog"])


class PostPermissions:
    """文章權限檢查"""

    @staticmethod
    async def can_view(post: Post, user: User | None) -> bool:
        """是否可以查看文章"""
        # 公開文章任何人都可以看
        if post.is_published:
            return True

        # 未登入不能看草稿
        if not user:
            return False

        # 擁有者可以看自己的草稿
        if post.author_id == user.id:
            return True

        # 有 posts:read_all 權限可以看所有文章
        return user.has_permission("posts:read_all")

    @staticmethod
    async def can_edit(post: Post, user: User) -> bool:
        """是否可以編輯文章"""
        # 擁有者可以編輯自己的文章
        if post.author_id == user.id:
            return True

        # 有 posts:update_all 權限可以編輯所有文章
        return user.has_permission("posts:update_all")

    @staticmethod
    async def can_delete(post: Post, user: User) -> bool:
        """是否可以刪除文章"""
        # 擁有者可以刪除自己的文章
        if post.author_id == user.id:
            return True

        # 有 posts:delete_all 權限可以刪除所有文章
        return user.has_permission("posts:delete_all")

    @staticmethod
    async def can_publish(post: Post, user: User) -> bool:
        """是否可以發布文章"""
        # 擁有者有 posts:publish 權限才能發布
        if post.author_id == user.id:
            return user.has_permission("posts:publish")

        # 有 posts:publish_all 權限可以發布所有文章
        return user.has_permission("posts:publish_all")


@router.get("/posts")
async def list_posts(
    skip: int = 0,
    limit: int = 20,
    include_drafts: bool = False,
    current_user: User | None = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """列出文章"""
    stmt = select(Post)

    if include_drafts:
        # 只有有權限的人才能看到草稿
        if not current_user:
            raise HTTPException(status_code=401, detail="Login required")

        if not current_user.has_permission("posts:read_all"):
            # 只能看自己的草稿
            stmt = stmt.where(
                (Post.is_published == True) |
                (Post.author_id == current_user.id)
            )
    else:
        stmt = stmt.where(Post.is_published == True)

    stmt = stmt.offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()


@router.get("/posts/{post_id}")
async def get_post(
    post_id: int,
    current_user: User | None = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """取得單篇文章"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if not await PostPermissions.can_view(post, current_user):
        raise HTTPException(status_code=403, detail="Permission denied")

    return post


@router.post("/posts")
async def create_post(
    post_data: PostCreate,
    current_user: User = Depends(require_permissions("posts:create")),
    db: AsyncSession = Depends(get_db)
):
    """建立文章"""
    post = Post(
        **post_data.dict(),
        author_id=current_user.id,
        is_published=False
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post


@router.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """更新文章"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if not await PostPermissions.can_edit(post, current_user):
        raise HTTPException(status_code=403, detail="Permission denied")

    for key, value in post_data.dict(exclude_unset=True).items():
        setattr(post, key, value)

    await db.commit()
    await db.refresh(post)
    return post


@router.post("/posts/{post_id}/publish")
async def publish_post(
    post_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """發布文章"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if not await PostPermissions.can_publish(post, current_user):
        raise HTTPException(status_code=403, detail="Permission denied")

    post.is_published = True
    post.published_at = datetime.utcnow()
    await db.commit()

    return {"message": "Post published successfully"}


@router.delete("/posts/{post_id}")
async def delete_post(
    post_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """刪除文章"""
    post = await db.get(Post, post_id)

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if not await PostPermissions.can_delete(post, current_user):
        raise HTTPException(status_code=403, detail="Permission denied")

    await db.delete(post)
    await db.commit()

    return {"message": "Post deleted successfully"}

✅ 重點總結

權限模型選擇

模型適用場景
RBAC大多數應用(推薦)
ABAC需要複雜條件判斷
ACL細粒度資源控制

RBAC 要點

使用者 → 角色 → 權限

User  →  admin   →  users:*, posts:*, admin:*
      →  author  →  posts:create, posts:update
      →  user    →  posts:read, comments:create

實作建議

  1. 權限命名:使用 資源:操作 格式
  2. 擁有者檢查:使用者只能存取自己的資源
  3. 依賴注入:用 FastAPI Depends 實現權限檢查
  4. 超級使用者:保留一個可以繞過所有權限的角色

🎤 面試這樣答

Q: RBAC 和 ABAC 的差別是什麼?

答案:

RBAC(Role-Based):

  • 使用者 → 角色 → 權限
  • 簡單易管理
  • 例如:admin 角色有所有權限

ABAC(Attribute-Based):

  • 基於屬性判斷(使用者屬性、資源屬性、環境)
  • 更靈活但較複雜
  • 例如:部門經理只能存取自己部門的資料

選擇建議:

  • 大多數應用用 RBAC 就夠了
  • 需要複雜條件判斷時用 ABAC

上一篇: 04-3. OAuth 2.0 下一篇: 04-5. 安全最佳實踐


最後更新:2025-12-17

0%