06-5. Celery vs Channels 如何選擇
目錄
一、核心差異比較
1.1 設計目標
Celery:分布式任務隊列
- 解決問題:異步執行耗時任務,避免阻塞主應用
- 核心特性:任務調度、延遲執行、定時任務
- 通訊模式:單向(Producer → Worker)
- 結果返回:通過 Result Backend 查詢
Channels:實時雙向通訊框架
- 解決問題:實時雙向通訊需求(WebSocket、SSE)
- 核心特性:持久連接、實時推送、雙向通訊
- 通訊模式:雙向(Client ↔ Server)
- 結果返回:即時推送給連接的客戶端
1.2 技術架構對比
┌─────────────────────────────────────────────────────────┐
│ Celery 架構 │
├─────────────────────────────────────────────────────────┤
│ │
│ Django View │
│ │ │
│ ├─► task.delay() ─────► Message Broker │
│ │ (Redis/RabbitMQ) │
│ │ │ │
│ │ ▼ │
│ │ Celery Worker │
│ │ │ │
│ │ ▼ │
│ └─► get result ◄──── Result Backend │
│ (Redis/DB) │
│ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Channels 架構 │
├─────────────────────────────────────────────────────────┤
│ │
│ WebSocket Client │
│ │ │
│ ├──► ws:// connect ─► ASGI Server │
│ │ (Daphne/Uvicorn) │
│ │ │ │
│ │ ▼ │
│ │ Consumer │
│ │ │ │
│ │ ▼ │
│ ◄──── push message ◄── Channel Layer │
│ (Redis) │
│ │
└─────────────────────────────────────────────────────────┘1.3 功能特性對比
| 特性 | Celery | Channels | 說明 |
|---|---|---|---|
| 通訊方式 | 單向 | 雙向 | Celery 是推送任務,Channels 是持久連接 |
| 執行時機 | 異步執行 | 即時互動 | Celery 任務獨立執行,Channels 即時響應 |
| 連接狀態 | 無狀態 | 有狀態 | Celery 不維護連接,Channels 維護 WebSocket |
| 結果返回 | 查詢式 | 推送式 | Celery 需主動查詢,Channels 主動推送 |
| 定時任務 | ✅ Celery Beat | ❌ | Channels 不支持定時任務 |
| 任務重試 | ✅ 內建重試機制 | ❌ | Channels 需自行實現 |
| 任務優先級 | ✅ 支持 | ❌ | Channels 無優先級概念 |
| 分布式 | ✅ 天然分布式 | ⚠️ 需 Channel Layer | Celery 天生分布式,Channels 需配置 |
| 資源消耗 | 低 | 高 | WebSocket 持久連接消耗更多資源 |
| 適用場景 | 後台任務 | 實時互動 | 各有專長 |
二、使用場景選擇指南
2.1 應該使用 Celery 的場景
✅ 場景 1:發送郵件/簡訊
# ❌ 錯誤:使用 Channels(過度設計)
class NotificationConsumer(AsyncWebsocketConsumer):
async def send_email(self, event):
# 使用 WebSocket 發送郵件?不合適!
send_mail(...)
# ✅ 正確:使用 Celery
from celery import shared_task
@shared_task
def send_email_task(user_id, subject, message):
user = User.objects.get(id=user_id)
send_mail(
subject=subject,
message=message,
recipient_list=[user.email],
)
# View 中調用
def order_view(request):
order = Order.objects.create(...)
send_email_task.delay(request.user.id, '訂單確認', f'訂單 {order.id} 已建立')
return JsonResponse({'status': 'success'})為什麼選 Celery?
- 發送郵件是單向操作,無需實時反饋
- 不需要維護 WebSocket 連接
- 可以利用 Celery 的重試機制處理失敗
- 更低的資源消耗
✅ 場景 2:定時任務
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-daily-report': {
'task': 'tasks.generate_daily_report',
'schedule': crontab(hour=2, minute=0), # 每天凌晨 2 點
},
'cleanup-expired-sessions': {
'task': 'tasks.cleanup_sessions',
'schedule': crontab(hour=3, minute=0), # 每天凌晨 3 點
},
}為什麼選 Celery?
- Channels 不支持定時任務
- Celery Beat 提供完整的 cron 語法
- 可以動態添加/修改定時任務
✅ 場景 3:數據導入/導出
@shared_task(bind=True)
def export_users_to_csv(self, filters):
"""導出用戶數據到 CSV"""
users = User.objects.filter(**filters)
total = users.count()
csv_buffer = io.StringIO()
writer = csv.writer(csv_buffer)
for idx, user in enumerate(users.iterator(), 1):
writer.writerow([user.id, user.username, user.email])
# 更新進度
if idx % 100 == 0:
self.update_state(
state='PROGRESS',
meta={'current': idx, 'total': total}
)
# 上傳到 S3
s3_url = upload_to_s3(csv_buffer.getvalue())
return {'url': s3_url, 'total': total}為什麼選 Celery?
- 任務耗時長,適合後台執行
- 不需要實時雙向通訊
- 可以利用 Result Backend 查詢進度
✅ 場景 4:第三方 API 調用
@shared_task(bind=True, max_retries=3)
def sync_order_to_erp(self, order_id):
"""同步訂單到 ERP 系統"""
try:
order = Order.objects.get(id=order_id)
response = requests.post(
'https://erp.example.com/api/orders',
json=order.to_dict(),
timeout=30
)
response.raise_for_status()
order.synced = True
order.save()
except requests.RequestException as exc:
# 30 秒後重試
raise self.retry(exc=exc, countdown=30)為什麼選 Celery?
- 第三方 API 可能不穩定,需要重試機制
- 不需要即時反饋給用戶
- 可以批量處理,提高效率
2.2 應該使用 Channels 的場景
✅ 場景 1:聊天室/即時訊息
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def receive(self, text_data):
data = json.loads(text_data)
message = data['message']
# 廣播給同房間的所有人
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message,
'sender': self.scope['user'].username,
}
)
async def chat_message(self, event):
await self.send(text_data=json.dumps({
'type': 'message',
'message': event['message'],
'sender': event['sender'],
}))為什麼選 Channels?
- 需要雙向實時通訊
- 消息需要立即送達所有在線用戶
- WebSocket 連接保持狀態
✅ 場景 2:協同編輯
class DocumentConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
data = json.loads(text_data)
if data['type'] == 'edit':
# 將編輯操作廣播給其他協作者
await self.channel_layer.group_send(
f'doc_{self.document_id}',
{
'type': 'document_edit',
'operation': data['operation'],
'position': data['position'],
'user': self.scope['user'].username,
}
)為什麼選 Channels?
- 需要毫秒級的響應速度
- 多人同時編輯需要實時同步
- 必須使用 WebSocket 維護狀態
✅ 場景 3:實時監控儀表板
class DashboardConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
# 每秒推送系統狀態
while True:
stats = await self.get_system_stats()
await self.send(text_data=json.dumps({
'type': 'stats_update',
'cpu_usage': stats['cpu'],
'memory_usage': stats['memory'],
'active_users': stats['users'],
}))
await asyncio.sleep(1)
async def get_system_stats(self):
return {
'cpu': psutil.cpu_percent(),
'memory': psutil.virtual_memory().percent,
'users': await database_sync_to_async(
lambda: User.objects.filter(is_active=True).count()
)(),
}為什麼選 Channels?
- 需要持續推送更新數據
- 前端無需輪詢,減少請求量
- 實時展示,延遲不能超過 1 秒
✅ 場景 4:在線遊戲/多人互動
class GameConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
data = json.loads(text_data)
if data['type'] == 'move':
# 玩家移動,立即廣播給其他玩家
await self.channel_layer.group_send(
f'game_{self.game_id}',
{
'type': 'player_move',
'player_id': self.player_id,
'position': data['position'],
'timestamp': time.time(),
}
)為什麼選 Channels?
- 需要極低延遲(<100ms)
- 雙向互動頻繁
- 必須維護遊戲狀態
2.3 組合使用的場景
很多時候,Celery + Channels 組合使用才是最佳方案!
🎯 場景:大文件上傳與處理
# views.py
def upload_video(request):
video_file = request.FILES['video']
video = Video.objects.create(file=video_file, status='processing')
# 使用 Celery 處理視頻(耗時任務)
process_video_task.delay(video.id, request.user.id)
return JsonResponse({'video_id': video.id})
# tasks.py
@shared_task(bind=True)
def process_video_task(self, video_id, user_id):
video = Video.objects.get(id=video_id)
# 1. 轉碼(Celery 執行)
for progress in range(0, 101, 10):
time.sleep(2) # 模擬處理
self.update_state(state='PROGRESS', meta={'progress': progress})
# 2. 通過 Channels 推送進度(實時通知)
send_progress_update(user_id, video_id, progress)
# 3. 完成後推送通知
video.status = 'completed'
video.save()
send_completion_notification(user_id, video_id)
# utils.py
def send_progress_update(user_id, video_id, progress):
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'user_{user_id}',
{
'type': 'video_progress',
'video_id': video_id,
'progress': progress,
}
)
# consumers.py
class NotificationConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user_id = self.scope['user'].id
await self.channel_layer.group_add(
f'user_{self.user_id}',
self.channel_name
)
await self.accept()
async def video_progress(self, event):
"""接收 Celery 任務推送的進度"""
await self.send(text_data=json.dumps({
'type': 'video_progress',
'video_id': event['video_id'],
'progress': event['progress'],
}))為什麼組合使用?
- Celery:處理耗時的視頻轉碼任務
- Channels:實時推送處理進度給前端
- 最佳實踐:各司其職,發揮各自優勢
🎯 場景:訂單支付與通知
# views.py
def create_order(request):
order = Order.objects.create(...)
# 1. 同步處理:建立訂單(必須即時完成)
# 2. 異步任務:發送郵件(Celery)
send_order_email_task.delay(order.id)
# 3. 實時通知:推送到前端(Channels)
send_order_notification(request.user.id, order.id)
return JsonResponse({'order_id': order.id})
# Webhook 回調
def payment_callback(request):
order_id = request.POST['order_id']
order = Order.objects.get(id=order_id)
order.status = 'paid'
order.save()
# 1. 異步任務:生成發票(Celery)
generate_invoice_task.delay(order.id)
# 2. 實時通知:告知用戶支付成功(Channels)
send_payment_success_notification(order.user_id, order.id)三、決策流程圖
開始
│
▼
是否需要實時雙向通訊?
│
├─ 是 ────────► 是否需要毫秒級響應?
│ │
│ ├─ 是 ──► 使用 Channels
│ │ (聊天室、遊戲、協同編輯)
│ │
│ └─ 否 ──► 是否可以接受輪詢?
│ │
│ ├─ 是 ──► 使用 HTTP 輪詢(更簡單)
│ └─ 否 ──► 使用 Channels
│
└─ 否 ────────► 是否需要定時執行?
│
├─ 是 ──► 使用 Celery Beat
│
└─ 否 ──► 是否是耗時任務?
│
├─ 是 ──► 使用 Celery
│ (郵件、報表、數據處理)
│
└─ 否 ──► 同步處理即可
(普通 Django View)四、性能與資源對比
4.1 資源消耗對比
Celery Worker(以 prefork 為例)
單個 Worker 進程資源:
- 記憶體:50-100 MB(基礎)
- CPU:任務執行時才消耗
- 連接:短暫連接(與 Broker 和 DB)
4 個 Worker 進程:
- 記憶體:200-400 MB
- 可同時處理:4 個任務
- 適合場景:後台異步處理Channels (ASGI Server + WebSocket)
單個 Daphne 進程資源:
- 記憶體:100 MB(基礎)+ 連接數 × 30 KB
- CPU:持續消耗(維護連接)
- 連接:持久連接
1000 個 WebSocket 連接:
- 記憶體:100 + 1000 × 0.03 = 130 MB
- CPU:持續消耗(心跳、消息處理)
- 適合場景:實時互動4.2 吞吐量對比
Celery
吞吐量:
- Prefork Worker:100-200 任務/秒/進程
- Gevent Worker:1000-2000 任務/秒/進程
- 瓶頸:任務處理速度
擴展方式:
- 水平擴展:增加 Worker 機器
- 垂直擴展:增加 Worker 進程數Channels
吞吐量:
- 單進程:1000-5000 消息/秒
- 瓶頸:Redis Channel Layer、網絡帶寬
擴展方式:
- 水平擴展:增加 ASGI Server 實例
- 需要:Redis Cluster(Channel Layer)
- 注意:WebSocket 連接有狀態,擴展較複雜五、常見面試問題
Q1: Celery 和 Channels 的核心區別是什麼?
標準答案:
- Celery 是分布式任務隊列,用於異步執行後台任務(單向通訊)
- Channels 是實時通訊框架,用於 WebSocket 雙向通訊(雙向通訊)
- 關鍵區別:
- 通訊模式:Celery 單向推送,Channels 雙向互動
- 連接狀態:Celery 無狀態,Channels 有狀態
- 使用場景:Celery 適合後台任務,Channels 適合實時互動
Q2: 什麼情況下應該同時使用 Celery 和 Channels?
標準答案: 當需要「執行耗時任務 + 實時推送進度」時,應該組合使用:
- Celery:執行耗時任務(視頻處理、數據導出)
- Channels:實時推送進度給前端
典型案例:
- 視頻上傳與轉碼
- 大數據報表生成
- 批量數據導入
Q3: 如何在 Celery 任務中推送 Channels 消息?
標準答案:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
@shared_task
def my_task(user_id):
# 執行任務
result = do_something()
# 推送到 Channels
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'user_{user_id}',
{
'type': 'task_result',
'result': result,
}
)關鍵點:
- 使用
async_to_sync將 Channels 的異步方法轉為同步 - 通過
channel_layer.group_send發送消息 - Consumer 需要監聽相應的 group
Q4: Channels 為什麼不能替代 Celery?
標準答案: 雖然 Channels 也能執行異步任務,但不能替代 Celery,原因:
- 定時任務:Channels 不支持 cron 定時任務
- 任務管理:Channels 沒有任務重試、優先級、結果存儲等機制
- 資源消耗:Channels 維護 WebSocket 連接消耗更多資源
- 設計目標:Channels 專注於實時通訊,不是任務隊列
最佳實踐:
- 後台任務:使用 Celery
- 實時通訊:使用 Channels
- 進度推送:Celery + Channels 組合
Q5: 如何選擇 Celery Worker 類型和 Channels Server?
標準答案:
Celery Worker 選擇:
- Prefork:CPU 密集型任務(圖像處理、數據計算)
- Gevent/Eventlet:I/O 密集型任務(API 調用、郵件發送)
- Threads:介於兩者之間
Channels Server 選擇:
- Daphne:官方推薦,穩定性好
- Uvicorn:性能更高,支持 HTTP/2
- 生產環境:使用 Nginx 反向代理
六、最佳實踐總結
✅ 使用 Celery 的場景
- 發送郵件、簡訊通知
- 定時任務(報表、數據清理)
- 數據導入/導出
- 第三方 API 調用
- 圖像/視頻處理
- 批量數據處理
✅ 使用 Channels 的場景
- 聊天室、即時訊息
- 實時通知(需要立即推送)
- 協同編輯(文檔、白板)
- 實時監控儀表板
- 在線遊戲、多人互動
- WebRTC 信令伺服器
✅ 組合使用的場景
- 視頻上傳 + 實時進度推送
- 訂單處理 + 實時狀態更新
- 大數據報表 + 完成通知
- 批量操作 + 進度顯示
⚠️ 注意事項
Celery 注意事項:
- 選擇合適的 Worker 類型(prefork vs gevent)
- 配置合理的任務超時時間
- 使用
bind=True實現任務重試 - 監控 Broker 和 Result Backend 性能
Channels 注意事項:
- 實現 Token 認證機制
- 配置 Channel Layer(Redis)
- 處理連接異常(斷線重連)
- 監控 WebSocket 連接數和資源消耗
組合使用注意事項:
- Celery 和 Channels 使用同一個 Redis 實例
- 避免循環依賴(Celery → Channels → Celery)
- 統一錯誤處理和日誌記錄
七、總結
| 需求 | 推薦方案 | 理由 |
|---|---|---|
| 發送郵件 | Celery | 單向任務,無需實時反饋 |
| 定時任務 | Celery Beat | Channels 不支持定時 |
| 聊天室 | Channels | 需要雙向實時通訊 |
| 數據導出 | Celery | 耗時任務,後台執行 |
| 實時通知 | Channels | 需要立即推送 |
| 視頻處理 + 進度 | Celery + Channels | 組合使用,各司其職 |
| API 調用 | Celery | 可能失敗,需要重試 |
| 在線遊戲 | Channels | 低延遲,雙向互動 |
核心原則:
- 單向任務 → Celery
- 雙向實時 → Channels
- 複雜需求 → Celery + Channels
選擇合適的工具,而不是最流行的工具!