Django 面試精華:asyncio 在 Django 中的使用

掌握 Python 異步編程在 Django 中的實戰技巧

前言

asyncio 是 Python 3.4+ 引入的異步 I/O 框架,Django 3.0+ 開始原生支持 asyncio。掌握 asyncio 在 Django 中的使用,可以顯著提升應用性能。

傳統同步代碼的問題

def get_user_data(user_id):
    # 串行執行,總耗時累加
    user = fetch_user_service(user_id)           # 100ms
    orders = fetch_order_service(user_id)        # 150ms
    notifications = fetch_notification_service(user_id)  # 120ms
    # 總耗時:370ms

    return {
        'user': user,
        'orders': orders,
        'notifications': notifications,
    }

使用 asyncio 的優勢

async def get_user_data(user_id):
    # 並行執行,總耗時取最慢的那個
    user, orders, notifications = await asyncio.gather(
        fetch_user_service(user_id),           # 100ms
        fetch_order_service(user_id),          # 150ms
        fetch_notification_service(user_id),   # 120ms
    )
    # 總耗時:150ms(取最慢的那個)
    # 性能提升:59%

    return {
        'user': user,
        'orders': orders,
        'notifications': notifications,
    }

這篇文章將深入探討 asyncio 在 Django 中的實戰應用,包括任務調度、超時控制、錯誤處理等高級技巧。


1. asyncio 基礎回顧

1.1 核心概念

概念說明示例
協程 (Coroutine)使用 async def 定義的函數async def foo()
await掛起協程,等待另一個協程完成await foo()
事件循環 (Event Loop)管理和調度協程執行asyncio.get_event_loop()
任務 (Task)封裝協程的可調度單元asyncio.create_task()
Future代表未來的結果asyncio.Future()

1.2 基本語法

import asyncio

# 定義協程
async def fetch_data(url):
    print(f"開始獲取: {url}")
    await asyncio.sleep(1)  # 模擬 I/O 操作
    print(f"完成獲取: {url}")
    return f"Data from {url}"


# 執行協程(在 Django 異步視圖中不需要手動運行事件循環)
async def main():
    result = await fetch_data("https://api.example.com")
    print(result)


# 在普通 Python 腳本中運行
if __name__ == "__main__":
    asyncio.run(main())

1.3 在 Django 中使用 asyncio

Django 異步視圖自動運行在事件循環中:

# Django 異步視圖
from django.http import JsonResponse
import asyncio

async def my_async_view(request):
    # 已經在事件循環中,可以直接使用 await
    result = await fetch_data("https://api.example.com")
    return JsonResponse({'result': result})

    # ❌ 不要這樣做
    # asyncio.run(fetch_data(...))  # 會創建新的事件循環,導致錯誤

2. asyncio.gather() - 並行執行

2.1 基本用法

asyncio.gather() 是最常用的並行執行工具:

import asyncio
from django.http import JsonResponse

async def fetch_user_dashboard(request, user_id):
    """並行獲取用戶儀表板數據"""

    # 方式 1:傳遞協程
    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
    )
    user, orders, notifications = results

    # 方式 2:解包結果
    user, orders, notifications = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
    )

    return JsonResponse({
        'user': user,
        'orders': orders,
        'notifications': notifications,
    })


async def fetch_user(user_id):
    """獲取用戶信息(模擬)"""
    await asyncio.sleep(0.1)  # 模擬 I/O 延遲
    return {'id': user_id, 'name': 'Alice'}


async def fetch_orders(user_id):
    """獲取訂單列表(模擬)"""
    await asyncio.sleep(0.15)
    return [{'id': 1, 'total': 100}, {'id': 2, 'total': 200}]


async def fetch_notifications(user_id):
    """獲取通知列表(模擬)"""
    await asyncio.sleep(0.12)
    return [{'id': 1, 'message': 'New order'}]

執行時序

時間軸 (ms)
0  ─┬─ fetch_user() 開始
    ├─ fetch_orders() 開始
    └─ fetch_notifications() 開始

100 ─ fetch_user() 完成 ✓

120 ─ fetch_notifications() 完成 ✓

150 ─ fetch_orders() 完成 ✓
    └─ 所有任務完成,返回結果

總耗時:150ms(不是 100 + 150 + 120 = 370ms)

2.2 錯誤處理

return_exceptions=True:讓單個任務失敗不影響其他任務

