06-5. Celery vs Channels 如何選擇

一、核心差異比較

1.1 設計目標

Celery:分布式任務隊列

  • 解決問題:異步執行耗時任務,避免阻塞主應用
  • 核心特性:任務調度、延遲執行、定時任務
  • 通訊模式:單向(Producer → Worker)
  • 結果返回:通過 Result Backend 查詢

Channels:實時雙向通訊框架

  • 解決問題:實時雙向通訊需求(WebSocket、SSE)
  • 核心特性:持久連接、實時推送、雙向通訊
  • 通訊模式:雙向(Client ↔ Server)
  • 結果返回:即時推送給連接的客戶端

1.2 技術架構對比

┌─────────────────────────────────────────────────────────┐
│                     Celery 架構                          │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  Django View                                              │
│       │                                                   │
│       ├─► task.delay()  ─────►  Message Broker           │
│       │                         (Redis/RabbitMQ)          │
│       │                               │                   │
│       │                               ▼                   │
│       │                         Celery Worker             │
│       │                               │                   │
│       │                               ▼                   │
│       └─► get result  ◄────  Result Backend              │
│                                (Redis/DB)                 │
│                                                           │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                    Channels 架構                         │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  WebSocket Client                                         │
│       │                                                   │
│       ├──► ws:// connect  ─►  ASGI Server                │
│       │                        (Daphne/Uvicorn)           │
│       │                               │                   │
│       │                               ▼                   │
│       │                         Consumer                  │
│       │                               │                   │
│       │                               ▼                   │
│       ◄──── push message  ◄──  Channel Layer             │
│                                  (Redis)                  │
│                                                           │
└─────────────────────────────────────────────────────────┘

1.3 功能特性對比

特性CeleryChannels說明
通訊方式單向雙向Celery 是推送任務,Channels 是持久連接
執行時機異步執行即時互動Celery 任務獨立執行,Channels 即時響應
連接狀態無狀態有狀態Celery 不維護連接,Channels 維護 WebSocket
結果返回查詢式推送式Celery 需主動查詢,Channels 主動推送
定時任務✅ Celery BeatChannels 不支持定時任務
任務重試✅ 內建重試機制Channels 需自行實現
任務優先級✅ 支持Channels 無優先級概念
分布式✅ 天然分布式⚠️ 需 Channel LayerCelery 天生分布式,Channels 需配置
資源消耗WebSocket 持久連接消耗更多資源
適用場景後台任務實時互動各有專長

二、使用場景選擇指南

2.1 應該使用 Celery 的場景

✅ 場景 1:發送郵件/簡訊

# ❌ 錯誤:使用 Channels(過度設計)
class NotificationConsumer(AsyncWebsocketConsumer):
    async def send_email(self, event):
        # 使用 WebSocket 發送郵件?不合適!
        send_mail(...)

# ✅ 正確:使用 Celery
from celery import shared_task

@shared_task
def send_email_task(user_id, subject, message):
    user = User.objects.get(id=user_id)
    send_mail(
        subject=subject,
        message=message,
        recipient_list=[user.email],
    )

# View 中調用
def order_view(request):
    order = Order.objects.create(...)
    send_email_task.delay(request.user.id, '訂單確認', f'訂單 {order.id} 已建立')
    return JsonResponse({'status': 'success'})

為什麼選 Celery?

  • 發送郵件是單向操作,無需實時反饋
  • 不需要維護 WebSocket 連接
  • 可以利用 Celery 的重試機制處理失敗
  • 更低的資源消耗

✅ 場景 2:定時任務

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=2, minute=0),  # 每天凌晨 2 點
    },
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=3, minute=0),  # 每天凌晨 3 點
    },
}

為什麼選 Celery?

  • Channels 不支持定時任務
  • Celery Beat 提供完整的 cron 語法
  • 可以動態添加/修改定時任務

✅ 場景 3:數據導入/導出

@shared_task(bind=True)
def export_users_to_csv(self, filters):
    """導出用戶數據到 CSV"""
    users = User.objects.filter(**filters)
    total = users.count()

    csv_buffer = io.StringIO()
    writer = csv.writer(csv_buffer)

    for idx, user in enumerate(users.iterator(), 1):
        writer.writerow([user.id, user.username, user.email])

        # 更新進度
        if idx % 100 == 0:
            self.update_state(
                state='PROGRESS',
                meta={'current': idx, 'total': total}
            )

    # 上傳到 S3
    s3_url = upload_to_s3(csv_buffer.getvalue())
    return {'url': s3_url, 'total': total}

