目錄
05-7. 背景任務
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
背景任務讓 API 可以快速回應,同時在背景執行耗時操作,提升使用者體驗。
🔄 為什麼需要背景任務
沒有背景任務:
┌────────┐ ┌────────┐ ┌────────┐
│ Client │ ─請求─▶ │ Server │ ─處理─▶ │ 耗時 │
│ │ ◀─回應─ │ │ ◀───── │ 作業 │
└────────┘ └────────┘ └────────┘
用戶等待:████████████████████ 10 秒
有背景任務:
┌────────┐ ┌────────┐ ┌────────┐
│ Client │ ─請求─▶ │ Server │ ─背景─▶ │ 耗時 │
│ │ ◀─回應─ │ │ │ 作業 │
└────────┘ └────────┘ └────────┘
用戶等待:██ 0.1 秒 背景執行:████████████████🎯 適用場景
| 場景 | 說明 |
|---|---|
| 發送郵件 | 註冊確認、通知信 |
| 檔案處理 | 圖片壓縮、PDF 生成 |
| 資料同步 | 第三方 API 同步 |
| 清理任務 | 刪除暫存檔、日誌清理 |
| 統計計算 | 報表生成、數據彙整 |
📦 FastAPI BackgroundTasks
基本用法
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def send_email(email: str, message: str):
"""模擬發送郵件"""
print(f"開始發送郵件到 {email}")
time.sleep(3) # 模擬耗時
print(f"郵件已發送: {message}")
@app.post("/register")
async def register(
email: str,
background_tasks: BackgroundTasks
):
# 立即回應
background_tasks.add_task(send_email, email, "歡迎加入!")
return {"message": "註冊成功,確認信將稍後寄出"}多個背景任務
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def task_a(data: str):
print(f"執行任務 A: {data}")
def task_b(data: str):
print(f"執行任務 B: {data}")
def task_c(data: str):
print(f"執行任務 C: {data}")
@app.post("/process")
async def process(background_tasks: BackgroundTasks):
# 多個任務會依序執行
background_tasks.add_task(task_a, "data_a")
background_tasks.add_task(task_b, "data_b")
background_tasks.add_task(task_c, "data_c")
return {"message": "處理中"}非同步背景任務
from fastapi import FastAPI, BackgroundTasks
import asyncio
import httpx
app = FastAPI()
async def async_task(url: str):
"""非同步背景任務"""
async with httpx.AsyncClient() as client:
response = await client.get(url)
print(f"取得 {url}: {response.status_code}")
@app.post("/fetch")
async def fetch_url(url: str, background_tasks: BackgroundTasks):
background_tasks.add_task(async_task, url)
return {"message": "背景取得中"}🔧 依賴注入中的背景任務
在依賴項中使用
from fastapi import FastAPI, Depends, BackgroundTasks
app = FastAPI()
def log_request(request_id: str):
print(f"記錄請求: {request_id}")
async def common_parameters(
request_id: str,
background_tasks: BackgroundTasks
):
"""共用依賴項"""
# 記錄請求日誌(背景執行)
background_tasks.add_task(log_request, request_id)
return {"request_id": request_id}
@app.get("/items")
async def get_items(params: dict = Depends(common_parameters)):
return {"items": [], "request_id": params["request_id"]}
@app.get("/users")
async def get_users(params: dict = Depends(common_parameters)):
return {"users": [], "request_id": params["request_id"]}類別型依賴
from fastapi import FastAPI, Depends, BackgroundTasks
app = FastAPI()
class TaskManager:
def __init__(self, background_tasks: BackgroundTasks):
self.background_tasks = background_tasks
def schedule_email(self, email: str, content: str):
self.background_tasks.add_task(
self._send_email, email, content
)
def schedule_notification(self, user_id: int, message: str):
self.background_tasks.add_task(
self._send_notification, user_id, message
)
def _send_email(self, email: str, content: str):
print(f"發送郵件到 {email}")
def _send_notification(self, user_id: int, message: str):
print(f"發送通知給用戶 {user_id}")
@app.post("/order")
async def create_order(
email: str,
task_manager: TaskManager = Depends(TaskManager)
):
# 處理訂單...
# 背景發送確認郵件和通知
task_manager.schedule_email(email, "訂單確認")
task_manager.schedule_notification(1, "新訂單")
return {"status": "success"}📊 任務追蹤
簡單的任務狀態
from fastapi import FastAPI, BackgroundTasks
from typing import Dict
import uuid
import asyncio
app = FastAPI()
# 任務狀態儲存
task_status: Dict[str, dict] = {}
async def long_task(task_id: str):
"""長時間執行的任務"""
task_status[task_id] = {"status": "running", "progress": 0}
for i in range(1, 11):
await asyncio.sleep(1) # 模擬工作
task_status[task_id]["progress"] = i * 10
task_status[task_id]["status"] = "completed"
@app.post("/tasks")
async def create_task(background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
task_status[task_id] = {"status": "pending", "progress": 0}
background_tasks.add_task(long_task, task_id)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
if task_id not in task_status:
return {"error": "Task not found"}
return task_status[task_id]完整任務管理器
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Dict, Optional, Callable, Any
from datetime import datetime
from enum import Enum
import uuid
import asyncio
import traceback
app = FastAPI()
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class TaskInfo(BaseModel):
id: str
status: TaskStatus
progress: int = 0
result: Optional[Any] = None
error: Optional[str] = None
created_at: str
updated_at: str
class TaskTracker:
"""任務追蹤器"""
def __init__(self):
self.tasks: Dict[str, TaskInfo] = {}
def create_task(self) -> str:
"""建立新任務"""
task_id = str(uuid.uuid4())
now = datetime.utcnow().isoformat()
self.tasks[task_id] = TaskInfo(
id=task_id,
status=TaskStatus.PENDING,
created_at=now,
updated_at=now
)
return task_id
def update_status(
self,
task_id: str,
status: TaskStatus,
progress: int = None,
result: Any = None,
error: str = None
):
"""更新任務狀態"""
if task_id not in self.tasks:
return
task = self.tasks[task_id]
task.status = status
task.updated_at = datetime.utcnow().isoformat()
if progress is not None:
task.progress = progress
if result is not None:
task.result = result
if error is not None:
task.error = error
def get_task(self, task_id: str) -> Optional[TaskInfo]:
"""取得任務資訊"""
return self.tasks.get(task_id)
async def run_task(
self,
task_id: str,
func: Callable,
*args,
**kwargs
):
"""執行任務並追蹤狀態"""
self.update_status(task_id, TaskStatus.RUNNING)
try:
if asyncio.iscoroutinefunction(func):
result = await func(task_id, self, *args, **kwargs)
else:
result = func(task_id, self, *args, **kwargs)
self.update_status(
task_id,
TaskStatus.COMPLETED,
progress=100,
result=result
)
except Exception as e:
self.update_status(
task_id,
TaskStatus.FAILED,
error=str(e)
)
tracker = TaskTracker()
async def process_data(
task_id: str,
tracker: TaskTracker,
data: list
) -> dict:
"""處理資料的任務"""
total = len(data)
results = []
for i, item in enumerate(data, 1):
# 模擬處理
await asyncio.sleep(0.5)
results.append(item * 2)
# 更新進度
progress = int(i / total * 100)
tracker.update_status(task_id, TaskStatus.RUNNING, progress=progress)
return {"processed": len(results), "results": results}
@app.post("/process")
async def start_processing(
data: list,
background_tasks: BackgroundTasks
):
task_id = tracker.create_task()
background_tasks.add_task(
tracker.run_task,
task_id,
process_data,
data
)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
task = tracker.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.get("/tasks")
async def list_tasks():
return list(tracker.tasks.values())⚠️ 錯誤處理
捕獲背景任務錯誤
from fastapi import FastAPI, BackgroundTasks
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
def risky_task(data: str):
"""可能失敗的任務"""
try:
# 處理邏輯
if data == "error":
raise ValueError("處理失敗")
print(f"成功處理: {data}")
except Exception as e:
# 記錄錯誤
logger.error(f"背景任務錯誤: {e}")
# 可以發送警報通知
@app.post("/process")
async def process(data: str, background_tasks: BackgroundTasks):
background_tasks.add_task(risky_task, data)
return {"message": "處理中"}帶重試的任務
from fastapi import FastAPI, BackgroundTasks
import asyncio
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
async def task_with_retry(
func,
max_retries: int = 3,
delay: float = 1.0,
*args,
**kwargs
):
"""帶重試機制的任務包裝器"""
last_exception = None
for attempt in range(max_retries):
try:
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(
f"任務失敗 (嘗試 {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
await asyncio.sleep(delay * (2 ** attempt)) # 指數退避
logger.error(f"任務最終失敗: {last_exception}")
raise last_exception
async def send_webhook(url: str, data: dict):
"""發送 Webhook"""
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
response.raise_for_status()
@app.post("/webhook")
async def trigger_webhook(
url: str,
data: dict,
background_tasks: BackgroundTasks
):
background_tasks.add_task(
task_with_retry,
send_webhook,
3, # max_retries
1.0, # delay
url,
data
)
return {"message": "Webhook 排程中"}📝 實戰範例:郵件服務
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from typing import List, Optional
import asyncio
from datetime import datetime
import uuid
app = FastAPI()
class EmailRequest(BaseModel):
to: List[EmailStr]
subject: str
body: str
cc: Optional[List[EmailStr]] = None
attachments: Optional[List[str]] = None
class EmailService:
"""郵件服務"""
def __init__(self):
self.sent_emails: List[dict] = []
self.failed_emails: List[dict] = []
async def send_email(
self,
to: str,
subject: str,
body: str,
cc: List[str] = None,
attachments: List[str] = None
) -> dict:
"""發送單封郵件"""
# 模擬發送
await asyncio.sleep(1)
# 模擬偶爾失敗
import random
if random.random() < 0.1:
raise Exception("SMTP 連線失敗")
email_record = {
"id": str(uuid.uuid4()),
"to": to,
"subject": subject,
"sent_at": datetime.utcnow().isoformat()
}
self.sent_emails.append(email_record)
return email_record
async def send_bulk_emails(
self,
recipients: List[str],
subject: str,
body: str
):
"""批次發送郵件"""
for recipient in recipients:
try:
await self.send_email(recipient, subject, body)
print(f"成功發送到 {recipient}")
except Exception as e:
self.failed_emails.append({
"to": recipient,
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
})
print(f"發送失敗 {recipient}: {e}")
email_service = EmailService()
@app.post("/email/send")
async def send_email(
request: EmailRequest,
background_tasks: BackgroundTasks
):
"""發送郵件(背景執行)"""
for recipient in request.to:
background_tasks.add_task(
email_service.send_email,
recipient,
request.subject,
request.body,
request.cc,
request.attachments
)
return {
"message": f"已排程發送 {len(request.to)} 封郵件",
"recipients": request.to
}
@app.post("/email/bulk")
async def send_bulk_email(
recipients: List[EmailStr],
subject: str,
body: str,
background_tasks: BackgroundTasks
):
"""批次發送郵件"""
background_tasks.add_task(
email_service.send_bulk_emails,
recipients,
subject,
body
)
return {
"message": f"已排程批次發送 {len(recipients)} 封郵件"
}
@app.get("/email/stats")
async def get_email_stats():
"""取得郵件發送統計"""
return {
"sent": len(email_service.sent_emails),
"failed": len(email_service.failed_emails),
"recent_sent": email_service.sent_emails[-10:],
"recent_failed": email_service.failed_emails[-10:]
}✅ 重點總結
BackgroundTasks 特性
| 特性 | 說明 |
|---|---|
| 執行時機 | 回應發送後執行 |
| 執行順序 | 依加入順序執行 |
| 錯誤處理 | 不會影響回應 |
| 適用範圍 | 輕量、短時間任務 |
何時不適用
| 情況 | 建議 |
|---|---|
| 長時間任務 | 使用 Celery、RQ |
| 需要持久化 | 使用訊息佇列 |
| 跨行程 | 使用任務佇列 |
| 需要重試 | 使用專門的任務框架 |
最佳實踐
- 背景任務應該是獨立的
- 做好錯誤處理和日誌
- 考慮任務失敗的情況
- 大量任務考慮使用專門的任務佇列
🎤 面試這樣答
Q: FastAPI 的 BackgroundTasks 和 Celery 有什麼差別?
答案:
BackgroundTasks:
- 簡單輕量,適合短任務
- 同一行程內執行
- 不支援分散式
- 伺服器重啟會丟失任務
Celery:
- 功能完整的任務佇列
- 支援分散式執行
- 任務持久化(Redis/RabbitMQ)
- 支援定時任務、重試、監控
選擇建議:
- 發送郵件、簡單通知 → BackgroundTasks
- 影片轉碼、大量資料處理 → Celery
# BackgroundTasks 範例 @app.post("/notify") async def notify(background_tasks: BackgroundTasks): background_tasks.add_task(send_notification) return {"status": "ok"}
上一篇: 05-6. Server-Sent Events 下一篇: 05-8. 效能優化與監控
最後更新:2025-12-18