async def fetch_dashboard_safe(request, user_id):
    """容錯的並行請求"""

    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        fetch_external_api(user_id),  # 可能失敗
        return_exceptions=True,  # 關鍵參數
    )

    user, orders, notifications, external = results

    # 檢查每個結果
    if isinstance(user, Exception):
        return JsonResponse({'error': 'User not found'}, status=404)

    if isinstance(orders, Exception):
        orders = []  # 使用默認值

    if isinstance(notifications, Exception):
        notifications = []

    if isinstance(external, Exception):
        external = {'error': str(external)}  # 記錄錯誤,不中斷

    return JsonResponse({
        'user': user,
        'orders': orders,
        'notifications': notifications,
        'external': external,
    })


async def fetch_external_api(user_id):
    """可能失敗的外部 API 調用"""
    await asyncio.sleep(0.5)
    raise Exception("External API timeout")  # 模擬錯誤

3. asyncio.create_task() - 後台任務

3.1 創建後台任務

使用 create_task() 創建不阻塞主流程的後台任務:

import asyncio
from django.http import JsonResponse

async def create_order(request):
    """創建訂單,同時觸發後台任務"""

    # 創建訂單(主要業務邏輯)
    order = await Order.objects.acreate(
        user=request.user,
        total=100,
    )

    # 創建後台任務(不等待完成)
    asyncio.create_task(send_order_email(order.id))
    asyncio.create_task(update_inventory(order.id))
    asyncio.create_task(notify_seller(order.id))

    # 立即返回響應,不等待後台任務完成
    return JsonResponse({
        'order_id': order.id,
        'status': 'created',
    })


async def send_order_email(order_id):
    """發送訂單確認郵件(後台任務)"""
    await asyncio.sleep(2)  # 模擬郵件發送
    print(f"Email sent for order {order_id}")


async def update_inventory(order_id):
    """更新庫存(後台任務)"""
    await asyncio.sleep(1)
    print(f"Inventory updated for order {order_id}")


async def notify_seller(order_id):
    """通知賣家(後台任務)"""
    await asyncio.sleep(0.5)
    print(f"Seller notified for order {order_id}")

執行時序

請求到達
    ↓
創建訂單(50ms)
    ↓
啟動 3 個後台任務
    ├─ send_order_email() → 2 秒後完成
    ├─ update_inventory() → 1 秒後完成
    └─ notify_seller() → 0.5 秒後完成
    ↓
立即返回響應(不等待後台任務) ✓

用戶感知響應時間:50ms
實際任務執行時間:2 秒(在後台進行)

3.2 等待後台任務完成

如果需要等待後台任務完成:

async def create_order_and_wait(request):
    """創建訂單並等待關鍵後台任務完成"""

    # 創建訂單
    order = await Order.objects.acreate(
        user=request.user,
        total=100,
    )

    # 創建任務
    task1 = asyncio.create_task(send_order_email(order.id))
    task2 = asyncio.create_task(update_inventory(order.id))
    task3 = asyncio.create_task(notify_seller(order.id))

    # 等待關鍵任務完成(庫存更新)
    await task2

    # 其他任務在後台繼續執行
    # task1 和 task3 不等待

    return JsonResponse({
        'order_id': order.id,
        'status': 'created',
        'inventory_updated': True,
    })

4. asyncio.wait_for() - 超時控制

4.1 設置超時時間

使用 asyncio.wait_for() 為異步操作設置超時:

import asyncio
from django.http import JsonResponse

async def fetch_with_timeout(request):
    """帶超時的外部 API 調用"""

    try:
        # 設置 5 秒超時
        result = await asyncio.wait_for(
            fetch_external_api(),
            timeout=5.0
        )
        return JsonResponse({'data': result})

    except asyncio.TimeoutError:
        return JsonResponse({
            'error': 'Request timeout after 5 seconds'
        }, status=504)


async def fetch_external_api():
    """外部 API 調用(可能很慢)"""
    await asyncio.sleep(10)  # 模擬慢速 API
    return {'status': 'ok'}

4.2 部分超時控制

對並行任務設置不同的超時時間:

