Django 面試準備 06-2:Celery 實戰案例

7 個真實場景的 Celery 應用與最佳實踐

06-2. Celery 實戰案例

本章通過 7 個真實場景,展示 Celery 在生產環境中的實際應用。


1. 案例一:郵件發送系統

場景描述

電商平台需要發送多種郵件:訂單確認、發貨通知、促銷活動等。每天發送 10 萬封郵件。

問題分析

# ❌ 同步發送郵件的問題
def create_order(request):
    """Anti-pattern: create an order and email the confirmation synchronously.

    The SMTP round-trip happens inside the request/response cycle, so the
    user waits for it and any SMTP error turns into a failed request.
    """
    order = Order.objects.create(...)

    # Synchronous send: blocks the request for the whole SMTP exchange (2-3 s).
    # ``from_email`` is a required parameter of ``send_mail``; passing ``None``
    # makes Django fall back to ``settings.DEFAULT_FROM_EMAIL``.
    send_mail(
        subject=f'訂單 {order.id} 已確認',
        message='...',
        from_email=None,
        recipient_list=[order.user.email]
    )

    return JsonResponse({'order_id': order.id})

# 問題:
# - 用戶等待 2-3 秒才能看到響應
# - SMTP 服務器故障會導致訂單創建失敗
# - 高峰期可能超時

Celery 解決方案

步驟 1:定義郵件任務

# tasks.py
from celery import shared_task
from django.core.mail import send_mail, EmailMultiAlternatives
from django.template.loader import render_to_string
from django.conf import settings
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def send_order_confirmation_email(self, order_id):
    """Email the HTML order confirmation for ``order_id``.

    On any error the failure is logged and the task re-queues itself through
    ``self.retry`` with a 30-second countdown (up to ``max_retries`` times).
    """
    try:
        from orders.models import Order

        order = Order.objects.select_related('user').get(id=order_id)
        customer = order.user

        # HTML alternative rendered from the confirmation template.
        rendered = render_to_string(
            'emails/order_confirmation.html',
            {'order': order, 'user': customer},
        )

        message = EmailMultiAlternatives(
            subject=f'訂單 {order.id} 已確認',
            body=f'您的訂單總金額:${order.total}',
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[customer.email],
        )
        message.attach_alternative(rendered, "text/html")
        message.send()

        logger.info(f'Order confirmation email sent: order_id={order_id}')
        return f'Email sent to {order.user.email}'
    except Exception as exc:
        logger.error(f'Failed to send email: order_id={order_id}, error={exc}')
        # Re-queue this task 30 seconds from now.
        raise self.retry(exc=exc, countdown=30)

@shared_task(bind=True, max_retries=3)
def send_shipping_notification(self, order_id, tracking_number):
    """Email the shipping notification (with tracking number) for ``order_id``.

    Mirrors ``send_order_confirmation_email``: success and failure are both
    logged, and on any error the task retries after 30 seconds (up to
    ``max_retries`` times).
    """
    try:
        from orders.models import Order

        order = Order.objects.select_related('user').get(id=order_id)

        html_content = render_to_string('emails/shipping_notification.html', {
            'order': order,
            'tracking_number': tracking_number,
        })

        email = EmailMultiAlternatives(
            subject=f'訂單 {order.id} 已發貨',
            body=f'您的包裹追蹤號:{tracking_number}',
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[order.user.email]
        )
        email.attach_alternative(html_content, "text/html")
        email.send()

        logger.info(f'Shipping notification sent: order_id={order_id}')
        return f'Shipping notification sent: {tracking_number}'

    except Exception as exc:
        # Previously failures were retried without any trace in the logs.
        logger.error(
            f'Failed to send shipping notification: '
            f'order_id={order_id}, error={exc}'
        )
        raise self.retry(exc=exc, countdown=30)

@shared_task
def send_bulk_promotional_email(user_ids, subject, template_name, context):
    """Send a templated promotional HTML email to every user in ``user_ids``.

    Args:
        user_ids: primary keys of the target users.
        subject: email subject line.
        template_name: Django template rendered per user; receives ``context``
            plus ``user``.
        context: extra template variables shared by every email.

    Returns:
        dict with ``sent``, ``failed``, ``skipped`` (IDs with no matching user
        or a blank email address) and ``total`` (``len(user_ids)``).
    """
    from django.contrib.auth.models import User

    users = User.objects.filter(id__in=user_ids)

    sent_count = 0
    failed_count = 0

    for user in users:
        # EmailMessage.send() returns 0 without raising when there is no
        # recipient, so a blank address used to be counted as "sent".
        if not user.email:
            continue
        try:
            html_content = render_to_string(template_name, {
                **context,
                'user': user,
            })

            email = EmailMultiAlternatives(
                subject=subject,
                body='查看 HTML 版本',
                from_email=settings.DEFAULT_FROM_EMAIL,
                to=[user.email]
            )
            email.attach_alternative(html_content, "text/html")
            email.send()

            sent_count += 1

        except Exception as e:
            logger.error(f'Failed to send to {user.email}: {e}')
            failed_count += 1

    return {
        'sent': sent_count,
        'failed': failed_count,
        # Blank-email users plus IDs that did not match any user.
        'skipped': len(user_ids) - sent_count - failed_count,
        'total': len(user_ids)
    }

步驟 2:在 View 中使用

# views.py
from .tasks import send_order_confirmation_email, send_shipping_notification