為什麼選 Celery?

  • 任務耗時長,適合後台執行
  • 不需要實時雙向通訊
  • 可以利用 Result Backend 查詢進度

✅ 場景 4:第三方 API 調用

@shared_task(bind=True, max_retries=3)
def sync_order_to_erp(self, order_id):
    """同步訂單到 ERP 系統"""
    try:
        order = Order.objects.get(id=order_id)
        response = requests.post(
            'https://erp.example.com/api/orders',
            json=order.to_dict(),
            timeout=30
        )
        response.raise_for_status()
        order.synced = True
        order.save()
    except requests.RequestException as exc:
        # 30 秒後重試
        raise self.retry(exc=exc, countdown=30)

為什麼選 Celery?

  • 第三方 API 可能不穩定,需要重試機制
  • 不需要即時反饋給用戶
  • 可以批量處理,提高效率

2.2 應該使用 Channels 的場景

✅ 場景 1:聊天室/即時訊息

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data['message']

        # 廣播給同房間的所有人
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'sender': self.scope['user'].username,
            }
        )

    async def chat_message(self, event):
        await self.send(text_data=json.dumps({
            'type': 'message',
            'message': event['message'],
            'sender': event['sender'],
        }))

為什麼選 Channels?

  • 需要雙向實時通訊
  • 消息需要立即送達所有在線用戶
  • WebSocket 連接保持狀態

✅ 場景 2:協同編輯

class DocumentConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        data = json.loads(text_data)

        if data['type'] == 'edit':
            # 將編輯操作廣播給其他協作者
            await self.channel_layer.group_send(
                f'doc_{self.document_id}',
                {
                    'type': 'document_edit',
                    'operation': data['operation'],
                    'position': data['position'],
                    'user': self.scope['user'].username,
                }
            )

為什麼選 Channels?

  • 需要毫秒級的響應速度
  • 多人同時編輯需要實時同步
  • 必須使用 WebSocket 維護狀態

✅ 場景 3:實時監控儀表板

class DashboardConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()

        # 每秒推送系統狀態
        while True:
            stats = await self.get_system_stats()
            await self.send(text_data=json.dumps({
                'type': 'stats_update',
                'cpu_usage': stats['cpu'],
                'memory_usage': stats['memory'],
                'active_users': stats['users'],
            }))
            await asyncio.sleep(1)

    async def get_system_stats(self):
        return {
            'cpu': psutil.cpu_percent(),
            'memory': psutil.virtual_memory().percent,
            'users': await database_sync_to_async(
                lambda: User.objects.filter(is_active=True).count()
            )(),
        }

為什麼選 Channels?

  • 需要持續推送更新數據
  • 前端無需輪詢,減少請求量
  • 實時展示,延遲不能超過 1 秒

✅ 場景 4:在線遊戲/多人互動

class GameConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        data = json.loads(text_data)

        if data['type'] == 'move':
            # 玩家移動,立即廣播給其他玩家
            await self.channel_layer.group_send(
                f'game_{self.game_id}',
                {
                    'type': 'player_move',
                    'player_id': self.player_id,
                    'position': data['position'],
                    'timestamp': time.time(),
                }
            )

為什麼選 Channels?

  • 需要極低延遲(<100ms)
  • 雙向互動頻繁
  • 必須維護遊戲狀態

2.3 組合使用的場景

很多時候,Celery + Channels 組合使用才是最佳方案!

🎯 場景:大文件上傳與處理

# views.py
def upload_video(request):
    video_file = request.FILES['video']
    video = Video.objects.create(file=video_file, status='processing')

    # 使用 Celery 處理視頻(耗時任務)
    process_video_task.delay(video.id, request.user.id)

    return JsonResponse({'video_id': video.id})

# tasks.py
@shared_task(bind=True)
def process_video_task(self, video_id, user_id):
    video = Video.objects.get(id=video_id)

    # 1. 轉碼(Celery 執行)
    for progress in range(0, 101, 10):
        time.sleep(2)  # 模擬處理
        self.update_state(state='PROGRESS', meta={'progress': progress})

        # 2. 通過 Channels 推送進度(實時通知)
        send_progress_update(user_id, video_id, progress)

    # 3. 完成後推送通知
    video.status = 'completed'
    video.save()
    send_completion_notification(user_id, video_id)

