Django 面試準備 06-4:Channels 實戰案例
5 個真實場景的 Channels 應用與最佳實踐
目錄
06-4. Channels 實戰案例
本章通過 5 個真實場景,展示 Django Channels 在生產環境中的實際應用。
1. 案例一:聊天室系統
場景描述
構建一個多房間聊天系統,支持:
- 多個聊天室
- 用戶加入/離開通知
- 在線用戶列表
- 歷史消息加載
完整實現
步驟 1:數據模型
# models.py
from django.db import models
from django.contrib.auth.models import User
class ChatRoom(models.Model):
"""聊天室"""
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(unique=True)
description = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.name
class Message(models.Model):
"""聊天消息"""
room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name='messages')
user = models.ForeignKey(User, on_delete=models.CASCADE)
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['created_at']
def __str__(self):
return f'{self.user.username}: {self.content[:50]}'步驟 2:Consumer 實現
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
import json
from datetime import datetime
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""用戶連接"""
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
self.user = self.scope['user']
# 檢查用戶是否已認證
if not self.user.is_authenticated:
await self.close()
return
# 加入房間群組
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
# 通知其他用戶有人加入
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_join',
'user': self.user.username,
'timestamp': datetime.now().isoformat()
}
)
# 發送歷史消息給新用戶
messages = await self.get_room_messages()
await self.send(text_data=json.dumps({
'type': 'history',
'messages': messages
}))
# 發送在線用戶列表
online_users = await self.get_online_users()
await self.send(text_data=json.dumps({
'type': 'online_users',
'users': online_users
}))
async def disconnect(self, close_code):
"""用戶斷開"""
# 通知其他用戶有人離開
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_leave',
'user': self.user.username,
'timestamp': datetime.now().isoformat()
}
)
# 離開群組
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
"""接收消息"""
data = json.loads(text_data)
message_type = data.get('type')
if message_type == 'chat_message':
message = data['message']
# 保存到資料庫
await self.save_message(message)
# 廣播給房間所有用戶
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'user': self.user.username,
'message': message,
'timestamp': datetime.now().isoformat()
}
)
elif message_type == 'typing':
# 用戶正在輸入
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_typing',
'user': self.user.username
}
)
# 群組消息處理器
async def chat_message(self, event):
"""廣播聊天消息"""
await self.send(text_data=json.dumps({
'type': 'chat_message',
'user': event['user'],
'message': event['message'],
'timestamp': event['timestamp']
}))
async def user_join(self, event):
"""用戶加入通知"""
await self.send(text_data=json.dumps({
'type': 'user_join',
'user': event['user'],
'timestamp': event['timestamp']
}))
async def user_leave(self, event):
"""用戶離開通知"""
await self.send(text_data=json.dumps({
'type': 'user_leave',
'user': event['user'],
'timestamp': event['timestamp']
}))
async def user_typing(self, event):
"""用戶正在輸入"""
# 不發送給自己
if event['user'] != self.user.username:
await self.send(text_data=json.dumps({
'type': 'user_typing',
'user': event['user']
}))
# 資料庫操作
@database_sync_to_async
def save_message(self, message):
"""保存消息到資料庫"""
from .models import ChatRoom, Message
room = ChatRoom.objects.get(slug=self.room_name)
return Message.objects.create(
room=room,
user=self.user,
content=message
)
@database_sync_to_async
def get_room_messages(self):
"""獲取房間歷史消息(最近 50 條)"""
from .models import ChatRoom
room = ChatRoom.objects.get(slug=self.room_name)
messages = room.messages.select_related('user').order_by('-created_at')[:50]
return [
{
'user': msg.user.username,
'message': msg.content,
'timestamp': msg.created_at.isoformat()
}
for msg in reversed(messages)
]
@database_sync_to_async
def get_online_users(self):
"""獲取在線用戶列表(簡化實現)"""
# 實際應該使用 Redis 或其他方式追蹤
return [self.user.username]步驟 3:路由配置
# routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]步驟 4:前端實現
<!-- chat.html -->
<!DOCTYPE html>
<html>
<head>
<title>Chat Room - {{ room.name }}</title>
<style>
#chat-log {
height: 400px;
overflow-y: scroll;
border: 1px solid #ccc;
padding: 10px;
margin-bottom: 10px;
}
.message {
margin: 5px 0;
}
.system-message {
color: #666;
font-style: italic;
}
.typing-indicator {
color: #999;
font-size: 0.9em;
}
</style>
</head>
<body>
<h1>{{ room.name }}</h1>
<div id="online-users">
<strong>在線用戶:</strong>
<span id="user-list"></span>
</div>
<div id="chat-log"></div>
<div id="typing-indicator" class="typing-indicator"></div>
<input id="chat-message-input" type="text" placeholder="輸入消息..." size="100">
<button id="chat-message-submit">發送</button>
<script>
const roomName = "{{ room.slug }}";
const username = "{{ request.user.username }}";
// 建立 WebSocket 連接
const chatSocket = new WebSocket(
'ws://' + window.location.host +
'/ws/chat/' + roomName + '/'
);
// 連接成功
chatSocket.onopen = function(e) {
console.log('WebSocket connected');
};
// 接收消息
chatSocket.onmessage = function(e) {
const data = JSON.parse(e.data);
switch(data.type) {
case 'history':
// 顯示歷史消息
data.messages.forEach(msg => {
appendMessage(msg.user, msg.message, msg.timestamp);
});
break;
case 'chat_message':
// 新消息
appendMessage(data.user, data.message, data.timestamp);
break;
case 'user_join':
// 用戶加入
appendSystemMessage(`${data.user} 加入了聊天室`);
break;
case 'user_leave':
// 用戶離開
appendSystemMessage(`${data.user} 離開了聊天室`);
break;
case 'user_typing':
// 用戶正在輸入
showTypingIndicator(data.user);
break;
case 'online_users':
// 更新在線用戶列表
document.getElementById('user-list').textContent = data.users.join(', ');
break;
}
};
// 連接關閉
chatSocket.onclose = function(e) {
console.error('WebSocket closed');
};
// 發送消息
document.getElementById('chat-message-submit').onclick = function(e) {
const messageInput = document.getElementById('chat-message-input');
const message = messageInput.value.trim();
if (message) {
chatSocket.send(JSON.stringify({
'type': 'chat_message',
'message': message
}));
messageInput.value = '';
}
};
// Enter 鍵發送
document.getElementById('chat-message-input').onkeyup = function(e) {
if (e.keyCode === 13) {
document.getElementById('chat-message-submit').click();
} else {
// 發送正在輸入通知(節流)
clearTimeout(window.typingTimeout);
window.typingTimeout = setTimeout(() => {
chatSocket.send(JSON.dumps({
'type': 'typing'
}));
}, 500);
}
};
// 顯示消息
function appendMessage(user, message, timestamp) {
const chatLog = document.getElementById('chat-log');
const messageDiv = document.createElement('div');
messageDiv.className = 'message';
const time = new Date(timestamp).toLocaleTimeString();
messageDiv.innerHTML = `<strong>${user}</strong> [${time}]: ${escapeHtml(message)}`;
chatLog.appendChild(messageDiv);
chatLog.scrollTop = chatLog.scrollHeight;
}
// 顯示系統消息
function appendSystemMessage(message) {
const chatLog = document.getElementById('chat-log');
const messageDiv = document.createElement('div');
messageDiv.className = 'message system-message';
messageDiv.textContent = message;
chatLog.appendChild(messageDiv);
chatLog.scrollTop = chatLog.scrollHeight;
}
// 顯示正在輸入
function showTypingIndicator(user) {
const indicator = document.getElementById('typing-indicator');
indicator.textContent = `${user} 正在輸入...`;
clearTimeout(window.typingIndicatorTimeout);
window.typingIndicatorTimeout = setTimeout(() => {
indicator.textContent = '';
}, 3000);
}
// 轉義 HTML
function escapeHtml(text) {
const map = {
'&': '&',
'<': '<',
'>': '>',
'"': '"',
"'": '''
};
return text.replace(/[&<>"']/g, m => map[m]);
}
</script>
</body>
</html>2. 案例二:實時通知系統
場景描述
當後端事件發生時,即時推送通知給用戶:
- 新訂單通知
- 系統消息
- 好友請求
- 評論回覆
實現
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
import json
class NotificationConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""連接建立"""
self.user = self.scope['user']
if not self.user.is_authenticated:
await self.close()
return
# 每個用戶有自己的通知頻道
self.user_group_name = f'user_{self.user.id}'
# 加入個人群組
await self.channel_layer.group_add(
self.user_group_name,
self.channel_name
)
await self.accept()
# 發送未讀通知數量
unread_count = await self.get_unread_count()
await self.send(text_data=json.dumps({
'type': 'unread_count',
'count': unread_count
}))
async def disconnect(self, close_code):
"""斷開連接"""
await self.channel_layer.group_discard(
self.user_group_name,
self.channel_name
)
async def receive(self, text_data):
"""接收客戶端消息"""
data = json.loads(text_data)
if data['type'] == 'mark_read':
# 標記通知已讀
notification_id = data['notification_id']
await self.mark_notification_read(notification_id)
# 通知處理器
async def send_notification(self, event):
"""發送通知"""
await self.send(text_data=json.dumps({
'type': 'notification',
'id': event['notification_id'],
'title': event['title'],
'message': event['message'],
'url': event.get('url'),
'timestamp': event['timestamp']
}))
# 資料庫操作
@database_sync_to_async
def get_unread_count(self):
"""獲取未讀通知數量"""
from .models import Notification
return Notification.objects.filter(
user=self.user,
is_read=False
).count()
@database_sync_to_async
def mark_notification_read(self, notification_id):
"""標記通知已讀"""
from .models import Notification
Notification.objects.filter(
id=notification_id,
user=self.user
).update(is_read=True)# models.py
class Notification(models.Model):
"""通知模型"""
user = models.ForeignKey(User, on_delete=models.CASCADE)
title = models.CharField(max_length=200)
message = models.TextField()
url = models.URLField(blank=True)
is_read = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['-created_at']# utils.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime
def send_user_notification(user_id, title, message, url=None):
"""發送通知給指定用戶"""
from .models import Notification
# 保存到資料庫
notification = Notification.objects.create(
user_id=user_id,
title=title,
message=message,
url=url
)
# 通過 WebSocket 推送
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'user_{user_id}',
{
'type': 'send_notification',
'notification_id': notification.id,
'title': title,
'message': message,
'url': url or '',
'timestamp': datetime.now().isoformat()
}
)
# 使用示例
# views.py
def create_order(request):
order = Order.objects.create(...)
# 發送通知
send_user_notification(
user_id=request.user.id,
title='訂單已創建',
message=f'您的訂單 {order.id} 已成功創建',
url=f'/orders/{order.id}/'
)
return JsonResponse({'order_id': order.id})3. 案例三:協作編輯(簡化版)
場景描述
多個用戶同時編輯同一個文檔,實時同步。
實現
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class CollaborativeEditConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""連接建立"""
self.document_id = self.scope['url_route']['kwargs']['document_id']
self.room_group_name = f'document_{self.document_id}'
self.user = self.scope['user']
if not self.user.is_authenticated:
await self.close()
return
# 加入文檔群組
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
# 通知其他用戶有人加入
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_joined',
'user': self.user.username,
'user_id': self.user.id
}
)
# 發送當前文檔內容
content = await self.get_document_content()
await self.send(text_data=json.dumps({
'type': 'document_content',
'content': content
}))
async def disconnect(self, close_code):
"""斷開連接"""
# 通知其他用戶有人離開
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_left',
'user': self.user.username,
'user_id': self.user.id
}
)
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
"""接收編輯操作"""
data = json.loads(text_data)
operation_type = data['type']
if operation_type == 'edit':
# 編輯操作
position = data['position']
text = data['text']
action = data['action'] # 'insert' or 'delete'
# 保存到資料庫(可選)
# await self.save_edit_operation(...)
# 廣播給其他用戶
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_edit',
'user': self.user.username,
'user_id': self.user.id,
'position': position,
'text': text,
'action': action
}
)
elif operation_type == 'cursor_move':
# 光標移動
position = data['position']
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'cursor_update',
'user': self.user.username,
'user_id': self.user.id,
'position': position
}
)
# 廣播處理器
async def broadcast_edit(self, event):
"""廣播編輯操作"""
# 不發送給自己
if event['user_id'] != self.user.id:
await self.send(text_data=json.dumps({
'type': 'edit',
'user': event['user'],
'position': event['position'],
'text': event['text'],
'action': event['action']
}))
async def cursor_update(self, event):
"""光標位置更新"""
if event['user_id'] != self.user.id:
await self.send(text_data=json.dumps({
'type': 'cursor',
'user': event['user'],
'position': event['position']
}))
async def user_joined(self, event):
"""用戶加入通知"""
if event['user_id'] != self.user.id:
await self.send(text_data=json.dumps({
'type': 'user_joined',
'user': event['user']
}))
async def user_left(self, event):
"""用戶離開通知"""
await self.send(text_data=json.dumps({
'type': 'user_left',
'user': event['user']
}))
@database_sync_to_async
def get_document_content(self):
"""獲取文檔內容"""
from .models import Document
document = Document.objects.get(id=self.document_id)
return document.content4. 案例四:實時儀表板
場景描述
管理員監控實時數據:在線用戶數、訂單數、系統狀態等。
實現
# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
import json
import asyncio
class DashboardConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""連接建立"""
self.user = self.scope['user']
# 檢查權限(只有管理員能訪問)
if not self.user.is_authenticated or not self.user.is_staff:
await self.close()
return
self.room_group_name = 'dashboard'
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
# 發送初始數據
stats = await self.get_initial_stats()
await self.send(text_data=json.dumps({
'type': 'initial_stats',
'data': stats
}))
# 開始定期推送更新(每 5 秒)
asyncio.create_task(self.send_periodic_updates())
async def disconnect(self, close_code):
"""斷開連接"""
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def send_periodic_updates(self):
"""定期發送更新"""
try:
while True:
await asyncio.sleep(5)
# 獲取最新數據
stats = await self.get_current_stats()
# 發送給客戶端
await self.send(text_data=json.dumps({
'type': 'stats_update',
'data': stats
}))
except asyncio.CancelledError:
pass
# 實時事件處理器
async def new_order(self, event):
"""新訂單事件"""
await self.send(text_data=json.dumps({
'type': 'new_order',
'order': event['order_data']
}))
async def system_alert(self, event):
"""系統告警"""
await self.send(text_data=json.dumps({
'type': 'alert',
'level': event['level'],
'message': event['message']
}))
# 資料庫操作
@database_sync_to_async
def get_initial_stats(self):
"""獲取初始統計數據"""
from django.contrib.auth.models import User
from .models import Order
from django.utils import timezone
from datetime import timedelta
today = timezone.now().date()
return {
'total_users': User.objects.count(),
'online_users': self.get_online_user_count(),
'today_orders': Order.objects.filter(created_at__date=today).count(),
'today_revenue': Order.objects.filter(
created_at__date=today
).aggregate(Sum('total'))['total__sum'] or 0,
}
@database_sync_to_async
def get_current_stats(self):
"""獲取當前統計數據"""
return self.get_initial_stats()
def get_online_user_count(self):
"""獲取在線用戶數(簡化實現)"""
# 實際應該使用 Redis 追蹤
return 42# 在其他地方觸發事件
# views.py
def create_order(request):
order = Order.objects.create(...)
# 推送新訂單到儀表板
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'dashboard',
{
'type': 'new_order',
'order_data': {
'id': order.id,
'user': order.user.username,
'total': float(order.total),
'status': order.status
}
}
)
return JsonResponse({'order_id': order.id})5. 案例五:Celery + Channels 整合
場景描述
長時間任務(Celery)完成後,通過 WebSocket(Channels)即時通知用戶。
實現
# tasks.py
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime
@shared_task(bind=True)
def generate_report_task(self, user_id, report_type):
"""生成報表(耗時任務)"""
try:
# 發送開始通知
send_task_update(user_id, self.request.id, 'started', 0)
# 模擬報表生成(分步驟)
steps = 10
for i in range(steps):
# 執行某個步驟
time.sleep(2) # 模擬耗時操作
# 更新進度
progress = int((i + 1) / steps * 100)
send_task_update(
user_id,
self.request.id,
'progress',
progress,
f'正在處理步驟 {i+1}/{steps}'
)
# 生成完成
report_url = f'/reports/{self.request.id}.pdf'
send_task_update(
user_id,
self.request.id,
'completed',
100,
'報表生成完成',
result={'url': report_url}
)
return report_url
except Exception as e:
# 任務失敗
send_task_update(
user_id,
self.request.id,
'failed',
0,
str(e)
)
raise
def send_task_update(user_id, task_id, status, progress, message='', result=None):
"""發送任務更新通知"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'user_{user_id}',
{
'type': 'task_update',
'task_id': task_id,
'status': status,
'progress': progress,
'message': message,
'result': result or {},
'timestamp': datetime.now().isoformat()
}
)# consumers.py
class TaskNotificationConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""連接建立"""
self.user = self.scope['user']
if not self.user.is_authenticated:
await self.close()
return
self.user_group_name = f'user_{self.user.id}'
await self.channel_layer.group_add(
self.user_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
"""斷開連接"""
await self.channel_layer.group_discard(
self.user_group_name,
self.channel_name
)
async def task_update(self, event):
"""接收任務更新"""
await self.send(text_data=json.dumps({
'type': 'task_update',
'task_id': event['task_id'],
'status': event['status'],
'progress': event['progress'],
'message': event['message'],
'result': event['result'],
'timestamp': event['timestamp']
}))# views.py
from .tasks import generate_report_task
def request_report(request):
"""請求生成報表"""
report_type = request.POST['type']
# 發送 Celery 任務
task = generate_report_task.delay(request.user.id, report_type)
return JsonResponse({
'task_id': task.id,
'message': '報表生成中,請稍候...'
})// 前端實現
const socket = new WebSocket('ws://localhost:8000/ws/tasks/');
socket.onmessage = function(e) {
const data = JSON.parse(e.data);
if (data.type === 'task_update') {
const { task_id, status, progress, message, result } = data;
// 更新進度條
updateProgressBar(task_id, progress);
// 更新狀態消息
updateStatusMessage(task_id, message);
if (status === 'completed') {
// 任務完成
showDownloadLink(result.url);
} else if (status === 'failed') {
// 任務失敗
showError(message);
}
}
};
// 請求生成報表
function requestReport(type) {
fetch('/api/reports/request/', {
method: 'POST',
body: JSON.stringify({ type: type })
})
.then(response => response.json())
.then(data => {
console.log('Task ID:', data.task_id);
// WebSocket 會自動接收更新
});
}6. 生產環境最佳實踐
1. 認證與授權
# middleware.py
from channels.auth import AuthMiddlewareStack
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from urllib.parse import parse_qs
class TokenAuthMiddleware:
"""Token 認證中間件"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
# 從 query string 獲取 token
query_string = parse_qs(scope['query_string'].decode())
token = query_string.get('token', [None])[0]
if token:
scope['user'] = await self.get_user_from_token(token)
else:
scope['user'] = AnonymousUser()
return await self.app(scope, receive, send)
@database_sync_to_async
def get_user_from_token(self, token):
"""從 token 獲取用戶"""
try:
from rest_framework.authtoken.models import Token
return Token.objects.get(key=token).user
except:
return AnonymousUser()
# asgi.py
application = ProtocolTypeRouter({
"websocket": TokenAuthMiddleware(
URLRouter(websocket_urlpatterns)
),
})2. 錯誤處理
class RobustChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
try:
# 連接邏輯
await self.accept()
except Exception as e:
logger.error(f"Connection error: {e}")
await self.close()
async def receive(self, text_data):
try:
data = json.loads(text_data)
# 處理消息
except json.JSONDecodeError:
await self.send(text_data=json.dumps({
'error': 'Invalid JSON'
}))
except Exception as e:
logger.error(f"Receive error: {e}")
await self.send(text_data=json.dumps({
'error': 'Internal server error'
}))3. 限流
from django.core.cache import cache
class RateLimitedConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
# 限流檢查
cache_key = f'ratelimit_{self.user.id}'
request_count = cache.get(cache_key, 0)
if request_count > 100: # 每分鐘最多 100 條消息
await self.send(text_data=json.dumps({
'error': 'Rate limit exceeded'
}))
return
cache.set(cache_key, request_count + 1, timeout=60)
# 處理消息
...4. 監控
import logging
import time
logger = logging.getLogger(__name__)
class MonitoredConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.connect_time = time.time()
logger.info(f"User {self.scope['user'].id} connected")
await self.accept()
async def disconnect(self, close_code):
duration = time.time() - self.connect_time
logger.info(
f"User {self.scope['user'].id} disconnected "
f"after {duration:.2f}s, code={close_code}"
)
async def receive(self, text_data):
start_time = time.time()
# 處理消息
await self.process_message(text_data)
duration = time.time() - start_time
if duration > 1: # 超過 1 秒記錄
logger.warning(
f"Slow message processing: {duration:.2f}s"
)小結
本章展示了 5 個 Channels 實戰案例:
- 聊天室系統:多房間、歷史消息、在線用戶、正在輸入
- 實時通知系統:個人通知頻道、未讀數量、標記已讀
- 協作編輯:多人同步編輯、光標位置、實時更新
- 實時儀表板:定期推送數據、實時事件、系統監控
- Celery + Channels:長任務進度推送、完成通知
關鍵要點:
- ✅ 使用異步 Consumer 獲得最佳性能
- ✅ 合理使用 Channel Layer(Redis)
- ✅ 實現認證、錯誤處理、限流
- ✅ 監控連接數和消息處理時間
- ✅ Celery 處理耗時任務 + Channels 推送通知
下一章將對比 Celery 和 Channels,幫助你選擇合適的方案!