async def fetch_dashboard_with_timeouts(request, user_id):
    """為不同任務設置不同超時"""

    async def fetch_user_with_timeout():
        return await asyncio.wait_for(
            fetch_user(user_id),
            timeout=2.0  # 用戶信息:2 秒超時
        )

    async def fetch_orders_with_timeout():
        return await asyncio.wait_for(
            fetch_orders(user_id),
            timeout=5.0  # 訂單列表:5 秒超時
        )

    async def fetch_notifications_with_timeout():
        return await asyncio.wait_for(
            fetch_notifications(user_id),
            timeout=3.0  # 通知列表:3 秒超時
        )

    # 並行執行,各自有超時控制
    results = await asyncio.gather(
        fetch_user_with_timeout(),
        fetch_orders_with_timeout(),
        fetch_notifications_with_timeout(),
        return_exceptions=True,  # 超時不影響其他任務
    )

    user, orders, notifications = results

    # 處理超時
    if isinstance(user, asyncio.TimeoutError):
        return JsonResponse({'error': 'User data timeout'}, status=504)

    if isinstance(orders, asyncio.TimeoutError):
        orders = []  # 使用默認值

    if isinstance(notifications, asyncio.TimeoutError):
        notifications = []

    return JsonResponse({
        'user': user,
        'orders': orders,
        'notifications': notifications,
    })

5. asyncio.as_completed() - 按完成順序處理

5.1 基本用法

as_completed() 按完成順序返回結果,適合需要快速響應的場景:

import asyncio

async def fetch_multiple_apis(request):
    """調用多個 API,按完成順序處理結果"""

    # 創建多個任務
    tasks = [
        fetch_api('https://api1.example.com'),
        fetch_api('https://api2.example.com'),
        fetch_api('https://api3.example.com'),
    ]

    results = []

    # 按完成順序處理
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        print(f"收到結果: {result}")

    return JsonResponse({'results': results})


async def fetch_api(url):
    """獲取 API 數據"""
    import random
    delay = random.uniform(0.1, 0.5)
    await asyncio.sleep(delay)
    return {'url': url, 'delay': delay}

執行過程

啟動 3 個任務
    ↓
api2 完成(0.15s) → 處理結果 #1
    ↓
api1 完成(0.23s) → 處理結果 #2
    ↓
api3 完成(0.41s) → 處理結果 #3
    ↓
返回所有結果

5.2 實戰案例:多數據源搜索

async def search_multiple_sources(request):
    """從多個數據源搜索,返回最快的結果"""

    query = request.GET.get('q', '')

    # 創建多個搜索任務
    tasks = [
        search_database(query),
        search_elasticsearch(query),
        search_external_api(query),
    ]

    results = []
    timeout = 2.0  # 總超時 2 秒

    try:
        # 按完成順序收集結果,總超時 2 秒
        for coro in asyncio.as_completed(tasks, timeout=timeout):
            try:
                result = await coro
                results.extend(result)

                # 如果已經有足夠結果,可以提前返回
                if len(results) >= 10:
                    break

            except Exception as e:
                print(f"搜索源出錯: {e}")
                continue

    except asyncio.TimeoutError:
        print("搜索超時,返回已有結果")

    return JsonResponse({
        'query': query,
        'results': results,
        'count': len(results),
    })


async def search_database(query):
    """從數據庫搜索"""
    await asyncio.sleep(0.5)
    return [{'source': 'db', 'title': f'DB result for {query}'}]


async def search_elasticsearch(query):
    """從 Elasticsearch 搜索"""
    await asyncio.sleep(0.3)
    return [{'source': 'es', 'title': f'ES result for {query}'}]


async def search_external_api(query):
    """從外部 API 搜索"""
    await asyncio.sleep(0.8)
    return [{'source': 'api', 'title': f'API result for {query}'}]

6. asyncio.Queue - 生產者消費者模式

6.1 基本隊列操作

import asyncio
from django.http import JsonResponse

async def process_batch_orders(request):
    """批量處理訂單(生產者消費者模式)"""

    # 創建異步隊列
    queue = asyncio.Queue(maxsize=10)

    # 啟動消費者
    consumers = [
        asyncio.create_task(order_processor(queue, i))
        for i in range(3)  # 3 個消費者
    ]

    # 生產者:將訂單放入隊列
    order_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    for order_id in order_ids:
        await queue.put(order_id)

    # 等待所有訂單處理完成
    await queue.join()

    # 停止消費者
    for _ in consumers:
        await queue.put(None)  # 發送停止信號

    await asyncio.gather(*consumers)

    return JsonResponse({
        'status': 'All orders processed',
        'count': len(order_ids),
    })


async def order_processor(queue, worker_id):
    """訂單處理器(消費者)"""
    while True:
        order_id = await queue.get()

        if order_id is None:  # 停止信號
            queue.task_done()
            break

        try:
            # 處理訂單
            await process_order(order_id)
            print(f"Worker {worker_id} processed order {order_id}")

        except Exception as e:
            print(f"Worker {worker_id} error: {e}")

        finally:
            queue.task_done()