# utils.py
def send_progress_update(user_id, video_id, progress):
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'user_{user_id}',
        {
            'type': 'video_progress',
            'video_id': video_id,
            'progress': progress,
        }
    )

# consumers.py
class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user_id = self.scope['user'].id
        await self.channel_layer.group_add(
            f'user_{self.user_id}',
            self.channel_name
        )
        await self.accept()

    async def video_progress(self, event):
        """接收 Celery 任務推送的進度"""
        await self.send(text_data=json.dumps({
            'type': 'video_progress',
            'video_id': event['video_id'],
            'progress': event['progress'],
        }))

為什麼組合使用?

  • Celery:處理耗時的視頻轉碼任務
  • Channels:實時推送處理進度給前端
  • 最佳實踐:各司其職,發揮各自優勢

🎯 場景:訂單支付與通知

# views.py
def create_order(request):
    order = Order.objects.create(...)

    # 1. 同步處理:建立訂單(必須即時完成)

    # 2. 異步任務:發送郵件(Celery)
    send_order_email_task.delay(order.id)

    # 3. 實時通知:推送到前端(Channels)
    send_order_notification(request.user.id, order.id)

    return JsonResponse({'order_id': order.id})

# Webhook 回調
def payment_callback(request):
    order_id = request.POST['order_id']
    order = Order.objects.get(id=order_id)
    order.status = 'paid'
    order.save()

    # 1. 異步任務:生成發票(Celery)
    generate_invoice_task.delay(order.id)

    # 2. 實時通知:告知用戶支付成功(Channels)
    send_payment_success_notification(order.user_id, order.id)

三、決策流程圖

開始
  │
  ▼
是否需要實時雙向通訊?
  │
  ├─ 是 ────────► 是否需要毫秒級響應?
  │                │
  │                ├─ 是 ──► 使用 Channels
  │                │          (聊天室、遊戲、協同編輯)
  │                │
  │                └─ 否 ──► 是否可以接受輪詢?
  │                           │
  │                           ├─ 是 ──► 使用 HTTP 輪詢(更簡單)
  │                           └─ 否 ──► 使用 Channels
  │
  └─ 否 ────────► 是否需要定時執行?
                   │
                   ├─ 是 ──► 使用 Celery Beat
                   │
                   └─ 否 ──► 是否是耗時任務?
                              │
                              ├─ 是 ──► 使用 Celery
                              │          (郵件、報表、數據處理)
                              │
                              └─ 否 ──► 同步處理即可
                                         (普通 Django View)

四、性能與資源對比

4.1 資源消耗對比

Celery Worker(以 prefork 為例)

單個 Worker 進程資源:
- 記憶體:50-100 MB(基礎)
- CPU:任務執行時才消耗
- 連接:短暫連接(與 Broker 和 DB)

4 個 Worker 進程:
- 記憶體:200-400 MB
- 可同時處理:4 個任務
- 適合場景:後台異步處理

Channels (ASGI Server + WebSocket)

單個 Daphne 進程資源:
- 記憶體:100 MB(基礎)+ 連接數 × 30 KB
- CPU:持續消耗(維護連接)
- 連接:持久連接

1000 個 WebSocket 連接:
- 記憶體:100 + 1000 × 0.03 = 130 MB
- CPU:持續消耗(心跳、消息處理)
- 適合場景:實時互動

4.2 吞吐量對比

Celery

吞吐量:
- Prefork Worker:100-200 任務/秒/進程
- Gevent Worker:1000-2000 任務/秒/進程
- 瓶頸:任務處理速度

擴展方式:
- 水平擴展:增加 Worker 機器
- 垂直擴展:增加 Worker 進程數

Channels

吞吐量:
- 單進程:1000-5000 消息/秒
- 瓶頸:Redis Channel Layer、網絡帶寬

擴展方式:
- 水平擴展:增加 ASGI Server 實例
- 需要:Redis Cluster(Channel Layer)
- 注意:WebSocket 連接有狀態,擴展較複雜

五、常見面試問題

Q1: Celery 和 Channels 的核心區別是什麼?

