Django 面試準備 06-3:Channels 基礎概念

深入理解 Django Channels 實時通信架構與 WebSocket

06-3. Channels 基礎概念

Django Channels 是 Django 的擴展,讓 Django 支持 WebSocket、長輪詢等實時通信協議。本章將深入探討其核心概念。


1. 什麼是 Channels?

定義

Django Channels 是一個讓 Django 支持 異步協議 的框架,特別是 WebSocket。

# 傳統 Django (WSGI):只支援 HTTP
# 每個請求-響應是獨立的,無法保持長連接

用戶  HTTP 請求  Django  HTTP 響應  用戶
# 連接結束 ✂️

# Django Channels (ASGI):支援 WebSocket
# 可以保持長連接,實現雙向通信

用戶  WebSocket 連接  Django Channels
# 連接持續,可以隨時收發消息 ✅

核心特性

  1. WebSocket 支持:實時雙向通信
  2. HTTP/2 支持:Server Push
  3. 長輪詢:兼容舊瀏覽器
  4. 異步處理:基於 asyncio
  5. 向後兼容:可與傳統 Django view 並存

2. 為什麼需要 Channels?

傳統 HTTP 的限制

# HTTP 請求-響應模式的問題:

# 場景 1:聊天應用
# ❌ 用戶 A 發送消息後,用戶 B 如何知道?
# 傳統方案:用戶 B 每隔 1 秒輪詢一次服務器
# 問題:浪費資源、延遲高

# 場景 2:即時通知
# ❌ 服務器有新通知,如何推送給用戶?
# 傳統方案:用戶不斷輪詢「有沒有新通知?」
# 問題:99% 的請求都是無意義的

# 場景 3:協作編輯
# ❌ 多個用戶同時編輯文檔,如何同步?
# 傳統方案:每隔幾秒發送一次更新
# 問題:衝突、延遲、體驗差

Channels 的解決方案

# WebSocket:持久連接,雙向通信

# 場景 1:聊天應用 ✅
用戶 A  發送消息  Channels  推送給用戶 B
# 即時送達,無需輪詢

# 場景 2:即時通知 ✅
服務器  新通知  Channels  主動推送給用戶
# 服務器推送,無需輪詢

# 場景 3:協作編輯 ✅
用戶 A 編輯  Channels  即時同步給所有用戶
# 實時同步,無衝突

3. ASGI vs WSGI

架構對比

# WSGI(Web Server Gateway Interface)
# - 同步協議
# - 只支援 HTTP
# - Django 1.x - 2.x 的標準

def wsgi_application(environ, start_response):
    # 處理 HTTP 請求
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello World']

# ASGI(Asynchronous Server Gateway Interface)
# - 異步協議
# - 支援 HTTP、WebSocket、HTTP/2
# - Django 3.0+ 的標準

async def asgi_application(scope, receive, send):
    # 處理 HTTP、WebSocket 等多種協議
    if scope['type'] == 'http':
        await send({
            'type': 'http.response.start',
            'status': 200,
        })
        await send({
            'type': 'http.response.body',
            'body': b'Hello World',
        })
    elif scope['type'] == 'websocket':
        # 處理 WebSocket
        await send({'type': 'websocket.accept'})
        message = await receive()
        await send({
            'type': 'websocket.send',
            'text': 'Hello WebSocket'
        })

支援的協議

協議WSGIASGI
HTTP
WebSocket
HTTP/2
長輪詢⚠️ 低效
Server Sent Events

4. Channels 架構

核心組件

┌─────────────────────────────────────────────┐
│              Client (瀏覽器)                 │
│         WebSocket Connection                │
└──────────────────┬──────────────────────────┘
                   ↓
         ┌─────────────────────┐
         │   Daphne / Uvicorn  │ ← ASGI Server
         │  (接收 WebSocket)    │
         └─────────────────────┘
                   ↓
         ┌─────────────────────┐
         │  Routing (路由)      │
         │  websocket_urlpatterns
         └─────────────────────┘
                   ↓
         ┌─────────────────────┐
         │   Consumer (消費者)  │ ← 處理邏輯
         │  - connect()        │
         │  - receive()        │
         │  - disconnect()     │
         └─────────────────────┘
                   ↓
         ┌─────────────────────┐
         │  Channel Layer      │ ← 消息傳遞
         │  (Redis / 內存)      │
         └─────────────────────┘

