Django 面試準備 06-2:Celery 實戰案例
7 個真實場景的 Celery 應用與最佳實踐
目錄
06-2. Celery 實戰案例
本章通過 7 個真實場景,展示 Celery 在生產環境中的實際應用。
1. 案例一:郵件發送系統
場景描述
電商平台需要發送多種郵件:訂單確認、發貨通知、促銷活動等。每天發送 10 萬封郵件。
問題分析
# ❌ 同步發送郵件的問題
def create_order(request):
order = Order.objects.create(...)
# 同步發送郵件,阻塞 2-3 秒
send_mail(
subject=f'訂單 {order.id} 已確認',
message='...',
recipient_list=[order.user.email]
)
return JsonResponse({'order_id': order.id})
# 問題:
# - 用戶等待 2-3 秒才能看到響應
# - SMTP 服務器故障會導致訂單創建失敗
# - 高峰期可能超時Celery 解決方案
步驟 1:定義郵件任務
# tasks.py
from celery import shared_task
from django.core.mail import send_mail, EmailMultiAlternatives
from django.template.loader import render_to_string
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def send_order_confirmation_email(self, order_id):
"""發送訂單確認郵件"""
try:
from orders.models import Order
order = Order.objects.select_related('user').get(id=order_id)
# 渲染 HTML 郵件
html_content = render_to_string('emails/order_confirmation.html', {
'order': order,
'user': order.user,
})
# 創建郵件
email = EmailMultiAlternatives(
subject=f'訂單 {order.id} 已確認',
body=f'您的訂單總金額:${order.total}',
from_email=settings.DEFAULT_FROM_EMAIL,
to=[order.user.email]
)
email.attach_alternative(html_content, "text/html")
# 發送
email.send()
logger.info(f'Order confirmation email sent: order_id={order_id}')
return f'Email sent to {order.user.email}'
except Exception as exc:
logger.error(f'Failed to send email: order_id={order_id}, error={exc}')
# 30 秒後重試
raise self.retry(exc=exc, countdown=30)
@shared_task(bind=True, max_retries=3)
def send_shipping_notification(self, order_id, tracking_number):
"""發送發貨通知"""
try:
from orders.models import Order
order = Order.objects.select_related('user').get(id=order_id)
html_content = render_to_string('emails/shipping_notification.html', {
'order': order,
'tracking_number': tracking_number,
})
email = EmailMultiAlternatives(
subject=f'訂單 {order.id} 已發貨',
body=f'您的包裹追蹤號:{tracking_number}',
from_email=settings.DEFAULT_FROM_EMAIL,
to=[order.user.email]
)
email.attach_alternative(html_content, "text/html")
email.send()
return f'Shipping notification sent: {tracking_number}'
except Exception as exc:
raise self.retry(exc=exc, countdown=30)
@shared_task
def send_bulk_promotional_email(user_ids, subject, template_name, context):
"""批量發送促銷郵件"""
from django.contrib.auth.models import User
users = User.objects.filter(id__in=user_ids)
sent_count = 0
failed_count = 0
for user in users:
try:
html_content = render_to_string(template_name, {
**context,
'user': user,
})
email = EmailMultiAlternatives(
subject=subject,
body='查看 HTML 版本',
from_email=settings.DEFAULT_FROM_EMAIL,
to=[user.email]
)
email.attach_alternative(html_content, "text/html")
email.send()
sent_count += 1
except Exception as e:
logger.error(f'Failed to send to {user.email}: {e}')
failed_count += 1
return {
'sent': sent_count,
'failed': failed_count,
'total': len(user_ids)
}步驟 2:在 View 中使用
# views.py
from .tasks import send_order_confirmation_email, send_shipping_notification
def create_order(request):
"""創建訂單"""
order = Order.objects.create(
user=request.user,
total=calculate_total(request.data['items'])
)
# 異步發送郵件
send_order_confirmation_email.delay(order.id)
return JsonResponse({
'order_id': order.id,
'status': 'created'
})
def mark_order_shipped(request, order_id):
"""標記訂單已發貨"""
order = Order.objects.get(id=order_id)
order.status = 'shipped'
order.tracking_number = request.data['tracking_number']
order.save()
# 異步發送發貨通知
send_shipping_notification.delay(order.id, order.tracking_number)
return JsonResponse({'status': 'ok'})步驟 3:批量發送促銷郵件
# management/commands/send_promotion.py
from django.core.management.base import BaseCommand
from django.contrib.auth.models import User
from tasks import send_bulk_promotional_email
class Command(BaseCommand):
help = '發送促銷郵件給活躍用戶'
def handle(self, *args, **options):
# 獲取最近 30 天活躍的用戶
active_users = User.objects.filter(
last_login__gte=timezone.now() - timedelta(days=30),
is_active=True
).values_list('id', flat=True)
# 分批發送(每批 1000 個用戶)
batch_size = 1000
user_batches = [
list(active_users[i:i + batch_size])
for i in range(0, len(active_users), batch_size)
]
for batch in user_batches:
send_bulk_promotional_email.delay(
user_ids=batch,
subject='雙11大促銷!',
template_name='emails/promotion.html',
context={'discount': 50}
)
self.stdout.write(
self.style.SUCCESS(
f'Queued {len(user_batches)} batches, '
f'total {len(active_users)} users'
)
)優化技巧
# 使用連接池優化 SMTP 連接
from celery import shared_task
from django.core.mail import get_connection
@shared_task
def send_bulk_emails_optimized(email_list):
"""使用連接池批量發送"""
# 重用 SMTP 連接
connection = get_connection()
connection.open()
try:
for email_data in email_list:
email = EmailMultiAlternatives(
subject=email_data['subject'],
body=email_data['body'],
from_email=settings.DEFAULT_FROM_EMAIL,
to=[email_data['to']],
connection=connection # 重用連接
)
email.send()
finally:
connection.close()2. 案例二:報表生成系統
場景描述
企業需要生成各種報表:銷售報表、用戶統計、財務報表等。報表生成耗時 10-60 秒。
Celery 解決方案
# tasks.py
from celery import shared_task
from django.core.files.base import ContentFile
from django.db.models import Sum, Count
import pandas as pd
from datetime import datetime, timedelta
import io
@shared_task(bind=True)
def generate_sales_report(self, start_date, end_date, format='xlsx'):
"""生成銷售報表"""
from orders.models import Order
# 更新任務狀態
self.update_state(
state='PROGRESS',
meta={'current': 0, 'total': 100, 'status': '查詢數據中...'}
)
# 查詢數據
orders = Order.objects.filter(
created_at__range=[start_date, end_date]
).select_related('user').prefetch_related('items')
# 統計數據
self.update_state(
state='PROGRESS',
meta={'current': 30, 'total': 100, 'status': '統計數據中...'}
)
stats = {
'total_orders': orders.count(),
'total_revenue': orders.aggregate(Sum('total'))['total__sum'] or 0,
'avg_order_value': orders.aggregate(Sum('total'))['total__sum'] / orders.count(),
}
# 按日期分組
daily_stats = orders.extra(
select={'date': 'DATE(created_at)'}
).values('date').annotate(
orders=Count('id'),
revenue=Sum('total')
).order_by('date')
# 生成 Excel
self.update_state(
state='PROGRESS',
meta={'current': 60, 'total': 100, 'status': '生成報表中...'}
)
# 創建 DataFrame
df = pd.DataFrame(list(daily_stats))
# 生成 Excel 文件
buffer = io.BytesIO()
with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
# 每日統計
df.to_excel(writer, sheet_name='每日統計', index=False)
# 摘要統計
summary_df = pd.DataFrame([stats])
summary_df.to_excel(writer, sheet_name='摘要', index=False)
buffer.seek(0)
# 保存到文件
self.update_state(
state='PROGRESS',
meta={'current': 90, 'total': 100, 'status': '保存文件中...'}
)
filename = f'sales_report_{start_date}_{end_date}.xlsx'
file_path = f'reports/{filename}'
# 上傳到 S3(或保存到本地)
from django.core.files.storage import default_storage
default_storage.save(file_path, ContentFile(buffer.getvalue()))
file_url = default_storage.url(file_path)
return {
'status': 'completed',
'file_url': file_url,
'stats': stats
}
@shared_task(bind=True)
def generate_user_analytics(self, year, month):
"""生成用戶分析報表"""
from django.contrib.auth.models import User
from orders.models import Order
self.update_state(state='PROGRESS', meta={'status': '分析用戶數據...'})
# 用戶註冊趨勢
users = User.objects.filter(
date_joined__year=year,
date_joined__month=month
)
# 用戶活躍度分析
active_users = users.filter(
last_login__gte=timezone.now() - timedelta(days=30)
)
# 用戶購買行為
user_orders = Order.objects.filter(
created_at__year=year,
created_at__month=month
).values('user_id').annotate(
order_count=Count('id'),
total_spent=Sum('total')
)
# 生成報表
data = {
'total_users': users.count(),
'active_users': active_users.count(),
'active_rate': active_users.count() / users.count() * 100,
'avg_orders_per_user': user_orders.aggregate(
avg=Sum('order_count')
)['avg'],
}
# 生成 PDF 報表(使用 ReportLab)
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
buffer = io.BytesIO()
p = canvas.Canvas(buffer, pagesize=A4)
# 繪製報表內容
p.drawString(100, 800, f'用戶分析報表 - {year}/{month}')
p.drawString(100, 780, f'總用戶數:{data["total_users"]}')
p.drawString(100, 760, f'活躍用戶:{data["active_users"]}')
p.drawString(100, 740, f'活躍率:{data["active_rate"]:.2f}%')
p.showPage()
p.save()
buffer.seek(0)
# 保存文件
filename = f'user_analytics_{year}_{month}.pdf'
file_path = f'reports/{filename}'
default_storage.save(file_path, ContentFile(buffer.getvalue()))
return {
'file_url': default_storage.url(file_path),
'data': data
}前端進度顯示
# views.py
from celery.result import AsyncResult
def request_sales_report(request):
"""請求生成報表"""
start_date = request.GET['start_date']
end_date = request.GET['end_date']
# 發送任務
task = generate_sales_report.delay(start_date, end_date)
return JsonResponse({
'task_id': task.id,
'status': 'processing'
})
def check_report_status(request, task_id):
"""檢查報表生成進度"""
result = AsyncResult(task_id)
if result.state == 'PROGRESS':
response = {
'state': result.state,
'current': result.info.get('current', 0),
'total': result.info.get('total', 1),
'status': result.info.get('status', '')
}
elif result.state == 'SUCCESS':
response = {
'state': result.state,
'result': result.result
}
else:
response = {
'state': result.state,
'status': str(result.info)
}
return JsonResponse(response)// 前端輪詢進度
async function generateReport() {
// 請求生成報表
const response = await fetch('/api/reports/sales/');
const data = await response.json();
const taskId = data.task_id;
// 輪詢進度
const interval = setInterval(async () => {
const statusResponse = await fetch(`/api/reports/status/${taskId}/`);
const status = await statusResponse.json();
if (status.state === 'PROGRESS') {
// 更新進度條
const progress = (status.current / status.total) * 100;
updateProgressBar(progress, status.status);
} else if (status.state === 'SUCCESS') {
clearInterval(interval);
// 顯示下載連結
showDownloadLink(status.result.file_url);
} else if (status.state === 'FAILURE') {
clearInterval(interval);
showError('報表生成失敗');
}
}, 1000); // 每秒檢查一次
}3. 案例三:圖片處理系統
場景描述
社交媒體平台需要處理用戶上傳的圖片:生成縮圖、添加浮水印、轉換格式等。
Celery 解決方案
# tasks.py
from celery import shared_task, chain
from PIL import Image, ImageDraw, ImageFont
import os
@shared_task
def resize_image(image_path, sizes):
"""調整圖片大小"""
try:
img = Image.open(image_path)
results = []
for name, size in sizes.items():
# 調整大小(保持比例)
img_copy = img.copy()
img_copy.thumbnail(size, Image.Resampling.LANCZOS)
# 保存
filename = f"{os.path.splitext(image_path)[0]}_{name}.jpg"
img_copy.save(filename, 'JPEG', quality=85)
results.append({
'name': name,
'path': filename,
'size': size
})
return results
except Exception as e:
raise Exception(f'Failed to resize image: {e}')
@shared_task
def add_watermark(image_path, watermark_text):
"""添加浮水印"""
try:
img = Image.open(image_path)
# 創建透明層
watermark = Image.new('RGBA', img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(watermark)
# 設置字體
font_size = int(img.size[0] / 20)
font = ImageFont.truetype('arial.ttf', font_size)
# 計算文字位置(右下角)
text_bbox = draw.textbbox((0, 0), watermark_text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
x = img.size[0] - text_width - 10
y = img.size[1] - text_height - 10
# 繪製浮水印
draw.text((x, y), watermark_text, fill=(255, 255, 255, 128), font=font)
# 合併圖層
img = img.convert('RGBA')
img = Image.alpha_composite(img, watermark)
# 保存
output_path = f"{os.path.splitext(image_path)[0]}_watermarked.png"
img.save(output_path, 'PNG')
return output_path
except Exception as e:
raise Exception(f'Failed to add watermark: {e}')
@shared_task
def optimize_image(image_path):
"""優化圖片(壓縮)"""
try:
img = Image.open(image_path)
# 轉換為 RGB(去除 alpha 通道)
if img.mode in ('RGBA', 'LA'):
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1])
img = background
# 壓縮保存
output_path = f"{os.path.splitext(image_path)[0]}_optimized.jpg"
img.save(output_path, 'JPEG', quality=75, optimize=True)
return output_path
except Exception as e:
raise Exception(f'Failed to optimize image: {e}')
@shared_task
def process_image_pipeline(image_path):
"""完整的圖片處理流程"""
# 使用 chain 串接任務
workflow = chain(
resize_image.s(image_path, {
'thumbnail': (150, 150),
'medium': (800, 600),
'large': (1920, 1080)
}),
add_watermark.s('© MyApp 2025'),
optimize_image.s()
)
return workflow.apply_async()在 View 中使用
# views.py
from django.core.files.storage import default_storage
def upload_image(request):
"""上傳圖片"""
if request.method == 'POST':
image = request.FILES['image']
# 保存原始圖片
filename = default_storage.save(f'uploads/{image.name}', image)
file_path = default_storage.path(filename)
# 異步處理圖片
task = process_image_pipeline.delay(file_path)
return JsonResponse({
'task_id': task.id,
'message': '圖片上傳成功,正在處理中...'
})4. 案例四:定時任務(Celery Beat)
場景描述
系統需要定期執行任務:每天生成報表、每小時清理過期數據、每週發送摘要郵件等。
配置 Celery Beat
# settings.py
from celery.schedules import crontab
# Celery Beat 配置
CELERY_BEAT_SCHEDULE = {
# 每天凌晨 2 點生成報表
'generate-daily-report': {
'task': 'tasks.generate_daily_report',
'schedule': crontab(hour=2, minute=0),
},
# 每小時清理過期 session
'cleanup-expired-sessions': {
'task': 'tasks.cleanup_expired_sessions',
'schedule': crontab(minute=0), # 每小時的第 0 分鐘
},
# 每週一早上 9 點發送週報
'send-weekly-summary': {
'task': 'tasks.send_weekly_summary',
'schedule': crontab(day_of_week=1, hour=9, minute=0),
},
# 每 30 秒檢查訂單支付狀態
'check-payment-status': {
'task': 'tasks.check_payment_status',
'schedule': 30.0, # 每 30 秒
},
# 每天凌晨 3 點備份資料庫
'backup-database': {
'task': 'tasks.backup_database',
'schedule': crontab(hour=3, minute=0),
'options': {'expires': 3600} # 1 小時內必須執行
},
}定時任務實現
# tasks.py
from celery import shared_task
from django.utils import timezone
from datetime import timedelta
@shared_task
def generate_daily_report():
"""生成每日報表"""
yesterday = timezone.now().date() - timedelta(days=1)
from orders.models import Order
# 統計昨天的數據
orders = Order.objects.filter(
created_at__date=yesterday
)
stats = {
'date': str(yesterday),
'total_orders': orders.count(),
'total_revenue': orders.aggregate(Sum('total'))['total__sum'] or 0,
}
# 發送郵件給管理員
send_mail(
subject=f'每日報表 - {yesterday}',
message=f"訂單數:{stats['total_orders']}\n"
f"總營收:${stats['total_revenue']}",
recipient_list=['admin@example.com']
)
return stats
@shared_task
def cleanup_expired_sessions():
"""清理過期的 Session"""
from django.contrib.sessions.models import Session
# 刪除過期的 session
deleted = Session.objects.filter(
expire_date__lt=timezone.now()
).delete()
return f'Deleted {deleted[0]} expired sessions'
@shared_task
def send_weekly_summary():
"""發送週報"""
from django.contrib.auth.models import User
# 獲取上週數據
last_week_start = timezone.now().date() - timedelta(days=7)
last_week_end = timezone.now().date()
users = User.objects.filter(
is_active=True,
email__isnull=False
)
for user in users:
# 獲取用戶的訂單數據
orders = user.orders.filter(
created_at__range=[last_week_start, last_week_end]
)
if orders.exists():
# 發送個性化週報
send_mail(
subject='您的本週購物摘要',
message=f'本週您完成了 {orders.count()} 筆訂單',
recipient_list=[user.email]
)
return f'Sent weekly summary to {users.count()} users'
@shared_task
def check_payment_status():
"""檢查訂單支付狀態"""
from orders.models import Order
# 查找待支付超過 30 分鐘的訂單
expired_time = timezone.now() - timedelta(minutes=30)
pending_orders = Order.objects.filter(
status='pending',
created_at__lt=expired_time
)
cancelled_count = 0
for order in pending_orders:
# 取消訂單
order.status = 'cancelled'
order.save()
# 發送通知
send_mail(
subject=f'訂單 {order.id} 已取消',
message='您的訂單因超時未支付已自動取消',
recipient_list=[order.user.email]
)
cancelled_count += 1
return f'Cancelled {cancelled_count} expired orders'
@shared_task
def backup_database():
"""備份資料庫"""
import subprocess
from django.conf import settings
timestamp = timezone.now().strftime('%Y%m%d_%H%M%S')
backup_file = f'/backups/db_backup_{timestamp}.sql'
# 執行 pg_dump
command = [
'pg_dump',
'-h', settings.DATABASES['default']['HOST'],
'-U', settings.DATABASES['default']['USER'],
'-d', settings.DATABASES['default']['NAME'],
'-f', backup_file
]
result = subprocess.run(command, capture_output=True, text=True)
if result.returncode == 0:
# 上傳到 S3
upload_to_s3(backup_file)
return f'Database backed up: {backup_file}'
else:
raise Exception(f'Backup failed: {result.stderr}')啟動 Celery Beat
# 開發環境
celery -A myproject beat --loglevel=info
# 生產環境(使用 systemd)
# /etc/systemd/system/celery-beat.service
[Unit]
Description=Celery Beat
After=network.target
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
ExecStart=/var/www/myproject/venv/bin/celery -A myproject beat \
--loglevel=info \
--pidfile=/var/run/celery/beat.pid
[Install]
WantedBy=multi-user.target5. 案例五:數據同步系統
場景描述
需要定期同步外部系統的數據:同步產品信息、更新庫存、同步用戶資料等。
Celery 解決方案
# tasks.py
from celery import shared_task, group
from django.db import transaction
import requests
@shared_task(bind=True, max_retries=5)
def sync_product_from_erp(self, product_id):
"""從 ERP 系統同步單個產品"""
try:
# 呼叫 ERP API
response = requests.get(
f'https://erp.example.com/api/products/{product_id}',
timeout=10
)
response.raise_for_status()
data = response.json()
# 更新本地數據
from products.models import Product
with transaction.atomic():
product, created = Product.objects.update_or_create(
erp_id=product_id,
defaults={
'name': data['name'],
'price': data['price'],
'stock': data['stock'],
'description': data['description'],
'last_synced': timezone.now()
}
)
action = 'created' if created else 'updated'
return f'Product {product_id} {action}'
except requests.exceptions.RequestException as exc:
# 指數退避重試:5秒、25秒、125秒...
countdown = 5 ** self.request.retries
raise self.retry(exc=exc, countdown=countdown)
@shared_task
def sync_all_products():
"""同步所有產品"""
# 獲取需要同步的產品 ID 列表
response = requests.get('https://erp.example.com/api/products/')
product_ids = response.json()['product_ids']
# 並行同步
job = group(
sync_product_from_erp.s(product_id)
for product_id in product_ids
)
result = job.apply_async()
return f'Queued sync for {len(product_ids)} products'
@shared_task
def sync_inventory():
"""同步庫存"""
from products.models import Product
# 批量獲取庫存信息
response = requests.get('https://erp.example.com/api/inventory/')
inventory_data = response.json()
updated_count = 0
with transaction.atomic():
for item in inventory_data:
Product.objects.filter(erp_id=item['product_id']).update(
stock=item['stock'],
last_synced=timezone.now()
)
updated_count += 1
return f'Updated inventory for {updated_count} products'定時同步配置
# settings.py
CELERY_BEAT_SCHEDULE = {
# 每小時同步產品信息
'sync-products': {
'task': 'tasks.sync_all_products',
'schedule': crontab(minute=0),
},
# 每 15 分鐘同步庫存
'sync-inventory': {
'task': 'tasks.sync_inventory',
'schedule': crontab(minute='*/15'), # 每 15 分鐘
},
}6. 案例六:批量數據處理
場景描述
需要處理大量數據:批量導入用戶、批量更新價格、批量發送通知等。
Celery 解決方案
# tasks.py
from celery import shared_task, chord
from django.db import transaction
@shared_task
def process_user_batch(user_data_batch):
"""處理一批用戶數據"""
from django.contrib.auth.models import User
created_count = 0
updated_count = 0
error_count = 0
with transaction.atomic():
for user_data in user_data_batch:
try:
user, created = User.objects.update_or_create(
email=user_data['email'],
defaults={
'username': user_data['username'],
'first_name': user_data['first_name'],
'last_name': user_data['last_name'],
}
)
if created:
created_count += 1
else:
updated_count += 1
except Exception as e:
error_count += 1
logger.error(f"Failed to process user {user_data['email']}: {e}")
return {
'created': created_count,
'updated': updated_count,
'errors': error_count
}
@shared_task
def summarize_import_results(results):
"""匯總導入結果"""
total_created = sum(r['created'] for r in results)
total_updated = sum(r['updated'] for r in results)
total_errors = sum(r['errors'] for r in results)
# 發送郵件通知
send_mail(
subject='用戶批量導入完成',
message=f'創建:{total_created}\n'
f'更新:{total_updated}\n'
f'錯誤:{total_errors}',
recipient_list=['admin@example.com']
)
return {
'total_created': total_created,
'total_updated': total_updated,
'total_errors': total_errors
}
@shared_task
def bulk_import_users(csv_file_path):
"""批量導入用戶"""
import csv
# 讀取 CSV
with open(csv_file_path, 'r') as f:
reader = csv.DictReader(f)
all_users = list(reader)
# 分批處理(每批 1000 個用戶)
batch_size = 1000
batches = [
all_users[i:i + batch_size]
for i in range(0, len(all_users), batch_size)
]
# 使用 chord:並行處理所有批次,完成後執行匯總
callback = summarize_import_results.s()
header = [process_user_batch.s(batch) for batch in batches]
result = chord(header)(callback)
return {
'task_id': result.id,
'total_users': len(all_users),
'batches': len(batches)
}7. 最佳實踐與優化
1. 冪等性設計
# ✅ 確保任務可以安全重試
@shared_task(bind=True, max_retries=3)
def create_order_invoice(self, order_id):
"""創建訂單發票(冪等)"""
from orders.models import Order, Invoice
order = Order.objects.get(id=order_id)
# 檢查是否已經創建過
if Invoice.objects.filter(order=order).exists():
return 'Invoice already exists'
# 創建發票
invoice = Invoice.objects.create(
order=order,
amount=order.total,
# ...
)
return f'Invoice {invoice.id} created'2. 任務超時設置
# 設置任務超時
@shared_task(time_limit=300, soft_time_limit=240)
def long_running_task():
"""長時間運行的任務"""
try:
# 處理任務
process_data()
except SoftTimeLimitExceeded:
# 軟超時:清理資源
cleanup()
raise3. 任務優先級
# 配置多個隊列
# settings.py
CELERY_TASK_ROUTES = {
'tasks.send_email': {'queue': 'high_priority'},
'tasks.generate_report': {'queue': 'low_priority'},
}
# 啟動不同優先級的 Worker
# celery -A myproject worker -Q high_priority --concurrency=10
# celery -A myproject worker -Q low_priority --concurrency=24. 錯誤處理和告警
from celery.signals import task_failure
from django.core.mail import mail_admins
@task_failure.connect
def handle_task_failure(sender, task_id, exception, **kwargs):
"""任務失敗時發送告警"""
mail_admins(
subject=f'Celery Task Failed: {sender.name}',
message=f'Task ID: {task_id}\nException: {exception}'
)5. 監控和日誌
# tasks.py
import logging
import time
logger = logging.getLogger(__name__)
@shared_task(bind=True)
def monitored_task(self):
"""帶監控的任務"""
start_time = time.time()
try:
# 記錄開始
logger.info(f'Task {self.request.id} started')
# 執行任務
result = perform_work()
# 記錄完成
duration = time.time() - start_time
logger.info(f'Task {self.request.id} completed in {duration:.2f}s')
return result
except Exception as e:
logger.error(f'Task {self.request.id} failed: {e}', exc_info=True)
raise小結
本章展示了 7 個 Celery 實戰案例:
- 郵件發送系統:異步發送、批量發送、重試機制
- 報表生成系統:進度追蹤、大文件處理
- 圖片處理系統:鏈式任務、並行處理
- 定時任務:Celery Beat 配置、多種調度策略
- 數據同步系統:外部 API 集成、錯誤重試
- 批量數據處理:分批處理、結果匯總
- 最佳實踐:冪等性、超時、優先級、監控
關鍵要點:
- ✅ 使用異步任務提升用戶體驗
- ✅ 合理設置重試機制和超時時間
- ✅ 分批處理大量數據
- ✅ 使用 chord/chain 組合複雜工作流
- ✅ 完善的監控和錯誤處理
下一章將探討 Channels 的基礎概念和實時通信功能!