def create_order(request):
    """Create an order for the current user and queue its confirmation email.

    The task is enqueued via ``transaction.on_commit``. When the view runs
    inside a transaction (e.g. ``ATOMIC_REQUESTS``), this stops a worker from
    picking the task up before the order row is committed; if the transaction
    rolls back, no email is sent. With no active transaction the callback runs
    immediately.
    """
    from django.db import transaction

    order = Order.objects.create(
        user=request.user,
        total=calculate_total(request.data['items'])
    )

    # Queue the email asynchronously, but only once the order is committed.
    order_id = order.id
    transaction.on_commit(lambda: send_order_confirmation_email.delay(order_id))

    return JsonResponse({
        'order_id': order.id,
        'status': 'created'
    })

def mark_order_shipped(request, order_id):
    """Mark an order as shipped and queue the shipping-notification email.

    The task is enqueued with ``transaction.on_commit``, so inside a
    transaction the worker never sees the order before the 'shipped' status
    is committed, and a rolled-back update sends no email. Outside a
    transaction the callback runs immediately.
    """
    from django.db import transaction

    order = Order.objects.get(id=order_id)
    order.status = 'shipped'
    order.tracking_number = request.data['tracking_number']
    order.save()

    # Queue the notification asynchronously once the update is committed.
    shipped_id = order.id
    tracking_number = order.tracking_number
    transaction.on_commit(
        lambda: send_shipping_notification.delay(shipped_id, tracking_number)
    )

    return JsonResponse({'status': 'ok'})

步驟 3:批量發送促銷郵件

# management/commands/send_promotion.py
from django.core.management.base import BaseCommand
from django.contrib.auth.models import User
from tasks import send_bulk_promotional_email

class Command(BaseCommand):
    """Management command that queues promotional emails for active users."""

    help = '發送促銷郵件給活躍用戶'

    def handle(self, *args, **options):
        """Queue promotional-email batches for users active in the last 30 days."""
        # Both names are used below but were never imported in this module.
        from datetime import timedelta

        from django.utils import timezone

        # Active in the last 30 days and reachable by email. Materialise the
        # IDs once (one query), then slice the Python list into batches.
        active_user_ids = list(
            User.objects.filter(
                last_login__gte=timezone.now() - timedelta(days=30),
                is_active=True
            ).exclude(email='').values_list('id', flat=True)
        )

        # One task per batch of 1000 users.
        batch_size = 1000
        user_batches = [
            active_user_ids[i:i + batch_size]
            for i in range(0, len(active_user_ids), batch_size)
        ]

        for batch in user_batches:
            send_bulk_promotional_email.delay(
                user_ids=batch,
                subject='雙11大促銷!',
                template_name='emails/promotion.html',
                context={'discount': 50}
            )

        self.stdout.write(
            self.style.SUCCESS(
                f'Queued {len(user_batches)} batches, '
                f'total {len(active_user_ids)} users'
            )
        )

優化技巧

# 使用連接池優化 SMTP 連接
from celery import shared_task
from django.core.mail import get_connection

@shared_task
def send_bulk_emails_optimized(email_list):
    """Send many emails over one reused SMTP connection.

    Args:
        email_list: iterable of dicts with ``subject``, ``body`` and ``to``
            (a single address) keys.

    Returns:
        The number of messages the backend reports as sent.
    """
    # Not imported by this snippet's header.
    from django.conf import settings
    from django.core.mail import EmailMultiAlternatives

    messages = [
        EmailMultiAlternatives(
            subject=item['subject'],
            body=item['body'],
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[item['to']],
        )
        for item in email_list
    ]

    # The backend's context manager opens the connection once and always
    # closes it; send_messages() delivers every message over that connection.
    with get_connection() as connection:
        return connection.send_messages(messages)

2. 案例二:報表生成系統

場景描述

企業需要生成各種報表:銷售報表、用戶統計、財務報表等。報表生成耗時 10-60 秒。

Celery 解決方案

# tasks.py
from celery import shared_task
from django.core.files.base import ContentFile
from django.db.models import Sum, Count
import pandas as pd
from datetime import datetime, timedelta
import io

@shared_task(bind=True)
def generate_sales_report(self, start_date, end_date, format='xlsx'):
    """Build an Excel sales report for orders with ``created_at`` in the range.

    Progress is published with ``self.update_state`` (state ``'PROGRESS'``,
    meta ``current``/``total``/``status``) for a polling endpoint.

    Args:
        start_date: lower bound passed to ``created_at__range``.
        end_date: upper bound passed to ``created_at__range``.
        format: kept for API compatibility; only xlsx is produced.

    Returns:
        dict with ``status``, ``file_url`` and the summary ``stats``.
    """
    from django.core.files.storage import default_storage
    from django.db.models import Avg
    from django.db.models.functions import TruncDate
    from orders.models import Order

    def report_progress(current, status):
        # Percent-style progress: total is always 100.
        self.update_state(
            state='PROGRESS',
            meta={'current': current, 'total': 100, 'status': status}
        )

    report_progress(0, '查詢數據中...')
    # Only aggregates are read, so select_related/prefetch_related are not needed.
    orders = Order.objects.filter(created_at__range=[start_date, end_date])

    report_progress(30, '統計數據中...')
    # A single aggregate query. For an empty range Sum/Avg return None; the
    # old Sum/count() division raised TypeError/ZeroDivisionError there.
    totals = orders.aggregate(
        order_count=Count('id'),
        revenue=Sum('total'),
        average=Avg('total'),
    )
    stats = {
        'total_orders': totals['order_count'],
        'total_revenue': totals['revenue'] or 0,
        'avg_order_value': totals['average'] or 0,
    }

    # TruncDate replaces the deprecated, raw-SQL .extra(select=...) and
    # buckets by date in the current time zone.
    daily_stats = (
        orders.annotate(date=TruncDate('created_at'))
        .values('date')
        .annotate(orders=Count('id'), revenue=Sum('total'))
        .order_by('date')
    )

    report_progress(60, '生成報表中...')
    df = pd.DataFrame(list(daily_stats))

    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='每日統計', index=False)
        pd.DataFrame([stats]).to_excel(writer, sheet_name='摘要', index=False)
    buffer.seek(0)

    report_progress(90, '保存文件中...')
    file_path = f'reports/sales_report_{start_date}_{end_date}.xlsx'
    # save() returns the name actually used: it may differ from file_path
    # (e.g. a suffix is added if the name is taken), so build the URL from it.
    saved_path = default_storage.save(file_path, ContentFile(buffer.getvalue()))

    return {
        'status': 'completed',
        'file_url': default_storage.url(saved_path),
        'stats': stats
    }

