05-7. 背景任務

⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

背景任務讓 API 可以快速回應,同時在背景執行耗時操作,提升使用者體驗。


🔄 為什麼需要背景任務

沒有背景任務:
┌────────┐         ┌────────┐         ┌────────┐
│ Client │ ─請求─▶ │ Server │ ─處理─▶ │ 耗時   │
│        │ ◀─回應─ │        │ ◀───── │ 作業   │
└────────┘         └────────┘         └────────┘
用戶等待:████████████████████  10 秒

有背景任務:
┌────────┐         ┌────────┐         ┌────────┐
│ Client │ ─請求─▶ │ Server │ ─背景─▶ │ 耗時   │
│        │ ◀─回應─ │        │         │ 作業   │
└────────┘         └────────┘         └────────┘
用戶等待:██  0.1 秒    背景執行:████████████████

🎯 適用場景

場景說明
發送郵件註冊確認、通知信
檔案處理圖片壓縮、PDF 生成
資料同步第三方 API 同步
清理任務刪除暫存檔、日誌清理
統計計算報表生成、數據彙整

📦 FastAPI BackgroundTasks

基本用法

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def send_email(email: str, message: str):
    """模擬發送郵件"""
    print(f"開始發送郵件到 {email}")
    time.sleep(3)  # 模擬耗時
    print(f"郵件已發送: {message}")

@app.post("/register")
async def register(
    email: str,
    background_tasks: BackgroundTasks
):
    # 立即回應
    background_tasks.add_task(send_email, email, "歡迎加入!")

    return {"message": "註冊成功,確認信將稍後寄出"}

多個背景任務

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def task_a(data: str):
    print(f"執行任務 A: {data}")

def task_b(data: str):
    print(f"執行任務 B: {data}")

def task_c(data: str):
    print(f"執行任務 C: {data}")

@app.post("/process")
async def process(background_tasks: BackgroundTasks):
    # 多個任務會依序執行
    background_tasks.add_task(task_a, "data_a")
    background_tasks.add_task(task_b, "data_b")
    background_tasks.add_task(task_c, "data_c")

    return {"message": "處理中"}

非同步背景任務

from fastapi import FastAPI, BackgroundTasks
import asyncio
import httpx

app = FastAPI()

async def async_task(url: str):
    """非同步背景任務"""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        print(f"取得 {url}: {response.status_code}")