async def process_order(order_id):
    """處理單個訂單"""
    await asyncio.sleep(1)  # 模擬處理時間

執行過程

隊列: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Worker 0: 處理訂單 1 → 1 秒
Worker 1: 處理訂單 2 → 1 秒
Worker 2: 處理訂單 3 → 1 秒
    ↓ (1 秒後)
Worker 0: 處理訂單 4 → 1 秒
Worker 1: 處理訂單 5 → 1 秒
Worker 2: 處理訂單 6 → 1 秒
    ↓ (1 秒後)
...

總耗時:約 4 秒(10 個訂單 / 3 個 Worker)
vs 串行:10 秒

7. 與第三方庫集成

7.1 aiohttp - 異步 HTTP 客戶端

import aiohttp
from django.http import JsonResponse

async def fetch_multiple_urls(request):
    """並行獲取多個 URL"""

    urls = [
        'https://api.github.com/users/django',
        'https://api.github.com/users/python',
        'https://api.github.com/users/microsoft',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return JsonResponse({'results': results})


async def fetch_url(session, url):
    """獲取單個 URL"""
    try:
        async with session.get(url, timeout=5) as response:
            return await response.json()
    except Exception as e:
        return {'error': str(e), 'url': url}

7.2 aiofiles - 異步文件操作

import aiofiles
from django.http import JsonResponse

async def read_large_file(request, filename):
    """異步讀取大文件"""

    content = []

    # 異步讀取文件
    async with aiofiles.open(f'/tmp/{filename}', 'r') as f:
        async for line in f:
            content.append(line.strip())

            # 限制行數
            if len(content) >= 1000:
                break

    return JsonResponse({
        'filename': filename,
        'lines': len(content),
        'preview': content[:10],
    })


async def write_logs(request):
    """異步寫入日誌文件"""

    logs = request.POST.getlist('logs')

    # 異步寫入文件
    async with aiofiles.open('/var/log/app.log', 'a') as f:
        for log in logs:
            await f.write(f"{log}\n")

    return JsonResponse({'status': 'Logs written', 'count': len(logs)})

7.3 aiomysql / asyncpg - 異步數據庫驅動

# 使用 asyncpg(PostgreSQL)
import asyncpg

async def raw_sql_query(request):
    """使用原始異步 SQL 查詢"""

    # 創建連接池
    pool = await asyncpg.create_pool(
        user='myuser',
        password='mypassword',
        database='mydb',
        host='localhost',
    )

    # 執行查詢
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM users WHERE age > $1', 18)

    await pool.close()

    return JsonResponse({
        'count': len(rows),
        'users': [dict(row) for row in rows],
    })

8. 實戰案例

8.1 案例 1:聚合多個微服務

import aiohttp
from django.http import JsonResponse
import asyncio

async def user_profile_aggregation(request, user_id):
    """
    聚合多個微服務的用戶數據
    - 用戶服務
    - 訂單服務
    - 支付服務
    - 推薦服務
    """

    async with aiohttp.ClientSession() as session:
        # 設置超時和重試
        results = await asyncio.gather(
            fetch_with_retry(
                session,
                f'http://user-service/api/users/{user_id}',
                timeout=2.0,
                retries=2
            ),
            fetch_with_retry(
                session,
                f'http://order-service/api/orders?user_id={user_id}',
                timeout=3.0,
                retries=2
            ),
            fetch_with_retry(
                session,
                f'http://payment-service/api/payments?user_id={user_id}',
                timeout=3.0,
                retries=2
            ),
            fetch_with_retry(
                session,
                f'http://recommendation-service/api/recommendations/{user_id}',
                timeout=5.0,
                retries=1
            ),
            return_exceptions=True,
        )

    user, orders, payments, recommendations = results

    # 處理錯誤
    response_data = {}

    if isinstance(user, Exception):
        return JsonResponse({'error': 'User service unavailable'}, status=503)

    response_data['user'] = user

    response_data['orders'] = orders if not isinstance(orders, Exception) else []
    response_data['payments'] = payments if not isinstance(payments, Exception) else []
    response_data['recommendations'] = recommendations if not isinstance(recommendations, Exception) else []

    return JsonResponse(response_data)


async def fetch_with_retry(session, url, timeout=5.0, retries=2):
    """帶重試的 HTTP 請求"""
    for attempt in range(retries + 1):
        try:
            async with session.get(url, timeout=timeout) as response:
                return await response.json()

        except asyncio.TimeoutError:
            if attempt == retries:
                raise
            await asyncio.sleep(0.1 * (2 ** attempt))  # 指數退避

        except Exception as e:
            if attempt == retries:
                raise
            await asyncio.sleep(0.1 * (2 ** attempt))

8.2 案例 2:實時數據流處理

import asyncio
from django.http import StreamingHttpResponse

async def stream_live_data(request):
    """流式返回實時數據(Server-Sent Events)"""

    async def event_stream():
        """生成事件流"""
        for i in range(100):
            # 並行獲取多個數據源
            data1, data2, data3 = await asyncio.gather(
                fetch_sensor_data(1),
                fetch_sensor_data(2),
                fetch_sensor_data(3),
            )

            # 格式化為 SSE
            yield f"data: {{'sensor1': {data1}, 'sensor2': {data2}, 'sensor3': {data3}}}\n\n"

            # 等待 1 秒
            await asyncio.sleep(1)

    response = StreamingHttpResponse(
        event_stream(),
        content_type='text/event-stream'
    )
    response['Cache-Control'] = 'no-cache'
    response['X-Accel-Buffering'] = 'no'

    return response


async def fetch_sensor_data(sensor_id):
    """獲取傳感器數據"""
    await asyncio.sleep(0.1)
    import random
    return round(random.uniform(20, 30), 2)

8.3 案例 3:批量數據導入

import asyncio
from django.http import JsonResponse

async def batch_import_users(request):
    """批量導入用戶(分批並行處理)"""

    # 假設從文件讀取了 1000 個用戶
    users_data = [
        {'name': f'User{i}', 'email': f'user{i}@example.com'}
        for i in range(1000)
    ]

    # 分批處理(每批 100 個)
    batch_size = 100
    batches = [
        users_data[i:i + batch_size]
        for i in range(0, len(users_data), batch_size)
    ]

    # 並行處理所有批次
    results = await asyncio.gather(
        *[process_user_batch(batch, idx) for idx, batch in enumerate(batches)],
        return_exceptions=True,
    )

    # 統計結果
    total_success = sum(r['success'] for r in results if isinstance(r, dict))
    total_failed = sum(r['failed'] for r in results if isinstance(r, dict))

    return JsonResponse({
        'total': len(users_data),
        'success': total_success,
        'failed': total_failed,
        'batches': len(batches),
    })


async def process_user_batch(batch, batch_id):
    """處理一批用戶"""
    success = 0
    failed = 0

    for user_data in batch:
        try:
            await User.objects.acreate(**user_data)
            success += 1
        except Exception as e:
            print(f"Batch {batch_id} error: {e}")
            failed += 1

        # 避免過載
        await asyncio.sleep(0.01)

    return {'batch_id': batch_id, 'success': success, 'failed': failed}

9. 最佳實踐與陷阱

9.1 最佳實踐

1. 使用 asyncio.gather() 並行執行獨立任務

# ✅ 好
user, orders, notifications = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_notifications(user_id),
)