@shared_task(bind=True)
def generate_user_analytics(self, year, month):
    """Build a one-page PDF of user sign-up/activity stats for ``year``/``month``.

    Returns:
        dict with the stored PDF's ``file_url`` and the raw ``data`` numbers.
    """
    from django.contrib.auth.models import User
    from django.core.files.storage import default_storage
    from django.db.models import Avg
    from django.utils import timezone
    from orders.models import Order
    from reportlab.lib.pagesizes import A4
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.cidfonts import UnicodeCIDFont
    from reportlab.pdfgen import canvas

    self.update_state(state='PROGRESS', meta={'status': '分析用戶數據...'})

    # Users who signed up in the requested month.
    users = User.objects.filter(
        date_joined__year=year,
        date_joined__month=month
    )

    # Of those, users who logged in within the last 30 days.
    active_users = users.filter(
        last_login__gte=timezone.now() - timedelta(days=30)
    )

    # Per-user order count and spend for the month.
    user_orders = Order.objects.filter(
        created_at__year=year,
        created_at__month=month
    ).values('user_id').annotate(
        order_count=Count('id'),
        total_spent=Sum('total')
    )

    total_users = users.count()
    active_count = active_users.count()
    data = {
        'total_users': total_users,
        'active_users': active_count,
        # Guard: a month with no sign-ups used to raise ZeroDivisionError.
        'active_rate': active_count / total_users * 100 if total_users else 0.0,
        # Avg (not Sum) so the value matches its name; None if nobody ordered.
        'avg_orders_per_user': user_orders.aggregate(
            avg=Avg('order_count')
        )['avg'],
    }

    # The default Helvetica font has no CJK glyphs; use ReportLab's built-in
    # Traditional Chinese CID font so the labels render.
    pdfmetrics.registerFont(UnicodeCIDFont('MSung-Light'))

    buffer = io.BytesIO()
    p = canvas.Canvas(buffer, pagesize=A4)
    p.setFont('MSung-Light', 12)

    p.drawString(100, 800, f'用戶分析報表 - {year}/{month}')
    p.drawString(100, 780, f'總用戶數:{data["total_users"]}')
    p.drawString(100, 760, f'活躍用戶:{data["active_users"]}')
    p.drawString(100, 740, f'活躍率:{data["active_rate"]:.2f}%')

    p.showPage()
    p.save()

    buffer.seek(0)

    file_path = f'reports/user_analytics_{year}_{month}.pdf'
    # save() returns the name actually stored, which may differ from file_path.
    saved_path = default_storage.save(file_path, ContentFile(buffer.getvalue()))

    return {
        'file_url': default_storage.url(saved_path),
        'data': data
    }

前端進度顯示

# views.py
from celery.result import AsyncResult

def request_sales_report(request):
    """Start an asynchronous sales-report build and hand back its task id.

    Reads the ``start_date`` and ``end_date`` query parameters; the client
    polls the status endpoint with the returned ``task_id``.
    """
    query = request.GET
    report_task = generate_sales_report.delay(query['start_date'], query['end_date'])

    payload = {'task_id': report_task.id, 'status': 'processing'}
    return JsonResponse(payload)

def check_report_status(request, task_id):
    """Return the current state of a report task as JSON for the poller.

    PROGRESS includes ``current``/``total``/``status`` from the task meta,
    SUCCESS includes the task ``result``, and any other state includes
    ``str(result.info)``.
    """
    result = AsyncResult(task_id)
    # Until a task is finished, every .state/.info access re-queries the
    # result backend. Read the state once so the branch we take and the state
    # we report agree (the task can finish between two reads).
    state = result.state

    if state == 'PROGRESS':
        info = result.info
        meta = info if isinstance(info, dict) else {}
        response = {
            'state': state,
            'current': meta.get('current', 0),
            'total': meta.get('total', 1),
            'status': meta.get('status', '')
        }
    elif state == 'SUCCESS':
        response = {
            'state': state,
            'result': result.result
        }
    else:
        response = {
            'state': state,
            'status': str(result.info)
        }

    return JsonResponse(response)
// Frontend: request a report and poll its progress.
/**
 * Ask the backend to build a sales report, then poll its status once per
 * second until it succeeds or fails.
 *
 * @param {string} [startDate] sent as the `start_date` query parameter,
 *   which the Django view reads from request.GET.
 * @param {string} [endDate] sent as the `end_date` query parameter.
 */