1. ASGI Server(Daphne / Uvicorn)

處理 WebSocket 連接的服務器。

# Daphne(Channels 官方推薦)
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application

# Uvicorn(更快)
uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000

2. Consumer(消費者)

處理 WebSocket 消息的類,類似於 Django 的 View。

# consumers.py
from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        """客戶端連接時觸發"""
        self.accept()

    def disconnect(self, close_code):
        """客戶端斷開時觸發"""
        pass

    def receive(self, text_data):
        """接收到消息時觸發"""
        data = json.loads(text_data)
        message = data['message']

        # 發送消息回客戶端
        self.send(text_data=json.dumps({
            'message': message
        }))

3. Routing(路由)

將 WebSocket URL 映射到 Consumer。

# routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

4. Channel Layer(通道層)

實現不同 Consumer 之間的通信。

# settings.py
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

5. 安裝與配置

步驟 1:安裝 Channels

# 安裝 Channels
pip install channels

# 安裝 Redis Channel Layer(生產環境推薦)
pip install channels-redis

# 或使用內存 Channel Layer(開發環境)
pip install channels[daphne]

步驟 2:配置 Django

# settings.py

# 1. 添加到 INSTALLED_APPS
INSTALLED_APPS = [
    'daphne',  # ⚠️ 必須放在最前面
    'django.contrib.admin',
    'django.contrib.auth',
    # ...
    'channels',
]

# 2. 設置 ASGI application
ASGI_APPLICATION = 'myproject.asgi.application'

# 3. 配置 Channel Layer
CHANNEL_LAYERS = {
    'default': {
        # 使用 Redis(生產環境)
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },

        # 或使用內存(開發環境)
        # 'BACKEND': 'channels.layers.InMemoryChannelLayer'
    },
}

步驟 3:創建 ASGI 配置

# myproject/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = ProtocolTypeRouter({
    # HTTP 請求使用 Django 的 ASGI application
    "http": get_asgi_application(),

    # WebSocket 請求使用 Channels
    "websocket": AuthMiddlewareStack(
        URLRouter(
            myapp.routing.websocket_urlpatterns
        )
    ),
})

步驟 4:創建 Consumer

# myapp/consumers.py
from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        """連接建立時"""
        self.accept()
        self.send(text_data=json.dumps({
            'message': 'Welcome to chat!'
        }))

    def disconnect(self, close_code):
        """連接斷開時"""
        pass

    def receive(self, text_data):
        """接收消息時"""
        data = json.loads(text_data)
        message = data['message']

        # 回傳消息
        self.send(text_data=json.dumps({
            'message': f'You said: {message}'
        }))

步驟 5:配置路由

# myapp/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/$', consumers.ChatConsumer.as_asgi()),
]

步驟 6:前端連接

<!-- chat.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Chat Room</title>
</head>
<body>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br>
    <input id="chat-message-input" type="text" size="100"><br>
    <input id="chat-message-submit" type="button" value="Send">

    <script>
        // 建立 WebSocket 連接
        const chatSocket = new WebSocket(
            'ws://' + window.location.host + '/ws/chat/'
        );

        // 連接成功
        chatSocket.onopen = function(e) {
            console.log('WebSocket connected');
        };

        // 接收消息
        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);
            document.querySelector('#chat-log').value += (data.message + '\n');
        };

        // 連接關閉
        chatSocket.onclose = function(e) {
            console.error('WebSocket closed');
        };

        // 發送消息
        document.querySelector('#chat-message-submit').onclick = function(e) {
            const messageInput = document.querySelector('#chat-message-input');
            const message = messageInput.value;

            chatSocket.send(JSON.stringify({
                'message': message
            }));

            messageInput.value = '';
        };
    </script>
</body>
</html>

步驟 7:啟動服務

# 開發環境
python manage.py runserver
# Daphne 會自動啟動

# 或明確使用 Daphne
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application

# 或使用 Uvicorn(更快)
uvicorn myproject.asgi:application --reload

6. WebSocket 生命周期

完整流程