# ❌ 差
user = await fetch_user(user_id)
orders = await fetch_orders(user_id)
notifications = await fetch_notifications(user_id)

2. 為所有外部調用設置超時

# ✅ 好
result = await asyncio.wait_for(
    fetch_external_api(),
    timeout=5.0
)

# ❌ 差(可能永久阻塞)
result = await fetch_external_api()

3. 使用 return_exceptions=True 容錯

# ✅ 好
results = await asyncio.gather(
    task1(),
    task2(),
    task3(),
    return_exceptions=True,  # 一個失敗不影響其他
)

# ❌ 差(一個失敗全部失敗)
results = await asyncio.gather(
    task1(),
    task2(),
    task3(),
)

4. 限制並發數量

# ✅ 好(使用信號量限制並發)
semaphore = asyncio.Semaphore(10)  # 最多 10 個並發

async def fetch_with_limit(url):
    async with semaphore:
        return await fetch_url(url)

tasks = [fetch_with_limit(url) for url in urls]
results = await asyncio.gather(*tasks)


# ❌ 差(無限並發,可能壓垮服務器)
tasks = [fetch_url(url) for url in urls]  # 可能有 10000 個!
results = await asyncio.gather(*tasks)

9.2 常見陷阱

陷阱 1:忘記 await

# ❌ 錯誤
async def bad_view(request):
    user = User.objects.aget(id=1)  # 忘記 await!
    # user 是協程對象,不是 User 實例
    print(user.name)  # AttributeError!