async function generateReport(startDate, endDate) {
    const POLL_INTERVAL_MS = 1000;

    const params = new URLSearchParams();
    if (startDate) params.set('start_date', startDate);
    if (endDate) params.set('end_date', endDate);
    const query = params.toString();
    const url = query ? `/api/reports/sales/?${query}` : '/api/reports/sales/';

    const response = await fetch(url);
    if (!response.ok) {
        showError('報表生成失敗');
        return;
    }
    const { task_id: taskId } = await response.json();

    // Schedule the next check only after the current one finishes, so slow
    // responses never pile up overlapping requests (as setInterval could).
    const poll = async () => {
        let status;
        try {
            const statusResponse = await fetch(`/api/reports/status/${taskId}/`);
            status = await statusResponse.json();
        } catch (err) {
            // Transient network/parse error: try again on the next tick.
            setTimeout(poll, POLL_INTERVAL_MS);
            return;
        }

        if (status.state === 'SUCCESS') {
            showDownloadLink(status.result.file_url);
            return;
        }
        if (status.state === 'FAILURE' || status.state === 'REVOKED') {
            showError('報表生成失敗');
            return;
        }
        if (status.state === 'PROGRESS') {
            const progress = (status.current / status.total) * 100;
            updateProgressBar(progress, status.status);
        }
        // PENDING / STARTED / RETRY / PROGRESS: keep polling.
        setTimeout(poll, POLL_INTERVAL_MS);
    };

    setTimeout(poll, POLL_INTERVAL_MS);
}

3. 案例三:圖片處理系統

場景描述

社交媒體平台需要處理用戶上傳的圖片:生成縮圖、添加浮水印、轉換格式等。

Celery 解決方案

# tasks.py
from celery import shared_task, chain
from PIL import Image, ImageDraw, ImageFont
import os

@shared_task
def resize_image(image_path, sizes):
    """Write one aspect-ratio-preserving JPEG per entry in ``sizes``.

    Args:
        image_path: path of the source image on local disk.
        sizes: mapping of variant name -> (max_width, max_height).

    Returns:
        list of ``{'name', 'path', 'size'}`` dicts, one per variant, where
        ``path`` is ``<image_path without extension>_<name>.jpg``.

    Raises:
        Exception: wrapping (and chaining) any error from opening/saving.
    """
    try:
        with Image.open(image_path) as img:
            if img.mode in ('RGB', 'L'):
                source = img
            else:
                # JPEG cannot store alpha or palette modes (saving RGBA/P
                # raises OSError), so flatten onto a white background once.
                rgba = img.convert('RGBA')
                source = Image.new('RGB', rgba.size, (255, 255, 255))
                source.paste(rgba, mask=rgba.split()[-1])

            base = os.path.splitext(image_path)[0]
            results = []
            for name, size in sizes.items():
                variant = source.copy()
                # thumbnail() shrinks in place and keeps the aspect ratio.
                variant.thumbnail(size, Image.Resampling.LANCZOS)

                filename = f"{base}_{name}.jpg"
                variant.save(filename, 'JPEG', quality=85)

                results.append({
                    'name': name,
                    'path': filename,
                    'size': size
                })

        return results

    except Exception as e:
        raise Exception(f'Failed to resize image: {e}') from e

@shared_task
def add_watermark(image_path, watermark_text):
    """Draw semi-transparent ``watermark_text`` in the bottom-right corner.

    Returns:
        Path of the new ``<image_path without extension>_watermarked.png``.

    Raises:
        Exception: wrapping (and chaining) any error from opening/saving.
    """
    try:
        # convert() loads the pixels, so the file can be closed right away.
        with Image.open(image_path) as source:
            img = source.convert('RGBA')

        # Transparent layer the same size as the image.
        overlay = Image.new('RGBA', img.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)

        # Font size scales with image width (never below 1).
        font_size = max(1, img.size[0] // 20)
        try:
            font = ImageFont.truetype('arial.ttf', font_size)
        except OSError:
            # arial.ttf is usually missing on Linux workers; fall back to
            # Pillow's built-in font instead of failing the task.
            font = ImageFont.load_default()

        # Bottom-right corner with a 10 px margin.
        left, top, right, bottom = draw.textbbox((0, 0), watermark_text, font=font)
        x = img.size[0] - (right - left) - 10
        y = img.size[1] - (bottom - top) - 10

        # White text at 50% opacity.
        draw.text((x, y), watermark_text, fill=(255, 255, 255, 128), font=font)

        img = Image.alpha_composite(img, overlay)

        output_path = f"{os.path.splitext(image_path)[0]}_watermarked.png"
        img.save(output_path, 'PNG')

        return output_path

    except Exception as e:
        raise Exception(f'Failed to add watermark: {e}') from e

@shared_task
def optimize_image(image_path):
    """Re-encode an image as a compressed JPEG (quality 75, optimize=True).

    Transparent areas are flattened onto white.

    Returns:
        Path of the new ``<image_path without extension>_optimized.jpg``.

    Raises:
        Exception: wrapping (and chaining) any error from opening/saving.
    """
    try:
        output_path = f"{os.path.splitext(image_path)[0]}_optimized.jpg"

        with Image.open(image_path) as img:
            # Palette images (GIF/PNG-8) may carry transparency and cannot be
            # written as JPEG; expand them to RGBA first.
            if img.mode in ('P', 'PA'):
                img = img.convert('RGBA')

            if img.mode in ('RGBA', 'LA'):
                # Drop the alpha channel by compositing onto white.
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[-1])
                img = background
            elif img.mode not in ('RGB', 'L', 'CMYK'):
                # Any other mode JPEG cannot encode (e.g. 'I', 'F').
                img = img.convert('RGB')

            img.save(output_path, 'JPEG', quality=75, optimize=True)

        return output_path

    except Exception as e:
        raise Exception(f'Failed to optimize image: {e}') from e