標準答案:

  • Celery 是分布式任務隊列,用於異步執行後台任務(單向通訊)
  • Channels 是實時通訊框架,用於 WebSocket 雙向通訊(雙向通訊)
  • 關鍵區別
    • 通訊模式:Celery 單向推送,Channels 雙向互動
    • 連接狀態:Celery 無狀態,Channels 有狀態
    • 使用場景:Celery 適合後台任務,Channels 適合實時互動

Q2: 什麼情況下應該同時使用 Celery 和 Channels?

標準答案: 當需要「執行耗時任務 + 實時推送進度」時,應該組合使用:

  • Celery:執行耗時任務(視頻處理、數據導出)
  • Channels:實時推送進度給前端

典型案例:

  • 視頻上傳與轉碼
  • 大數據報表生成
  • 批量數據導入

Q3: 如何在 Celery 任務中推送 Channels 消息?

標準答案:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

@shared_task
def my_task(user_id):
    # 執行任務
    result = do_something()

    # 推送到 Channels
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'user_{user_id}',
        {
            'type': 'task_result',
            'result': result,
        }
    )

關鍵點:

  • 使用 async_to_sync 將 Channels 的異步方法轉為同步
  • 通過 channel_layer.group_send 發送消息
  • Consumer 需要監聽相應的 group

Q4: Channels 為什麼不能替代 Celery?

標準答案: 雖然 Channels 也能執行異步任務,但不能替代 Celery,原因:

  1. 定時任務:Channels 不支持 cron 定時任務
  2. 任務管理:Channels 沒有任務重試、優先級、結果存儲等機制
  3. 資源消耗:Channels 維護 WebSocket 連接消耗更多資源
  4. 設計目標:Channels 專注於實時通訊,不是任務隊列

最佳實踐:

  • 後台任務:使用 Celery
  • 實時通訊:使用 Channels
  • 進度推送:Celery + Channels 組合

Q5: 如何選擇 Celery Worker 類型和 Channels Server?

標準答案:

Celery Worker 選擇:

  • Prefork:CPU 密集型任務(圖像處理、數據計算)
  • Gevent/Eventlet:I/O 密集型任務(API 調用、郵件發送)
  • Threads:介於兩者之間

Channels Server 選擇:

  • Daphne:官方推薦,穩定性好
  • Uvicorn:性能更高,支持 HTTP/2
  • 生產環境:使用 Nginx 反向代理

六、最佳實踐總結

✅ 使用 Celery 的場景

  1. 發送郵件、簡訊通知
  2. 定時任務(報表、數據清理)
  3. 數據導入/導出
  4. 第三方 API 調用
  5. 圖像/視頻處理
  6. 批量數據處理

✅ 使用 Channels 的場景

  1. 聊天室、即時訊息
  2. 實時通知(需要立即推送)
  3. 協同編輯(文檔、白板)
  4. 實時監控儀表板
  5. 在線遊戲、多人互動
  6. WebRTC 信令伺服器

✅ 組合使用的場景

  1. 視頻上傳 + 實時進度推送
  2. 訂單處理 + 實時狀態更新
  3. 大數據報表 + 完成通知
  4. 批量操作 + 進度顯示

⚠️ 注意事項

Celery 注意事項:

  • 選擇合適的 Worker 類型(prefork vs gevent)
  • 配置合理的任務超時時間
  • 使用 bind=True 實現任務重試
  • 監控 Broker 和 Result Backend 性能

Channels 注意事項:

  • 實現 Token 認證機制
  • 配置 Channel Layer(Redis)
  • 處理連接異常(斷線重連)
  • 監控 WebSocket 連接數和資源消耗

組合使用注意事項:

  • Celery 和 Channels 使用同一個 Redis 實例
  • 避免循環依賴(Celery → Channels → Celery)
  • 統一錯誤處理和日誌記錄

七、總結

需求推薦方案理由
發送郵件Celery單向任務,無需實時反饋
定時任務Celery BeatChannels 不支持定時
聊天室Channels需要雙向實時通訊
數據導出Celery耗時任務,後台執行
實時通知Channels需要立即推送
視頻處理 + 進度Celery + Channels組合使用,各司其職
API 調用Celery可能失敗,需要重試
在線遊戲Channels低延遲,雙向互動

核心原則:

  • 單向任務 → Celery
  • 雙向實時 → Channels
  • 複雜需求 → Celery + Channels

選擇合適的工具,而不是最流行的工具!

0%