from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        """
        1. 客戶端請求連接
        WebSocket Handshake:
          GET /ws/chat/ HTTP/1.1
          Upgrade: websocket
          Connection: Upgrade
        """
        print(f"Client connecting: {self.scope['client']}")

        # 接受連接
        self.accept()

        # 可以在這裡做認證檢查
        user = self.scope['user']
        if not user.is_authenticated:
            self.close()
            return

        # 加入到某個群組(後面會講)
        # self.channel_layer.group_add(...)

    def disconnect(self, close_code):
        """
        2. 連接斷開(用戶關閉頁面、網路斷開等)
        """
        print(f"Client disconnected: {close_code}")

        # 離開群組
        # self.channel_layer.group_discard(...)

    def receive(self, text_data=None, bytes_data=None):
        """
        3. 接收到客戶端消息
        """
        if text_data:
            data = json.loads(text_data)
            message = data['message']

            print(f"Received message: {message}")

            # 處理消息
            self.send(text_data=json.dumps({
                'message': f'Echo: {message}'
            }))

        if bytes_data:
            # 處理二進制數據(如文件上傳)
            pass

    # 自定義方法(從 Channel Layer 接收消息時調用)
    def chat_message(self, event):
        """
        從群組接收消息
        """
        message = event['message']

        # 發送給客戶端
        self.send(text_data=json.dumps({
            'message': message
        }))

7. Channel Layer(群組通信)

為什麼需要 Channel Layer?

# 問題:如何實現群組聊天?

# 用戶 A 發送消息 → 如何通知用戶 B、C、D?
# 每個用戶都有自己的 WebSocket 連接(不同的 Consumer 實例)

# 解決方案:Channel Layer
# - 讓不同的 Consumer 可以互相通信
# - 支援「群組」概念:一次發送給多個 Consumer

使用 Channel Layer

# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # 從 URL 獲取房間名稱
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # 加入群組
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # 離開群組
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data['message']

        # 發送消息給群組中的所有成員
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',  # 調用 chat_message 方法
                'message': message
            }
        )

    async def chat_message(self, event):
        """從群組接收消息"""
        message = event['message']

        # 發送給客戶端
        await self.send(text_data=json.dumps({
            'message': message
        }))

Channel Layer 架構

┌──────────────┐        ┌──────────────┐        ┌──────────────┐
│  Consumer A  │        │  Consumer B  │        │  Consumer C  │
│  (用戶 A)    │        │  (用戶 B)    │        │  (用戶 C)    │
└──────┬───────┘        └──────┬───────┘        └──────┬───────┘
       │                       │                       │
       │    加入群組 "room_1"    │                       │
       └───────────────┬───────┴───────────────────────┘
                       ↓
              ┌─────────────────┐
              │  Channel Layer  │
              │     (Redis)     │
              └─────────────────┘
                       │
       用戶 A 發送消息 "Hello"
                       │
       ┌───────────────┴───────────────┐
       ↓               ↓               ↓
   Consumer A      Consumer B      Consumer C
   (收到 Hello)    (收到 Hello)    (收到 Hello)

8. 同步 vs 異步 Consumer

同步 Consumer

from channels.generic.websocket import WebsocketConsumer

class SyncChatConsumer(WebsocketConsumer):
    """同步 Consumer(簡單但性能較差)"""

    def connect(self):
        self.accept()

    def receive(self, text_data):
        # 同步處理(阻塞)
        result = self.process_message(text_data)
        self.send(text_data=result)

    def process_message(self, text_data):
        # 可能是耗時操作
        return text_data.upper()

異步 Consumer

from channels.generic.websocket import AsyncWebsocketConsumer

class AsyncChatConsumer(AsyncWebsocketConsumer):
    """異步 Consumer(推薦)"""

    async def connect(self):
        await self.accept()

    async def receive(self, text_data):
        # 異步處理(不阻塞)
        result = await self.process_message(text_data)
        await self.send(text_data=result)

    async def process_message(self, text_data):
        # 異步操作
        return text_data.upper()

性能對比

# 同步 Consumer:
# - 每個連接一個線程
# - 1000 個連接 = 1000 個線程
# - 記憶體占用:1000 × 8MB = 8GB

# 異步 Consumer:
# - 單個事件循環處理所有連接
# - 1000 個連接 = 1 個事件循環
# - 記憶體占用:<100MB

# ✅ 推薦使用異步 Consumer

9. 與資料庫交互