@shared_task
def process_image_pipeline(image_path):
    """Watermark an uploaded image, then build resized and optimised variants.

    In a chain, each task receives the previous task's return value as its
    first positional argument. The old order (resize -> watermark -> optimize)
    passed ``resize_image``'s list of dicts to ``add_watermark`` as
    ``image_path``, so it always failed. Watermarking first yields one path,
    which both follow-up tasks accept as their ``image_path``.

    Returns:
        The id of the launched workflow. A result object cannot itself be
        returned, because task results must be serialisable.
    """
    from celery import group

    workflow = chain(
        add_watermark.s(image_path, '© MyApp 2025'),
        # chain -> group: every task in the group gets the watermarked path.
        group(
            resize_image.s({
                'thumbnail': (150, 150),
                'medium': (800, 600),
                'large': (1920, 1080)
            }),
            optimize_image.s(),
        ),
    )

    return workflow.apply_async().id

在 View 中使用

# views.py
from django.core.files.storage import default_storage

def upload_image(request):
    """Store an uploaded ``image`` file and queue its processing pipeline.

    Returns 405 for non-POST requests and 400 when no ``image`` file is
    attached. Previously a GET returned None, which Django rejects because
    a view must return an HttpResponse.
    """
    from django.http import HttpResponseNotAllowed, JsonResponse

    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    image = request.FILES.get('image')
    if image is None:
        return JsonResponse({'error': '缺少圖片檔案'}, status=400)

    # save() returns the name actually used (it may be sanitised or made
    # unique), so always continue with the returned value.
    filename = default_storage.save(f'uploads/{image.name}', image)
    # NOTE: path() only works for local-filesystem storage; remote backends
    # (e.g. S3) raise NotImplementedError and would need the name passed instead.
    file_path = default_storage.path(filename)

    task = process_image_pipeline.delay(file_path)

    return JsonResponse({
        'task_id': task.id,
        'message': '圖片上傳成功,正在處理中...'
    })

4. 案例四:定時任務(Celery Beat)

場景描述

系統需要定期執行任務:每天生成報表、每小時清理過期數據、每週發送摘要郵件等。

配置 Celery Beat

# settings.py
from celery.schedules import crontab

# Celery Beat schedule
CELERY_BEAT_SCHEDULE = {
    # Every day at 02:00: generate the daily report
    'generate-daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=2, minute=0),
    },

    # Every hour: purge expired sessions
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_expired_sessions',
        'schedule': crontab(minute=0),  # minute 0 of every hour
    },

    # Mondays at 09:00: send the weekly summary
    'send-weekly-summary': {
        'task': 'tasks.send_weekly_summary',
        'schedule': crontab(day_of_week=1, hour=9, minute=0),
    },

    # Every 30 seconds: cancel unpaid orders
    'check-payment-status': {
        'task': 'tasks.check_payment_status',
        'schedule': 30.0,  # seconds
        # Discard a run that has not started before the next one is due;
        # otherwise stale copies pile up while workers are down or busy.
        'options': {'expires': 25},
    },

    # Every day at 03:00: back up the database
    'backup-database': {
        'task': 'tasks.backup_database',
        'schedule': crontab(hour=3, minute=0),
        'options': {'expires': 3600}  # discard if not started within 1 hour
    },
}

定時任務實現

# tasks.py
from celery import shared_task
from django.utils import timezone
from datetime import timedelta

@shared_task
def generate_daily_report():
    """Email yesterday's order count and revenue to the admin.

    Returns:
        dict with ``date``, ``total_orders`` and ``total_revenue``.
    """
    # Neither name is imported in this snippet's header.
    from django.core.mail import send_mail
    from django.db.models import Sum
    from orders.models import Order

    # localdate() is "today" in the current time zone. now().date() is the
    # UTC date, which at 02:00 in e.g. UTC+8 is still the previous day.
    yesterday = timezone.localdate() - timedelta(days=1)

    # Orders created yesterday (``__date`` also uses the current time zone).
    orders = Order.objects.filter(
        created_at__date=yesterday
    )

    stats = {
        'date': str(yesterday),
        'total_orders': orders.count(),
        'total_revenue': orders.aggregate(Sum('total'))['total__sum'] or 0,
    }

    send_mail(
        subject=f'每日報表 - {yesterday}',
        message=f"訂單數:{stats['total_orders']}\n"
                f"總營收:${stats['total_revenue']}",
        # Required argument; None falls back to settings.DEFAULT_FROM_EMAIL.
        from_email=None,
        recipient_list=['admin@example.com']
    )

    return stats

@shared_task
def cleanup_expired_sessions():
    """Remove every Session row whose expire_date is already in the past."""
    from django.contrib.sessions.models import Session

    cutoff = timezone.now()
    stale_sessions = Session.objects.filter(expire_date__lt=cutoff)
    # delete() returns (total_rows_deleted, per_model_breakdown).
    removed_total, _per_model = stale_sessions.delete()

    return f'Deleted {removed_total} expired sessions'