@app.post("/fetch")
async def fetch_url(url: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(async_task, url)
    return {"message": "背景取得中"}

🔧 依賴注入中的背景任務

在依賴項中使用

from fastapi import FastAPI, Depends, BackgroundTasks

app = FastAPI()

def log_request(request_id: str):
    print(f"記錄請求: {request_id}")

async def common_parameters(
    request_id: str,
    background_tasks: BackgroundTasks
):
    """共用依賴項"""
    # 記錄請求日誌(背景執行)
    background_tasks.add_task(log_request, request_id)

    return {"request_id": request_id}

@app.get("/items")
async def get_items(params: dict = Depends(common_parameters)):
    return {"items": [], "request_id": params["request_id"]}

@app.get("/users")
async def get_users(params: dict = Depends(common_parameters)):
    return {"users": [], "request_id": params["request_id"]}

類別型依賴

from fastapi import FastAPI, Depends, BackgroundTasks

app = FastAPI()

class TaskManager:
    def __init__(self, background_tasks: BackgroundTasks):
        self.background_tasks = background_tasks

    def schedule_email(self, email: str, content: str):
        self.background_tasks.add_task(
            self._send_email, email, content
        )

    def schedule_notification(self, user_id: int, message: str):
        self.background_tasks.add_task(
            self._send_notification, user_id, message
        )

    def _send_email(self, email: str, content: str):
        print(f"發送郵件到 {email}")

    def _send_notification(self, user_id: int, message: str):
        print(f"發送通知給用戶 {user_id}")

@app.post("/order")
async def create_order(
    email: str,
    task_manager: TaskManager = Depends(TaskManager)
):
    # 處理訂單...

    # 背景發送確認郵件和通知
    task_manager.schedule_email(email, "訂單確認")
    task_manager.schedule_notification(1, "新訂單")

    return {"status": "success"}

📊 任務追蹤

簡單的任務狀態

from fastapi import FastAPI, BackgroundTasks
from typing import Dict
import uuid
import asyncio

app = FastAPI()

# 任務狀態儲存
task_status: Dict[str, dict] = {}

async def long_task(task_id: str):
    """長時間執行的任務"""
    task_status[task_id] = {"status": "running", "progress": 0}

    for i in range(1, 11):
        await asyncio.sleep(1)  # 模擬工作
        task_status[task_id]["progress"] = i * 10

    task_status[task_id]["status"] = "completed"

@app.post("/tasks")
async def create_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    task_status[task_id] = {"status": "pending", "progress": 0}

    background_tasks.add_task(long_task, task_id)

    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    if task_id not in task_status:
        return {"error": "Task not found"}

    return task_status[task_id]

完整任務管理器

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Dict, Optional, Callable, Any
from datetime import datetime
from enum import Enum
import uuid
import asyncio
import traceback

app = FastAPI()

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskInfo(BaseModel):
    id: str
    status: TaskStatus
    progress: int = 0
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: str
    updated_at: str

class TaskTracker:
    """任務追蹤器"""

    def __init__(self):
        self.tasks: Dict[str, TaskInfo] = {}

    def create_task(self) -> str:
        """建立新任務"""
        task_id = str(uuid.uuid4())
        now = datetime.utcnow().isoformat()

        self.tasks[task_id] = TaskInfo(
            id=task_id,
            status=TaskStatus.PENDING,
            created_at=now,
            updated_at=now
        )

        return task_id

    def update_status(
        self,
        task_id: str,
        status: TaskStatus,
        progress: int = None,
        result: Any = None,
        error: str = None
    ):
        """更新任務狀態"""
        if task_id not in self.tasks:
            return

        task = self.tasks[task_id]
        task.status = status
        task.updated_at = datetime.utcnow().isoformat()

        if progress is not None:
            task.progress = progress
        if result is not None:
            task.result = result
        if error is not None:
            task.error = error

    def get_task(self, task_id: str) -> Optional[TaskInfo]:
        """取得任務資訊"""
        return self.tasks.get(task_id)

    async def run_task(
        self,
        task_id: str,
        func: Callable,
        *args,
        **kwargs
    ):
        """執行任務並追蹤狀態"""
        self.update_status(task_id, TaskStatus.RUNNING)

        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(task_id, self, *args, **kwargs)
            else:
                result = func(task_id, self, *args, **kwargs)

            self.update_status(
                task_id,
                TaskStatus.COMPLETED,
                progress=100,
                result=result
            )
        except Exception as e:
            self.update_status(
                task_id,
                TaskStatus.FAILED,
                error=str(e)
            )

tracker = TaskTracker()

async def process_data(
    task_id: str,
    tracker: TaskTracker,
    data: list
) -> dict:
    """處理資料的任務"""
    total = len(data)
    results = []

    for i, item in enumerate(data, 1):
        # 模擬處理
        await asyncio.sleep(0.5)
        results.append(item * 2)

        # 更新進度
        progress = int(i / total * 100)
        tracker.update_status(task_id, TaskStatus.RUNNING, progress=progress)

    return {"processed": len(results), "results": results}

@app.post("/process")
async def start_processing(
    data: list,
    background_tasks: BackgroundTasks
):
    task_id = tracker.create_task()

    background_tasks.add_task(
        tracker.run_task,
        task_id,
        process_data,
        data
    )

    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    task = tracker.get_task(task_id)

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    return task

@app.get("/tasks")
async def list_tasks():
    return list(tracker.tasks.values())

⚠️ 錯誤處理

捕獲背景任務錯誤

from fastapi import FastAPI, BackgroundTasks
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

def risky_task(data: str):
    """可能失敗的任務"""
    try:
        # 處理邏輯
        if data == "error":
            raise ValueError("處理失敗")

        print(f"成功處理: {data}")
    except Exception as e:
        # 記錄錯誤
        logger.error(f"背景任務錯誤: {e}")
        # 可以發送警報通知

@app.post("/process")
async def process(data: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(risky_task, data)
    return {"message": "處理中"}

帶重試的任務

from fastapi import FastAPI, BackgroundTasks
import asyncio
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

async def task_with_retry(
    func,
    max_retries: int = 3,
    delay: float = 1.0,
    *args,
    **kwargs
):
    """帶重試機制的任務包裝器"""
    last_exception = None

    for attempt in range(max_retries):
        try:
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        except Exception as e:
            last_exception = e
            logger.warning(
                f"任務失敗 (嘗試 {attempt + 1}/{max_retries}): {e}"
            )

            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))  # 指數退避

    logger.error(f"任務最終失敗: {last_exception}")
    raise last_exception