同步 ORM(需要 sync_to_async)

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()

    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data['message']

        # ⚠️ Django ORM 是同步的,需要包裝成異步
        user = await self.get_user(self.scope['user'].id)

        # 保存消息到資料庫
        await self.save_message(user, message)

        await self.send(text_data=json.dumps({
            'message': message,
            'user': user.username
        }))

    @database_sync_to_async
    def get_user(self, user_id):
        """將同步 ORM 查詢包裝成異步"""
        return User.objects.get(id=user_id)

    @database_sync_to_async
    def save_message(self, user, message):
        """將同步 ORM 操作包裝成異步"""
        from .models import Message
        return Message.objects.create(
            user=user,
            content=message
        )

10. Channels vs Celery

核心區別

特性ChannelsCelery
用途實時通信(WebSocket)異步任務處理
通信方式雙向(客戶端 ⟷ 服務器)單向(服務器 → Worker)
連接長連接無連接
即時性毫秒級秒級
適用場景聊天、通知、協作郵件、報表、計算

使用場景

# ✅ 使用 Channels:
# - 聊天應用
# - 即時通知推送
# - 協作編輯
# - 即時儀表板
# - 在線遊戲

# ✅ 使用 Celery:
# - 發送郵件
# - 生成報表
# - 圖片處理
# - 數據同步
# - 定時任務

可以並用

# 典型架構:Channels + Celery

# 1. 用戶提交任務 → Celery 處理
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

@shared_task
def process_long_task(user_id, task_id):
    """長時間運行的任務"""
    # 處理任務(10 分鐘)
    result = perform_heavy_computation()

    # 2. 任務完成 → 通過 Channels 通知用戶
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'user_{user_id}',
        {
            'type': 'task_complete',
            'task_id': task_id,
            'result': result
        }
    )

# 3. Consumer 接收通知 → 推送給客戶端
class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        user_id = self.scope['user'].id
        await self.channel_layer.group_add(
            f'user_{user_id}',
            self.channel_name
        )
        await self.accept()

    async def task_complete(self, event):
        """接收任務完成通知"""
        await self.send(text_data=json.dumps({
            'type': 'task_complete',
            'task_id': event['task_id'],
            'result': event['result']
        }))

面試常見問題

Q1:Channels 和 WebSocket 有什麼關係?

答案:

  • WebSocket 是一種通信協議(類似 HTTP)
  • Channels 是 Django 的擴展框架,用於處理 WebSocket
# WebSocket 是協議
ws://example.com/chat/  # WebSocket URL

# Channels 是實現
class ChatConsumer(WebsocketConsumer):
    # 處理 WebSocket 連接
    pass

關係: Channels 讓 Django 可以處理 WebSocket 協議。


Q2:為什麼 Channels 需要 Redis?

答案:

Redis 用作 Channel Layer,實現不同 Consumer 之間的通信。

# 問題:多個 Worker 進程如何通信?

# Worker 1 (處理用戶 A)
用戶 A 發送消息 "Hello"

# Worker 2 (處理用戶 B)
如何通知用戶 B

# 解決:通過 Redis Channel Layer
Worker 1  發送到 Redis  Worker 2 接收  推送給用戶 B

沒有 Redis 的後果:

  • 無法實現群組聊天
  • 無法跨 Worker 通信
  • 只能做一對一的簡單 WebSocket

Q3:Channels 可以和傳統 Django View 並存嗎?

答案: 可以!完全兼容。

# myproject/asgi.py
from channels.routing import ProtocolTypeRouter, URLRouter

application = ProtocolTypeRouter({
    # 傳統 HTTP 請求 → Django Views
    "http": get_asgi_application(),

    # WebSocket 請求 → Channels Consumers
    "websocket": AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})

# 可以同時使用:
# - /api/users/ → Django Rest Framework (HTTP)
# - /ws/chat/ → Channels Consumer (WebSocket)

小結

Django Channels 的核心概念:

  1. ASGI:支持異步協議(HTTP、WebSocket)
  2. Consumer:處理 WebSocket 消息(類似 View)
  3. Channel Layer:不同 Consumer 之間通信(需要 Redis)
  4. 群組:一次發送給多個用戶
  5. 異步優先:使用 AsyncWebsocketConsumer 獲得最佳性能

關鍵優勢:

  • ✅ 實時雙向通信
  • ✅ 支持群組和廣播
  • ✅ 與 Django 完全集成
  • ✅ 可以與 Celery 並用

下一章將通過實戰案例展示 Channels 的應用!

0%