@shared_task
def send_weekly_summary():
    """Email each active user who ordered in the last 7 days their order count.

    Returns:
        A message with the number of summaries actually sent.
    """
    from django.contrib.auth.models import User
    from django.core.mail import send_mail
    from django.db.models import Count, Q

    # Aware datetimes: passing plain dates to a DateTimeField range triggers
    # naive-datetime warnings when USE_TZ is on.
    period_end = timezone.now()
    period_start = period_end - timedelta(days=7)

    # One annotated query instead of exists() + count() per user. Assumes the
    # Order -> User FK uses related_name='orders' (as ``user.orders`` implies).
    # Also skips blank addresses, which ``email__isnull=False`` does not
    # exclude because Django stores a missing email as ''.
    recipients = (
        User.objects.filter(is_active=True, email__isnull=False)
        .exclude(email='')
        .annotate(
            week_orders=Count(
                'orders',
                filter=Q(orders__created_at__range=(period_start, period_end)),
            )
        )
        .filter(week_orders__gt=0)
    )

    sent = 0
    for user in recipients:
        send_mail(
            subject='您的本週購物摘要',
            message=f'本週您完成了 {user.week_orders} 筆訂單',
            # Required argument; None falls back to DEFAULT_FROM_EMAIL.
            from_email=None,
            recipient_list=[user.email]
        )
        sent += 1

    # Report the emails actually sent, not the count of all active users.
    return f'Sent weekly summary to {sent} users'

@shared_task
def check_payment_status():
    """Cancel orders still 'pending' 30+ minutes after creation and notify owners.

    Returns:
        A message with the number of orders this run actually cancelled.
    """
    import logging

    from django.core.mail import send_mail
    from orders.models import Order

    log = logging.getLogger(__name__)

    expired_time = timezone.now() - timedelta(minutes=30)

    pending_orders = Order.objects.filter(
        status='pending',
        created_at__lt=expired_time
    ).select_related('user')  # avoid one extra query per order for the email

    cancelled_count = 0
    for order in pending_orders:
        # Conditional UPDATE: if the order was paid after the SELECT above its
        # status is no longer 'pending' and nothing changes. A plain save()
        # would overwrite the payment with 'cancelled'.
        # NOTE: .update() does not call save() or fire pre/post_save signals.
        updated = Order.objects.filter(pk=order.pk, status='pending').update(
            status='cancelled'
        )
        if not updated:
            continue
        cancelled_count += 1

        try:
            send_mail(
                subject=f'訂單 {order.id} 已取消',
                message='您的訂單因超時未支付已自動取消',
                # Required argument; None falls back to DEFAULT_FROM_EMAIL.
                from_email=None,
                recipient_list=[order.user.email]
            )
        except Exception:
            # The cancellation is already saved; one mail failure must not
            # abort the rest of the batch.
            log.exception('Failed to send cancellation email: order_id=%s', order.id)

    return f'Cancelled {cancelled_count} expired orders'

@shared_task
def backup_database():
    """Dump the default PostgreSQL database with pg_dump and upload the file.

    Returns:
        A message with the local backup file path.

    Raises:
        Exception: with pg_dump's stderr when it exits non-zero.
    """
    import os
    import subprocess

    from django.conf import settings

    db = settings.DATABASES['default']
    timestamp = timezone.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f'/backups/db_backup_{timestamp}.sql'

    command = [
        'pg_dump',
        '-h', db['HOST'],
        '-U', db['USER'],
        '-d', db['NAME'],
        '-f', backup_file,
        # Fail fast instead of waiting for a password prompt nobody can answer.
        '--no-password',
    ]
    if db.get('PORT'):
        command += ['-p', str(db['PORT'])]

    # pg_dump takes no password argument. Supply it through the environment
    # (a ~/.pgpass file for the worker user is the alternative).
    env = dict(os.environ)
    if db.get('PASSWORD'):
        env['PGPASSWORD'] = db['PASSWORD']

    # List form: no shell involved, so settings values are never shell-parsed.
    result = subprocess.run(command, capture_output=True, text=True, env=env)

    if result.returncode != 0:
        raise Exception(f'Backup failed: {result.stderr}')

    # upload_to_s3 is a project helper not shown in this document.
    upload_to_s3(backup_file)
    return f'Database backed up: {backup_file}'

啟動 Celery Beat

# Development: run the beat scheduler in the foreground
celery --app myproject beat --loglevel info

# 生產環境(使用 systemd)
# /etc/systemd/system/celery-beat.service
[Unit]
Description=Celery Beat
After=network.target

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
# /run (/var/run) is a tmpfs emptied at boot: let systemd create /run/celery,
# owned by User=, on every start so the pidfile can be written.
RuntimeDirectory=celery
# beat persists last-run times in a schedule file; keep it in a directory the
# service user owns (/var/lib/celery) rather than the code checkout.
StateDirectory=celery
ExecStart=/var/www/myproject/venv/bin/celery -A myproject beat \
    --loglevel=info \
    --pidfile=/run/celery/beat.pid \
    --schedule=/var/lib/celery/celerybeat-schedule
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

5. 案例五:數據同步系統

場景描述

需要定期同步外部系統的數據:同步產品信息、更新庫存、同步用戶資料等。

Celery 解決方案

# tasks.py
from celery import shared_task, group
from django.db import transaction
import requests