async def send_webhook(url: str, data: dict):
    """發送 Webhook"""
    import httpx

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        response.raise_for_status()

@app.post("/webhook")
async def trigger_webhook(
    url: str,
    data: dict,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(
        task_with_retry,
        send_webhook,
        3,  # max_retries
        1.0,  # delay
        url,
        data
    )

    return {"message": "Webhook 排程中"}

📝 實戰範例:郵件服務

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from typing import List, Optional
import asyncio
from datetime import datetime
import uuid

app = FastAPI()

class EmailRequest(BaseModel):
    to: List[EmailStr]
    subject: str
    body: str
    cc: Optional[List[EmailStr]] = None
    attachments: Optional[List[str]] = None

class EmailService:
    """郵件服務"""

    def __init__(self):
        self.sent_emails: List[dict] = []
        self.failed_emails: List[dict] = []

    async def send_email(
        self,
        to: str,
        subject: str,
        body: str,
        cc: List[str] = None,
        attachments: List[str] = None
    ) -> dict:
        """發送單封郵件"""
        # 模擬發送
        await asyncio.sleep(1)

        # 模擬偶爾失敗
        import random
        if random.random() < 0.1:
            raise Exception("SMTP 連線失敗")

        email_record = {
            "id": str(uuid.uuid4()),
            "to": to,
            "subject": subject,
            "sent_at": datetime.utcnow().isoformat()
        }

        self.sent_emails.append(email_record)
        return email_record

    async def send_bulk_emails(
        self,
        recipients: List[str],
        subject: str,
        body: str
    ):
        """批次發送郵件"""
        for recipient in recipients:
            try:
                await self.send_email(recipient, subject, body)
                print(f"成功發送到 {recipient}")
            except Exception as e:
                self.failed_emails.append({
                    "to": recipient,
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                })
                print(f"發送失敗 {recipient}: {e}")

email_service = EmailService()

@app.post("/email/send")
async def send_email(
    request: EmailRequest,
    background_tasks: BackgroundTasks
):
    """發送郵件(背景執行)"""
    for recipient in request.to:
        background_tasks.add_task(
            email_service.send_email,
            recipient,
            request.subject,
            request.body,
            request.cc,
            request.attachments
        )

    return {
        "message": f"已排程發送 {len(request.to)} 封郵件",
        "recipients": request.to
    }

@app.post("/email/bulk")
async def send_bulk_email(
    recipients: List[EmailStr],
    subject: str,
    body: str,
    background_tasks: BackgroundTasks
):
    """批次發送郵件"""
    background_tasks.add_task(
        email_service.send_bulk_emails,
        recipients,
        subject,
        body
    )

    return {
        "message": f"已排程批次發送 {len(recipients)} 封郵件"
    }

@app.get("/email/stats")
async def get_email_stats():
    """取得郵件發送統計"""
    return {
        "sent": len(email_service.sent_emails),
        "failed": len(email_service.failed_emails),
        "recent_sent": email_service.sent_emails[-10:],
        "recent_failed": email_service.failed_emails[-10:]
    }

✅ 重點總結

BackgroundTasks 特性

特性說明
執行時機回應發送後執行
執行順序依加入順序執行
錯誤處理不會影響回應
適用範圍輕量、短時間任務

何時不適用

情況建議
長時間任務使用 Celery、RQ
需要持久化使用訊息佇列
跨行程使用任務佇列
需要重試使用專門的任務框架

最佳實踐

  1. 背景任務應該是獨立的
  2. 做好錯誤處理和日誌
  3. 考慮任務失敗的情況
  4. 大量任務考慮使用專門的任務佇列

🎤 面試這樣答

Q: FastAPI 的 BackgroundTasks 和 Celery 有什麼差別?

答案:

BackgroundTasks:

  • 簡單輕量,適合短任務
  • 同一行程內執行
  • 不支援分散式
  • 伺服器重啟會丟失任務

Celery:

  • 功能完整的任務佇列
  • 支援分散式執行
  • 任務持久化(Redis/RabbitMQ)
  • 支援定時任務、重試、監控

選擇建議:

  • 發送郵件、簡單通知 → BackgroundTasks
  • 影片轉碼、大量資料處理 → Celery
# BackgroundTasks 範例
@app.post("/notify")
async def notify(background_tasks: BackgroundTasks):
    background_tasks.add_task(send_notification)
    return {"status": "ok"}

上一篇: 05-6. Server-Sent Events 下一篇: 05-8. 效能優化與監控


最後更新:2025-12-18

0%