# ✅ 正確
async def good_view(request):
    user = await User.objects.aget(id=1)
    print(user.name)  # OK

陷阱 2:在異步視圖中使用同步代碼

# ❌ 錯誤(阻塞事件循環)
async def bad_view(request):
    user = User.objects.get(id=1)  # 同步查詢,阻塞!
    time.sleep(1)  # 阻塞整個事件循環!


# ✅ 正確
async def good_view(request):
    user = await User.objects.aget(id=1)  # 異步查詢
    await asyncio.sleep(1)  # 異步睡眠

陷阱 3:過度使用 create_task 導致資源耗盡

# ❌ 錯誤(可能創建數千個任務)
async def bad_view(request):
    for i in range(10000):
        asyncio.create_task(process_item(i))  # 資源耗盡!


# ✅ 正確(使用隊列和有限 Worker)
async def good_view(request):
    queue = asyncio.Queue(maxsize=100)
    workers = [
        asyncio.create_task(worker(queue))
        for _ in range(10)  # 只有 10 個 Worker
    ]

    for i in range(10000):
        await queue.put(i)

    await queue.join()

10. 面試常見問題

Q1: asyncio.gather() 和 asyncio.wait() 有什麼區別?

答案

特性gather()wait()
返回值按順序返回結果列表返回兩個集合:done, pending
錯誤處理可選 return_exceptions需要手動檢查每個 task
使用場景簡單並行執行需要細粒度控制
易用性簡單複雜

推薦:大多數情況使用 gather(),除非需要細粒度控制。

Q2: 如何限制並發數量?

答案

使用 asyncio.Semaphore

semaphore = asyncio.Semaphore(10)  # 最多 10 個並發

async def fetch_with_limit(url):
    async with semaphore:
        return await fetch_url(url)

# 使用
results = await asyncio.gather(*[fetch_with_limit(url) for url in urls])

Q3: 異步視圖中如何處理 CPU 密集型任務?

答案

使用 loop.run_in_executor() 在線程池中執行:

import asyncio

async def cpu_intensive_view(request):
    loop = asyncio.get_event_loop()
    # 在線程池中執行 CPU 密集型任務
    result = await loop.run_in_executor(None, complex_calculation, data)
    return JsonResponse({'result': result})


def complex_calculation(data):
    # CPU 密集型計算
    return sum(data) ** 2

Q4: 如何在 Django 信號中使用異步代碼?

答案

Django 信號是同步的,需要使用 async_to_sync

from django.db.models.signals import post_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync

@receiver(post_save, sender=Order)
def order_created(sender, instance, created, **kwargs):
    if created:
        # 將異步函數轉換為同步
        async_to_sync(send_order_email)(instance.id)


async def send_order_email(order_id):
    # 異步發送郵件
    await asyncio.sleep(1)
    print(f"Email sent for order {order_id}")

Q5: asyncio 在 Django 中的性能提升有多大?

答案

取決於應用類型:

I/O 密集型(大量 API 調用、數據庫查詢):

  • 響應時間:減少 50-80%
  • 吞吐量:提升 10-50 倍

CPU 密集型

  • 性能提升:幾乎沒有(甚至更差)

混合型

  • 性能提升:取決於 I/O 占比

11. 總結

11.1 核心要點

  1. asyncio 工具

    • gather(): 並行執行
    • create_task(): 後台任務
    • wait_for(): 超時控制
    • as_completed(): 按完成順序處理
    • Queue: 生產者消費者
  2. 最佳實踐

    • ✅ 並行執行獨立任務
    • ✅ 設置超時
    • ✅ 使用 return_exceptions=True 容錯
    • ✅ 限制並發數量
  3. 常見陷阱

    • ❌ 忘記 await
    • ❌ 在異步中使用同步代碼
    • ❌ 無限並發
  4. 性能

    • I/O 密集型:巨大提升
    • CPU 密集型:沒有提升

11.2 決策表

場景推薦方案
並行執行多個任務asyncio.gather()
後台任務asyncio.create_task()
設置超時asyncio.wait_for()
限制並發asyncio.Semaphore
CPU 密集型loop.run_in_executor()

參考資料

  1. 官方文檔

  2. 第三方庫

  3. 深入閱讀


本章完結:Django 異步編程系列到此結束!

0%