@shared_task(bind=True, max_retries=5)
def sync_product_from_erp(self, product_id):
    """Fetch one product from the ERP API and upsert it locally by ``erp_id``.

    Transient failures (connection errors, timeouts, bad JSON, HTTP 429/5xx)
    are retried with exponential backoff of 5 ** retries seconds:
    1, 5, 25, 125, 625. Other HTTP errors (e.g. 404 for an unknown product)
    are raised at once, because retrying cannot fix them.

    Returns:
        ``'Product <id> created'`` or ``'Product <id> updated'``.
    """
    from django.utils import timezone  # not imported in this snippet
    from products.models import Product

    try:
        response = requests.get(
            f'https://erp.example.com/api/products/{product_id}',
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as exc:
        status = exc.response.status_code if exc.response is not None else None
        if status is not None and status != 429 and status < 500:
            raise
        # self.request.retries is 0 on the first failure.
        raise self.retry(exc=exc, countdown=5 ** self.request.retries)

    with transaction.atomic():
        product, created = Product.objects.update_or_create(
            erp_id=product_id,
            defaults={
                'name': data['name'],
                'price': data['price'],
                'stock': data['stock'],
                'description': data['description'],
                'last_synced': timezone.now()
            }
        )

    action = 'created' if created else 'updated'
    return f'Product {product_id} {action}'

@shared_task
def sync_all_products():
    """Fetch the ERP product-ID list and fan out one sync task per product.

    Returns:
        A message with the number of product syncs queued.
    """
    # Without a timeout a stalled ERP would block this worker process forever.
    response = requests.get('https://erp.example.com/api/products/', timeout=30)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    product_ids = response.json()['product_ids']

    # Run the per-product syncs in parallel.
    group(
        sync_product_from_erp.s(product_id)
        for product_id in product_ids
    ).apply_async()

    return f'Queued sync for {len(product_ids)} products'

@shared_task
def sync_inventory():
    """Overwrite local stock levels with the ERP inventory snapshot.

    Returns:
        A message with the number of local product rows actually updated.
    """
    from django.utils import timezone  # not imported in this snippet
    from products.models import Product

    response = requests.get('https://erp.example.com/api/inventory/', timeout=30)
    response.raise_for_status()
    inventory_data = response.json()

    # One timestamp for the whole snapshot.
    synced_at = timezone.now()
    updated_count = 0

    with transaction.atomic():
        for item in inventory_data:
            # update() returns the number of matched rows; count that rather
            # than every ERP item, so unknown product IDs are not reported.
            updated_count += Product.objects.filter(
                erp_id=item['product_id']
            ).update(
                stock=item['stock'],
                last_synced=synced_at
            )

    return f'Updated inventory for {updated_count} products'

定時同步配置

# settings.py
from celery.schedules import crontab

# NOTE: merge these entries into the existing CELERY_BEAT_SCHEDULE. A second
# ``CELERY_BEAT_SCHEDULE = {...}`` assignment in the same settings module
# silently replaces every schedule defined earlier.
CELERY_BEAT_SCHEDULE = {
    # Every hour: sync product data
    'sync-products': {
        'task': 'tasks.sync_all_products',
        'schedule': crontab(minute=0),
    },

    # Every 15 minutes: sync inventory
    'sync-inventory': {
        'task': 'tasks.sync_inventory',
        'schedule': crontab(minute='*/15'),
        # Drop a run that could not start before the next one is due, so a
        # backlog does not queue duplicate full syncs.
        'options': {'expires': 14 * 60},
    },
}

6. 案例六:批量數據處理

場景描述

需要處理大量數據:批量導入用戶、批量更新價格、批量發送通知等。

Celery 解決方案

# tasks.py
from celery import shared_task, chord
from django.db import transaction

@shared_task
def process_user_batch(user_data_batch):
    """Upsert a batch of users keyed by email.

    Args:
        user_data_batch: list of dicts with ``email``, ``username``,
            ``first_name`` and ``last_name``.

    Returns:
        dict with ``created``, ``updated`` and ``errors`` counts.
    """
    import logging

    from django.contrib.auth.models import User

    log = logging.getLogger(__name__)

    created_count = 0
    updated_count = 0
    error_count = 0

    # The outer atomic block commits the whole batch at once.
    with transaction.atomic():
        for user_data in user_data_batch:
            try:
                # Per-row savepoint: a DB error (e.g. IntegrityError on a
                # duplicate username) rolls back only this row. Catching it
                # directly inside the outer atomic() would leave the
                # transaction unusable, and every later row would fail with
                # TransactionManagementError.
                with transaction.atomic():
                    _user, created = User.objects.update_or_create(
                        email=user_data['email'],
                        defaults={
                            'username': user_data['username'],
                            'first_name': user_data['first_name'],
                            'last_name': user_data['last_name'],
                        }
                    )
            except Exception:
                error_count += 1
                # .get(): a row missing 'email' must not crash the handler.
                log.exception('Failed to process user %s', user_data.get('email'))
                continue

            if created:
                created_count += 1
            else:
                updated_count += 1

    return {
        'created': created_count,
        'updated': updated_count,
        'errors': error_count
    }

@shared_task
def summarize_import_results(results):
    """Chord callback: sum the per-batch counts and email them to the admin.

    Args:
        results: list of ``process_user_batch`` return dicts.

    Returns:
        dict with ``total_created``, ``total_updated`` and ``total_errors``.
    """
    from django.core.mail import send_mail  # not imported in this snippet

    total_created = sum(r['created'] for r in results)
    total_updated = sum(r['updated'] for r in results)
    total_errors = sum(r['errors'] for r in results)

    send_mail(
        subject='用戶批量導入完成',
        message=f'創建:{total_created}\n'
                f'更新:{total_updated}\n'
                f'錯誤:{total_errors}',
        # Required argument; None falls back to settings.DEFAULT_FROM_EMAIL.
        from_email=None,
        recipient_list=['admin@example.com']
    )

    return {
        'total_created': total_created,
        'total_updated': total_updated,
        'total_errors': total_errors
    }

@shared_task
def bulk_import_users(csv_file_path, batch_size=1000):
    """Import users from a CSV file as parallel batches summarised by a chord.

    Args:
        csv_file_path: path to a CSV whose header row names the user fields.
        batch_size: rows per ``process_user_batch`` task (default 1000).

    Returns:
        dict with the chord result ``task_id``, ``total_users`` and ``batches``.
    """
    import csv

    # newline='' is what the csv module requires for correct quoting.
    # utf-8-sig decodes UTF-8 and drops the BOM that Excel exports add, which
    # would otherwise turn the first header into '\ufeffemail'.
    with open(csv_file_path, newline='', encoding='utf-8-sig') as f:
        all_users = list(csv.DictReader(f))

    batches = [
        all_users[i:i + batch_size]
        for i in range(0, len(all_users), batch_size)
    ]

    # chord: process all batches in parallel, then run the summary callback
    # with the list of batch results.
    header = [process_user_batch.s(batch) for batch in batches]
    result = chord(header)(summarize_import_results.s())

    return {
        'task_id': result.id,
        'total_users': len(all_users),
        'batches': len(batches)
    }

7. 最佳實踐與優化

1. 冪等性設計

# ✅ 確保任務可以安全重試
@shared_task(bind=True, max_retries=3)
def create_order_invoice(self, order_id):
    """Create the invoice for ``order_id``; running it again is a no-op.

    ``get_or_create`` replaces the separate exists()/create() pair, which let
    two concurrent runs both see "no invoice" and both insert one. It is only
    fully race-free if the database enforces uniqueness on Invoice.order
    (OneToOneField or unique constraint). TODO confirm on the model.

    Returns:
        ``'Invoice already exists'`` or ``'Invoice <id> created'``.
    """
    from orders.models import Order, Invoice

    order = Order.objects.get(id=order_id)

    invoice, created = Invoice.objects.get_or_create(
        order=order,
        defaults={
            'amount': order.total,
            # ...
        },
    )

    if not created:
        return 'Invoice already exists'

    return f'Invoice {invoice.id} created'

2. 任務超時設置

# 設置任務超時
@shared_task(time_limit=300, soft_time_limit=240)
def long_running_task():
    """Run ``process_data()`` under Celery time limits.

    After 240 s (soft limit) Celery raises SoftTimeLimitExceeded inside the
    task so it can clean up. At 300 s (hard limit) the worker process running
    it is killed.
    """
    # This name was used in the except clause but never imported, so the
    # first exception would have surfaced as a NameError instead.
    from celery.exceptions import SoftTimeLimitExceeded

    try:
        process_data()
    except SoftTimeLimitExceeded:
        # Soft timeout: release resources, then let the task fail.
        cleanup()
        raise

3. 任務優先級

# Configure multiple queues
# settings.py
CELERY_TASK_ROUTES = {
    # Keys must be real task names; 'tasks.send_email' / 'tasks.generate_report'
    # matched nothing, so every task fell through to the default queue.
    'tasks.send_order_confirmation_email': {'queue': 'high_priority'},
    'tasks.send_shipping_notification': {'queue': 'high_priority'},
    'tasks.generate_sales_report': {'queue': 'low_priority'},
    'tasks.generate_user_analytics': {'queue': 'low_priority'},
    'tasks.send_bulk_promotional_email': {'queue': 'low_priority'},
}
# Unrouted tasks go to this queue; some worker must consume it, or those
# tasks are never executed.
CELERY_TASK_DEFAULT_QUEUE = 'default'

# Start one worker pool per priority
# celery -A myproject worker -Q high_priority --concurrency=10
# celery -A myproject worker -Q low_priority,default --concurrency=2

4. 錯誤處理和告警

from celery.signals import task_failure
from django.core.mail import mail_admins

@task_failure.connect
def handle_task_failure(sender, task_id, exception, **kwargs):
    """Email the site admins whenever a Celery task raises."""
    failed_name = sender.name
    details = f'Task ID: {task_id}\nException: {exception}'

    mail_admins(subject=f'Celery Task Failed: {failed_name}', message=details)

5. 監控和日誌

# tasks.py
import logging
import time

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def monitored_task(self):
    """Run ``perform_work()``, logging start, duration and failures by task id.

    Returns:
        Whatever ``perform_work()`` returns; exceptions are logged and re-raised.
    """
    task_id = self.request.id
    # Monotonic clock: durations are unaffected by NTP or wall-clock changes.
    started = time.monotonic()

    try:
        logger.info('Task %s started', task_id)
        result = perform_work()
    except Exception as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception('Task %s failed: %s', task_id, e)
        raise

    duration = time.monotonic() - started
    logger.info('Task %s completed in %.2fs', task_id, duration)
    return result

小結

本章展示了 7 個 Celery 實戰案例:

  1. 郵件發送系統:異步發送、批量發送、重試機制
  2. 報表生成系統:進度追蹤、大文件處理
  3. 圖片處理系統:鏈式任務、並行處理
  4. 定時任務:Celery Beat 配置、多種調度策略
  5. 數據同步系統:外部 API 集成、錯誤重試
  6. 批量數據處理:分批處理、結果匯總
  7. 最佳實踐:冪等性、超時、優先級、監控

關鍵要點:

  • ✅ 使用異步任務提升用戶體驗
  • ✅ 合理設置重試機制和超時時間
  • ✅ 分批處理大量數據
  • ✅ 使用 chord/chain 組合複雜工作流
  • ✅ 完善的監控和錯誤處理

下一章將探討 Channels 的基礎概念和實時通信功能!
