目錄
04-4. 權限控制
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
權限控制(Authorization)決定已認證的使用者可以存取哪些資源、執行哪些操作。
🔐 權限模型
常見模型比較
┌─────────────────────────────────────────────────────────┐
│ 1. RBAC(Role-Based Access Control) │
│ └─ 基於角色的存取控制 │
│ └─ 使用者 → 角色 → 權限 │
│ └─ 最常見,易於管理 │
├─────────────────────────────────────────────────────────┤
│ 2. ABAC(Attribute-Based Access Control) │
│ └─ 基於屬性的存取控制 │
│ └─ 根據使用者屬性、資源屬性、環境條件判斷 │
│ └─ 更靈活,但較複雜 │
├─────────────────────────────────────────────────────────┤
│ 3. ACL(Access Control List) │
│ └─ 存取控制列表 │
│ └─ 直接定義「誰」可以對「什麼」做「什麼」 │
│ └─ 細粒度控制,但難以擴展 │
└─────────────────────────────────────────────────────────┘🎭 RBAC 實作
Model 設計
from sqlalchemy import Column, Integer, String, Table, ForeignKey, Boolean
from sqlalchemy.orm import relationship, Mapped, mapped_column
from typing import List
# 使用者-角色 關聯表
user_roles = Table(
"user_roles",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)
# 角色-權限 關聯表
role_permissions = Table(
"role_permissions",
Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)
class Permission(Base):
"""權限"""
__tablename__ = "permissions"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True) # e.g., "users:read"
description: Mapped[str] = mapped_column(String(200), nullable=True)
roles: Mapped[List["Role"]] = relationship(
"Role",
secondary=role_permissions,
back_populates="permissions"
)
class Role(Base):
"""角色"""
__tablename__ = "roles"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True) # e.g., "admin"
description: Mapped[str] = mapped_column(String(200), nullable=True)
permissions: Mapped[List["Permission"]] = relationship(
"Permission",
secondary=role_permissions,
back_populates="roles"
)
users: Mapped[List["User"]] = relationship(
"User",
secondary=user_roles,
back_populates="roles"
)
class User(Base):
"""使用者"""
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(default=True)
is_superuser: Mapped[bool] = mapped_column(default=False)
roles: Mapped[List["Role"]] = relationship(
"Role",
secondary=user_roles,
back_populates="users"
)
def has_permission(self, permission_name: str) -> bool:
"""檢查是否有特定權限"""
if self.is_superuser:
return True
for role in self.roles:
for permission in role.permissions:
if permission.name == permission_name:
return True
return False
def has_role(self, role_name: str) -> bool:
"""檢查是否有特定角色"""
if self.is_superuser:
return True
return any(role.name == role_name for role in self.roles)
@property
def permissions(self) -> set[str]:
"""取得所有權限"""
perms = set()
for role in self.roles:
for permission in role.permissions:
perms.add(permission.name)
return perms權限檢查依賴項
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from typing import Callable, List
from app.models import User
from app.api.deps import get_current_active_user
def require_permissions(*required_permissions: str) -> Callable:
"""要求特定權限的依賴項"""
async def permission_checker(
current_user: User = Depends(get_current_active_user)
) -> User:
# 超級使用者擁有所有權限
if current_user.is_superuser:
return current_user
# 檢查使用者是否有所有必要權限
user_permissions = current_user.permissions
missing = set(required_permissions) - user_permissions
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing permissions: {', '.join(missing)}"
)
return current_user
return permission_checker
def require_roles(*required_roles: str) -> Callable:
"""要求特定角色的依賴項"""
async def role_checker(
current_user: User = Depends(get_current_active_user)
) -> User:
if current_user.is_superuser:
return current_user
user_roles = {role.name for role in current_user.roles}
if not user_roles.intersection(required_roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires one of these roles: {', '.join(required_roles)}"
)
return current_user
return role_checker使用方式
# app/api/routes/users.py
from fastapi import APIRouter, Depends
from app.api.deps import require_permissions, require_roles
from app.models import User
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/")
async def list_users(
current_user: User = Depends(require_permissions("users:read"))
):
"""列出使用者(需要 users:read 權限)"""
pass
@router.post("/")
async def create_user(
current_user: User = Depends(require_permissions("users:create"))
):
"""建立使用者(需要 users:create 權限)"""
pass
@router.delete("/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(require_permissions("users:delete"))
):
"""刪除使用者(需要 users:delete 權限)"""
pass
@router.get("/admin-panel")
async def admin_panel(
current_user: User = Depends(require_roles("admin", "moderator"))
):
"""管理面板(需要 admin 或 moderator 角色)"""
pass🏷️ 權限命名慣例
資源:操作 格式
# 權限命名慣例
PERMISSIONS = {
# 使用者管理
"users:read": "查看使用者",
"users:create": "建立使用者",
"users:update": "更新使用者",
"users:delete": "刪除使用者",
# 文章管理
"posts:read": "查看文章",
"posts:create": "建立文章",
"posts:update": "更新文章",
"posts:delete": "刪除文章",
"posts:publish": "發布文章",
# 評論管理
"comments:read": "查看評論",
"comments:create": "建立評論",
"comments:delete": "刪除評論",
"comments:moderate": "審核評論",
# 系統管理
"admin:access": "存取管理後台",
"admin:settings": "修改系統設定",
}
# 預設角色
DEFAULT_ROLES = {
"user": ["posts:read", "comments:read", "comments:create"],
"author": ["posts:read", "posts:create", "posts:update", "comments:*"],
"moderator": ["posts:*", "comments:*", "users:read"],
"admin": ["*"], # 所有權限
}🔒 資源擁有者檢查
只能存取自己的資源
from fastapi import HTTPException, status
async def check_resource_owner(
resource_user_id: int,
current_user: User
) -> bool:
"""檢查是否為資源擁有者"""
if current_user.is_superuser:
return True
if resource_user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to access this resource"
)
return True
# 使用範例
@router.get("/posts/{post_id}")
async def get_post(
post_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""取得文章(自己的文章或有權限)"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# 公開文章任何人都可以看
if post.is_published:
return post
# 未發布的文章只有擁有者或有權限的人可以看
if post.author_id != current_user.id:
if not current_user.has_permission("posts:read_all"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to view this post"
)
return post
@router.put("/posts/{post_id}")
async def update_post(
post_id: int,
post_data: PostUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""更新文章(只有擁有者或管理員可以更新)"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# 檢查是否為擁有者或有 posts:update_all 權限
is_owner = post.author_id == current_user.id
can_update_all = current_user.has_permission("posts:update_all")
if not is_owner and not can_update_all:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only update your own posts"
)
# 更新文章
for key, value in post_data.dict(exclude_unset=True).items():
setattr(post, key, value)
await db.commit()
return post🎨 裝飾器方式
權限裝飾器
from functools import wraps
from typing import Callable, List
def permission_required(*permissions: str):
"""權限檢查裝飾器"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# 從 kwargs 取得 current_user
current_user = kwargs.get("current_user")
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
if current_user.is_superuser:
return await func(*args, **kwargs)
user_perms = current_user.permissions
missing = set(permissions) - user_perms
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing permissions: {', '.join(missing)}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用方式(不推薦,因為 FastAPI 的依賴注入更好用)
@router.get("/users")
@permission_required("users:read")
async def list_users(current_user: User = Depends(get_current_active_user)):
pass📝 完整範例:部落格權限系統
# app/api/routes/blog.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.database import get_db
from app.models import User, Post
from app.api.deps import get_current_active_user, require_permissions
router = APIRouter(prefix="/blog", tags=["blog"])
class PostPermissions:
"""文章權限檢查"""
@staticmethod
async def can_view(post: Post, user: User | None) -> bool:
"""是否可以查看文章"""
# 公開文章任何人都可以看
if post.is_published:
return True
# 未登入不能看草稿
if not user:
return False
# 擁有者可以看自己的草稿
if post.author_id == user.id:
return True
# 有 posts:read_all 權限可以看所有文章
return user.has_permission("posts:read_all")
@staticmethod
async def can_edit(post: Post, user: User) -> bool:
"""是否可以編輯文章"""
# 擁有者可以編輯自己的文章
if post.author_id == user.id:
return True
# 有 posts:update_all 權限可以編輯所有文章
return user.has_permission("posts:update_all")
@staticmethod
async def can_delete(post: Post, user: User) -> bool:
"""是否可以刪除文章"""
# 擁有者可以刪除自己的文章
if post.author_id == user.id:
return True
# 有 posts:delete_all 權限可以刪除所有文章
return user.has_permission("posts:delete_all")
@staticmethod
async def can_publish(post: Post, user: User) -> bool:
"""是否可以發布文章"""
# 擁有者有 posts:publish 權限才能發布
if post.author_id == user.id:
return user.has_permission("posts:publish")
# 有 posts:publish_all 權限可以發布所有文章
return user.has_permission("posts:publish_all")
@router.get("/posts")
async def list_posts(
skip: int = 0,
limit: int = 20,
include_drafts: bool = False,
current_user: User | None = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""列出文章"""
stmt = select(Post)
if include_drafts:
# 只有有權限的人才能看到草稿
if not current_user:
raise HTTPException(status_code=401, detail="Login required")
if not current_user.has_permission("posts:read_all"):
# 只能看自己的草稿
stmt = stmt.where(
(Post.is_published == True) |
(Post.author_id == current_user.id)
)
else:
stmt = stmt.where(Post.is_published == True)
stmt = stmt.offset(skip).limit(limit)
result = await db.execute(stmt)
return result.scalars().all()
@router.get("/posts/{post_id}")
async def get_post(
post_id: int,
current_user: User | None = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""取得單篇文章"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if not await PostPermissions.can_view(post, current_user):
raise HTTPException(status_code=403, detail="Permission denied")
return post
@router.post("/posts")
async def create_post(
post_data: PostCreate,
current_user: User = Depends(require_permissions("posts:create")),
db: AsyncSession = Depends(get_db)
):
"""建立文章"""
post = Post(
**post_data.dict(),
author_id=current_user.id,
is_published=False
)
db.add(post)
await db.commit()
await db.refresh(post)
return post
@router.put("/posts/{post_id}")
async def update_post(
post_id: int,
post_data: PostUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""更新文章"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if not await PostPermissions.can_edit(post, current_user):
raise HTTPException(status_code=403, detail="Permission denied")
for key, value in post_data.dict(exclude_unset=True).items():
setattr(post, key, value)
await db.commit()
await db.refresh(post)
return post
@router.post("/posts/{post_id}/publish")
async def publish_post(
post_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""發布文章"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if not await PostPermissions.can_publish(post, current_user):
raise HTTPException(status_code=403, detail="Permission denied")
post.is_published = True
post.published_at = datetime.utcnow()
await db.commit()
return {"message": "Post published successfully"}
@router.delete("/posts/{post_id}")
async def delete_post(
post_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
):
"""刪除文章"""
post = await db.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if not await PostPermissions.can_delete(post, current_user):
raise HTTPException(status_code=403, detail="Permission denied")
await db.delete(post)
await db.commit()
return {"message": "Post deleted successfully"}✅ 重點總結
權限模型選擇
| 模型 | 適用場景 |
|---|---|
| RBAC | 大多數應用(推薦) |
| ABAC | 需要複雜條件判斷 |
| ACL | 細粒度資源控制 |
RBAC 要點
使用者 → 角色 → 權限
User → admin → users:*, posts:*, admin:*
→ author → posts:create, posts:update
→ user → posts:read, comments:create實作建議
- 權限命名:使用
資源:操作格式 - 擁有者檢查:使用者只能存取自己的資源
- 依賴注入:用 FastAPI Depends 實現權限檢查
- 超級使用者:保留一個可以繞過所有權限的角色
🎤 面試這樣答
Q: RBAC 和 ABAC 的差別是什麼?
答案:
RBAC(Role-Based):
- 使用者 → 角色 → 權限
- 簡單易管理
- 例如:admin 角色有所有權限
ABAC(Attribute-Based):
- 基於屬性判斷(使用者屬性、資源屬性、環境)
- 更靈活但較複雜
- 例如:部門經理只能存取自己部門的資料
選擇建議:
- 大多數應用用 RBAC 就夠了
- 需要複雜條件判斷時用 ABAC
上一篇: 04-3. OAuth 2.0 下一篇: 04-5. 安全最佳實踐
最後更新:2025-12-17