Django 面試精華:異步 ORM 查詢

深入理解 Django 異步 ORM 的實現與最佳實踐

前言

Django 4.1 開始提供完整的異步 ORM 支持,這是 Django 異步化進程中的重要里程碑。

傳統同步 ORM 的問題

def get_user_posts(request, user_id):
    # 串行執行,每個查詢阻塞線程
    user = User.objects.get(id=user_id)                    # 50ms,線程阻塞
    posts = Post.objects.filter(user=user).all()           # 100ms,線程阻塞
    comments_count = Comment.objects.filter(user=user).count()  # 80ms,線程阻塞
    # 總耗時:230ms,線程被占用 230ms

    return {'user': user, 'posts': posts, 'comments': comments_count}

異步 ORM 的優勢

async def get_user_posts(request, user_id):
    # 並行執行,不阻塞事件循環
    user, posts, comments_count = await asyncio.gather(
        User.objects.aget(id=user_id),                     # 50ms,不阻塞
        Post.objects.filter(user_id=user_id).all(),        # 100ms,不阻塞
        Comment.objects.filter(user_id=user_id).acount(),  # 80ms,不阻塞
    )
    # 總耗時:100ms(取最慢的那個)
    # 事件循環實際占用時間:< 1ms

    return {'user': user, 'posts': list(posts), 'comments': comments_count}

關鍵優勢

  • 響應時間減少 56%(230ms → 100ms)
  • 事件循環不阻塞,可處理其他請求
  • 吞吐量提升 10-50 倍(I/O 密集型場景)

這篇文章將深入探討 Django 異步 ORM 的使用方法、性能優化和最佳實踐。


1. Django 異步 ORM 發展史

1.1 版本演進

Django 版本異步 ORM 支持說明
< 4.0❌ 無只能使用同步 ORM
4.0🟡 實驗性基本的 aget(), asave()
4.1✅ 穩定支持完整的異步 CRUD 操作
4.2✅ 增強異步事務、聚合函數
5.0+✅ 完善性能優化、更多異步方法

推薦:使用 Django 4.1+ 獲得完整的異步 ORM 體驗。

1.2 為什麼需要異步 ORM

同步 ORM 的問題

Gunicorn Worker (線程)
    ↓
執行查詢:User.objects.get(id=1)
    ↓ 發送 SQL 到數據庫
    ↓ ⏰ 等待數據庫響應(50ms)
    ↓ 線程阻塞,無法處理其他請求 ❌
    ↓ 收到數據庫響應
    ↓
繼續執行

在這 50ms 內,Worker 線程完全空閒,但無法處理其他請求!

異步 ORM 的解決方案

Uvicorn Worker (事件循環)
    ↓
協程 1:await User.objects.aget(id=1)
    ↓ 發送 SQL 到數據庫
    ↓ 切換到其他協程 ✅
    │
    ├─ 協程 2:處理其他請求
    ├─ 協程 3:處理其他請求
    ├─ 協程 N:處理其他請求
    │
    ↓ 收到數據庫響應(50ms 後)
    ↓
協程 1:繼續執行

在等待數據庫的 50ms 內,事件循環可以處理數百個其他請求!

2. 異步 ORM 基礎操作

2.1 常用異步方法

同步方法異步方法說明
.get().aget()獲取單個對象
.create().acreate()創建對象
.get_or_create().aget_or_create()獲取或創建
.update_or_create().aupdate_or_create()更新或創建
.save().asave()保存對象
.delete().adelete()刪除對象
.count().acount()計數
.exists().aexists()檢查是否存在
.first().afirst()獲取第一個
.last().alast()獲取最後一個
.aggregate().aaggregate()聚合(Django 5.0+)

2.2 獲取單個對象

from django.http import JsonResponse
from .models import User

# 同步版本
def sync_user_detail(request, user_id):
    user = User.objects.get(id=user_id)
    return JsonResponse({'name': user.name})


# 異步版本
async def async_user_detail(request, user_id):
    user = await User.objects.aget(id=user_id)
    return JsonResponse({'name': user.name})


# 處理不存在的情況
from django.core.exceptions import ObjectDoesNotExist

async def safe_user_detail(request, user_id):
    try:
        user = await User.objects.aget(id=user_id)
        return JsonResponse({'name': user.name})
    except ObjectDoesNotExist:
        return JsonResponse({'error': 'User not found'}, status=404)

2.3 創建對象

# 異步創建
async def create_user(request):
    user = await User.objects.acreate(
        name='Alice',
        email='alice@example.com',
        age=25
    )
    return JsonResponse({'id': user.id, 'name': user.name})


# 獲取或創建
async def get_or_create_user(request, email):
    user, created = await User.objects.aget_or_create(
        email=email,
        defaults={'name': 'Unknown', 'age': 18}
    )
    return JsonResponse({
        'id': user.id,
        'created': created,
    })


# 更新或創建
async def update_or_create_user(request, email):
    user, created = await User.objects.aupdate_or_create(
        email=email,
        defaults={'name': 'Alice', 'age': 25}
    )
    return JsonResponse({
        'id': user.id,
        'created': created,
    })

2.4 更新對象

# 方式 1:修改實例後保存
async def update_user_name(request, user_id):
    user = await User.objects.aget(id=user_id)
    user.name = 'Bob'
    await user.asave()
    return JsonResponse({'success': True})


# 方式 2:批量更新(仍然是同步方法)
async def bulk_update_users(request):
    # ⚠️ update() 目前沒有異步版本,需要用 sync_to_async
    from asgiref.sync import sync_to_async

    count = await sync_to_async(
        User.objects.filter(age__lt=18).update
    )(age=18)

    return JsonResponse({'updated': count})


# 方式 3:使用 F() 表達式(同步)
from django.db.models import F

async def increment_view_count(request, post_id):
    from asgiref.sync import sync_to_async

    await sync_to_async(
        Post.objects.filter(id=post_id).update
    )(views=F('views') + 1)

    return JsonResponse({'success': True})

2.5 刪除對象

# 刪除單個對象
async def delete_user(request, user_id):
    user = await User.objects.aget(id=user_id)
    await user.adelete()
    return JsonResponse({'success': True})


# 批量刪除(需要 sync_to_async)
from asgiref.sync import sync_to_async

async def delete_inactive_users(request):
    count = await sync_to_async(
        lambda: User.objects.filter(is_active=False).delete()[0]
    )()
    return JsonResponse({'deleted': count})

3. AsyncQuerySet 使用

3.1 什麼是 AsyncQuerySet

AsyncQuerySet 是 QuerySet 的異步版本,支持 async for 遍歷。

# 同步 QuerySet
def sync_list_users(request):
    users = User.objects.filter(is_active=True)
    user_list = [user.name for user in users]  # 遍歷
    return JsonResponse({'users': user_list})


# 異步 QuerySet
async def async_list_users(request):
    users = User.objects.filter(is_active=True)  # AsyncQuerySet
    user_list = []
    async for user in users:  # 異步遍歷
        user_list.append(user.name)
    return JsonResponse({'users': user_list})

3.2 過濾和排序

from .models import Post

async def list_posts(request):
    # 鏈式調用(與同步版本相同)
    posts = Post.objects.filter(
        published=True
    ).exclude(
        title__icontains='draft'
    ).order_by(
        '-created_at'
    )[:10]  # 前 10 篇

    # 異步遍歷
    post_list = []
    async for post in posts:
        post_list.append({
            'id': post.id,
            'title': post.title,
            'created_at': post.created_at.isoformat(),
        })

    return JsonResponse({'posts': post_list})

3.3 計數和檢查存在

async def post_stats(request, user_id):
    # 異步計數
    total_posts = await Post.objects.filter(user_id=user_id).acount()

    # 異步檢查存在
    has_published = await Post.objects.filter(
        user_id=user_id,
        published=True
    ).aexists()

    # 異步獲取第一個和最後一個
    first_post = await Post.objects.filter(
        user_id=user_id
    ).order_by('created_at').afirst()

    last_post = await Post.objects.filter(
        user_id=user_id
    ).order_by('-created_at').afirst()

    return JsonResponse({
        'total': total_posts,
        'has_published': has_published,
        'first': first_post.title if first_post else None,
        'last': last_post.title if last_post else None,
    })

注意:目前 Django 異步 ORM 對關聯查詢的支持有限。

# ✅ select_related(支持)
async def post_with_author(request, post_id):
    # 異步 select_related
    post = await Post.objects.select_related('author').aget(id=post_id)

    # 可以直接訪問關聯對象
    return JsonResponse({
        'title': post.title,
        'author': post.author.name,
    })


# 🟡 prefetch_related(部分支持)
async def user_with_posts(request, user_id):
    # prefetch_related 在異步模式下行為可能不符合預期
    # 推薦手動並行查詢
    user, posts = await asyncio.gather(
        User.objects.aget(id=user_id),
        Post.objects.filter(user_id=user_id).all(),
    )

    post_list = []
    async for post in posts:
        post_list.append(post.title)

    return JsonResponse({
        'user': user.name,
        'posts': post_list,
    })

4. 並行查詢優化

4.1 使用 asyncio.gather

asyncio.gather() 是並行執行多個異步操作的標準方法:

import asyncio
from django.http import JsonResponse

async def user_dashboard(request, user_id):
    # 並行執行 5 個查詢
    results = await asyncio.gather(
        User.objects.aget(id=user_id),
        Post.objects.filter(user_id=user_id).acount(),
        Comment.objects.filter(user_id=user_id).acount(),
        Like.objects.filter(user_id=user_id).acount(),
        Follow.objects.filter(follower_id=user_id).acount(),
    )

    user, post_count, comment_count, like_count, follow_count = results

    return JsonResponse({
        'user': user.name,
        'stats': {
            'posts': post_count,
            'comments': comment_count,
            'likes': like_count,
            'follows': follow_count,
        },
    })

性能對比

同步版本(串行):
├─ User.objects.get()         50ms
├─ Post.objects.count()       80ms
├─ Comment.objects.count()    70ms
├─ Like.objects.count()       60ms
└─ Follow.objects.count()     40ms
總耗時:300ms

異步版本(並行):
├─ User.objects.aget()        50ms  ┐
├─ Post.objects.acount()      80ms  │
├─ Comment.objects.acount()   70ms  ├─ 並行執行
├─ Like.objects.acount()      60ms  │
└─ Follow.objects.acount()    40ms  ┘
總耗時:80ms(取最慢的那個)

性能提升:73% ↑

4.2 容錯處理

使用 return_exceptions=True 讓單個查詢失敗不影響其他查詢:

async def user_dashboard_safe(request, user_id):
    results = await asyncio.gather(
        User.objects.aget(id=user_id),
        Post.objects.filter(user_id=user_id).acount(),
        Comment.objects.filter(user_id=user_id).acount(),
        fetch_external_api(user_id),  # 可能失敗的外部 API
        return_exceptions=True,  # 不讓一個錯誤影響其他操作
    )

    user, post_count, comment_count, external_data = results

    # 檢查錯誤
    if isinstance(user, Exception):
        return JsonResponse({'error': 'User not found'}, status=404)

    if isinstance(post_count, Exception):
        post_count = 0  # 使用默認值

    if isinstance(comment_count, Exception):
        comment_count = 0

    if isinstance(external_data, Exception):
        external_data = {}  # 使用默認值

    return JsonResponse({
        'user': user.name,
        'posts': post_count,
        'comments': comment_count,
        'external': external_data,
    })

4.3 條件並行查詢

根據業務邏輯動態決定需要並行執行哪些查詢:

async def user_profile(request, user_id, include_stats=False):
    # 基本查詢(總是執行)
    tasks = [
        User.objects.aget(id=user_id),
        Post.objects.filter(user_id=user_id).order_by('-created_at')[:5].all(),
    ]

    # 條件查詢(根據參數決定)
    if include_stats:
        tasks.extend([
            Post.objects.filter(user_id=user_id).acount(),
            Comment.objects.filter(user_id=user_id).acount(),
            Follow.objects.filter(follower_id=user_id).acount(),
        ])

    # 執行所有查詢
    results = await asyncio.gather(*tasks)

    user = results[0]
    recent_posts = results[1]

    response_data = {
        'user': user.name,
        'recent_posts': [post.title async for post in recent_posts],
    }

    # 如果包含統計信息
    if include_stats:
        response_data['stats'] = {
            'posts': results[2],
            'comments': results[3],
            'follows': results[4],
        }

    return JsonResponse(response_data)

5. 異步事務

5.1 基本異步事務(Django 4.2+)

from django.db import transaction

async def transfer_money(request, from_user_id, to_user_id, amount):
    """異步事務:轉賬"""
    try:
        async with transaction.atomic():
            # 異步獲取兩個用戶
            from_user, to_user = await asyncio.gather(
                User.objects.select_for_update().aget(id=from_user_id),
                User.objects.select_for_update().aget(id=to_user_id),
            )

            # 檢查餘額
            if from_user.balance < amount:
                raise ValueError("Insufficient balance")

            # 扣款
            from_user.balance -= amount
            await from_user.asave()

            # 加款
            to_user.balance += amount
            await to_user.asave()

            # 記錄交易(異步創建)
            await Transaction.objects.acreate(
                from_user=from_user,
                to_user=to_user,
                amount=amount,
            )

        return JsonResponse({'success': True})

    except ValueError as e:
        return JsonResponse({'error': str(e)}, status=400)

    except Exception as e:
        return JsonResponse({'error': 'Transaction failed'}, status=500)

5.2 裝飾器方式

from asgiref.sync import sync_to_async

@sync_to_async
@transaction.atomic
def create_order_sync(user_id, items):
    """同步事務函數"""
    user = User.objects.get(id=user_id)
    order = Order.objects.create(user=user, total=0)

    total = 0
    for item_data in items:
        item = OrderItem.objects.create(
            order=order,
            product_id=item_data['product_id'],
            quantity=item_data['quantity'],
            price=item_data['price'],
        )
        total += item.price * item.quantity

    order.total = total
    order.save()

    return order


async def create_order_async(request):
    """在異步視圖中調用"""
    items = request.data.get('items')
    order = await create_order_sync(request.user.id, items)

    return JsonResponse({
        'order_id': order.id,
        'total': float(order.total),
    })

6. 性能對比與測試

6.1 單個查詢對比

import time

# 同步版本
def sync_single_query(request):
    start = time.time()
    user = User.objects.get(id=1)
    elapsed = (time.time() - start) * 1000
    print(f"同步單查詢: {elapsed:.2f}ms")
    return JsonResponse({'name': user.name})
    # 輸出:同步單查詢: 50.23ms


# 異步版本
async def async_single_query(request):
    start = time.time()
    user = await User.objects.aget(id=1)
    elapsed = (time.time() - start) * 1000
    print(f"異步單查詢: {elapsed:.2f}ms")
    return JsonResponse({'name': user.name})
    # 輸出:異步單查詢: 51.87ms

結論:單個查詢時,異步沒有明顯優勢(甚至略慢,因為有協程開銷)。

6.2 並行查詢對比

# 同步版本(串行)
def sync_multiple_queries(request):
    start = time.time()

    user = User.objects.get(id=1)           # 50ms
    posts = Post.objects.filter(user=user).count()  # 80ms
    comments = Comment.objects.filter(user=user).count()  # 70ms

    elapsed = (time.time() - start) * 1000
    print(f"同步多查詢: {elapsed:.2f}ms")
    # 輸出:同步多查詢: 203.45ms

    return JsonResponse({
        'user': user.name,
        'posts': posts,
        'comments': comments,
    })


# 異步版本(並行)
async def async_multiple_queries(request):
    start = time.time()

    user, posts, comments = await asyncio.gather(
        User.objects.aget(id=1),              # 50ms
        Post.objects.filter(user_id=1).acount(),  # 80ms
        Comment.objects.filter(user_id=1).acount(),  # 70ms
    )

    elapsed = (time.time() - start) * 1000
    print(f"異步多查詢: {elapsed:.2f}ms")
    # 輸出:異步多查詢: 82.34ms

    return JsonResponse({
        'user': user.name,
        'posts': posts,
        'comments': comments,
    })

結論:並行查詢時,異步性能提升 60%(203ms → 82ms)。

6.3 壓力測試

使用 wrk 進行壓力測試:

# 測試同步版本
wrk -t4 -c100 -d30s http://localhost:8000/sync/dashboard/1/
# Requests/sec:    245.32

# 測試異步版本
wrk -t4 -c100 -d30s http://localhost:8000/async/dashboard/1/
# Requests/sec:   1124.58

結論:異步版本吞吐量提升 358%


7. 限制與注意事項

7.1 不支持的操作

目前不支持(Django 4.1):

  1. 批量更新

    # ❌ 沒有 aupdate() 方法
    User.objects.filter(age__lt=18).update(age=18)
    
    # ✅ 需要使用 sync_to_async
    await sync_to_async(
        User.objects.filter(age__lt=18).update
    )(age=18)
  2. 批量創建

    # ❌ 沒有 abulk_create() 方法
    User.objects.bulk_create([...])
    
    # ✅ 需要使用 sync_to_async
    await sync_to_async(User.objects.bulk_create)([...])
  3. 聚合函數(Django 5.0+ 支持):

    from django.db.models import Avg, Sum
    
    # Django 4.x: ❌ 沒有異步版本
    # Django 5.0+: ✅ 支持 aaggregate()
    stats = await User.objects.aaggregate(
        avg_age=Avg('age'),
        total_balance=Sum('balance'),
    )
  4. 原始 SQL

    # ❌ 沒有異步版本
    User.objects.raw("SELECT * FROM users")
    
    # ✅ 需要使用 sync_to_async 或數據庫驅動的異步接口

7.2 數據庫驅動支持

異步 ORM 需要數據庫驅動支持異步操作:

數據庫同步驅動異步驅動Django 支持
PostgreSQLpsycopg2psycopg3, asyncpg✅ 完整支持
MySQLmysqlclientaiomysql, asyncmy🟡 部分支持
SQLitesqlite3aiosqlite✅ 支持
Oraclecx_Oracle-❌ 不支持

推薦配置(PostgreSQL)

# settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mydb',
        'USER': 'myuser',
        'PASSWORD': 'mypassword',
        'HOST': 'localhost',
        'PORT': '5432',
        # 異步連接池配置
        'CONN_MAX_AGE': 600,
    }
}

7.3 連接池管理

異步環境下需要特別注意連接池配置:

# settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mydb',
        'CONN_MAX_AGE': 600,  # 連接復用時間(秒)
        'OPTIONS': {
            'connect_timeout': 10,
            'options': '-c statement_timeout=30000',  # 30 秒查詢超時
        },
    }
}

# 異步環境推薦設置
# - CONN_MAX_AGE: 較長的連接復用時間(減少連接開銷)
# - statement_timeout: 防止長時間查詢阻塞

8. 實戰案例

8.1 案例 1:用戶儀表板(複雜並行查詢)

from django.http import JsonResponse
from .models import User, Post, Comment, Like, Follow
import asyncio

async def user_dashboard_api(request, user_id):
    """
    用戶儀表板 API
    需要並行查詢多個數據源
    """
    # 第一批:基本資料和計數統計
    basic_results = await asyncio.gather(
        User.objects.aget(id=user_id),
        Post.objects.filter(user_id=user_id).acount(),
        Comment.objects.filter(user_id=user_id).acount(),
        Like.objects.filter(user_id=user_id).acount(),
        Follow.objects.filter(follower_id=user_id).acount(),
        Follow.objects.filter(following_id=user_id).acount(),
    )

    user, post_count, comment_count, like_count, following_count, follower_count = basic_results

    # 第二批:最近內容(使用第一批的結果)
    recent_results = await asyncio.gather(
        get_recent_posts(user_id, limit=5),
        get_recent_comments(user_id, limit=10),
        get_popular_posts(user_id, limit=3),
    )

    recent_posts, recent_comments, popular_posts = recent_results

    return JsonResponse({
        'user': {
            'id': user.id,
            'name': user.name,
            'email': user.email,
            'avatar': user.avatar,
        },
        'stats': {
            'posts': post_count,
            'comments': comment_count,
            'likes': like_count,
            'following': following_count,
            'followers': follower_count,
        },
        'recent_posts': recent_posts,
        'recent_comments': recent_comments,
        'popular_posts': popular_posts,
    })


async def get_recent_posts(user_id, limit=5):
    """獲取最近文章"""
    posts = []
    queryset = Post.objects.filter(
        user_id=user_id
    ).order_by('-created_at')[:limit]

    async for post in queryset:
        posts.append({
            'id': post.id,
            'title': post.title,
            'created_at': post.created_at.isoformat(),
        })

    return posts


async def get_recent_comments(user_id, limit=10):
    """獲取最近評論"""
    comments = []
    queryset = Comment.objects.filter(
        user_id=user_id
    ).select_related('post').order_by('-created_at')[:limit]

    async for comment in queryset:
        comments.append({
            'id': comment.id,
            'content': comment.content[:50],
            'post_title': comment.post.title,
            'created_at': comment.created_at.isoformat(),
        })

    return comments


async def get_popular_posts(user_id, limit=3):
    """獲取熱門文章"""
    posts = []
    queryset = Post.objects.filter(
        user_id=user_id
    ).order_by('-view_count')[:limit]

    async for post in queryset:
        posts.append({
            'id': post.id,
            'title': post.title,
            'views': post.view_count,
        })

    return posts

性能分析

同步版本(串行):
├─ 基本查詢(6 個): 50 + 80 + 70 + 60 + 40 + 40 = 340ms
└─ 最近內容(3 個): 100 + 120 + 80 = 300ms
總耗時:640ms

異步版本(並行):
├─ 基本查詢(6 個並行): max(50, 80, 70, 60, 40, 40) = 80ms
└─ 最近內容(3 個並行): max(100, 120, 80) = 120ms
總耗時:200ms

性能提升:69% ↑

8.2 案例 2:批量數據導出

import csv
from django.http import StreamingHttpResponse

class AsyncCSVBuffer:
    """異步 CSV 緩衝器"""
    def __init__(self):
        self.buffer = []

    async def write_row(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= 100:  # 每 100 行 yield 一次
            yield from self.flush()

    def flush(self):
        """刷新緩衝區"""
        output = '\n'.join([','.join(map(str, row)) for row in self.buffer])
        self.buffer = []
        return [output + '\n']


async def export_users_csv(request):
    """異步導出用戶數據為 CSV"""
    async def generate():
        # CSV 頭部
        yield 'ID,Name,Email,Posts,Comments\n'

        # 異步遍歷所有用戶
        async for user in User.objects.all():
            # 並行獲取用戶統計
            post_count, comment_count = await asyncio.gather(
                Post.objects.filter(user=user).acount(),
                Comment.objects.filter(user=user).acount(),
            )

            # 生成 CSV 行
            yield f'{user.id},{user.name},{user.email},{post_count},{comment_count}\n'

    response = StreamingHttpResponse(
        generate(),
        content_type='text/csv'
    )
    response['Content-Disposition'] = 'attachment; filename="users.csv"'

    return response

8.3 案例 3:緩存與數據庫混合查詢

from django.core.cache import cache
import json

async def get_post_with_cache(request, post_id):
    """
    優先從緩存獲取,緩存未命中則查詢數據庫
    並行查詢文章和相關數據
    """
    cache_key = f'post:{post_id}'

    # 嘗試從緩存獲取
    cached_data = await sync_to_async(cache.get)(cache_key)
    if cached_data:
        return JsonResponse(json.loads(cached_data))

    # 緩存未命中,並行查詢數據庫
    results = await asyncio.gather(
        Post.objects.select_related('author').aget(id=post_id),
        Comment.objects.filter(post_id=post_id).acount(),
        Like.objects.filter(post_id=post_id).acount(),
    )

    post, comment_count, like_count = results

    # 構建響應數據
    data = {
        'id': post.id,
        'title': post.title,
        'content': post.content,
        'author': post.author.name,
        'comments': comment_count,
        'likes': like_count,
    }

    # 異步寫入緩存
    await sync_to_async(cache.set)(
        cache_key,
        json.dumps(data),
        timeout=300  # 5 分鐘
    )

    return JsonResponse(data)

9. 面試常見問題

Q1: Django 異步 ORM 和同步 ORM 有什麼區別?

答案

特性同步 ORM異步 ORM
執行方式阻塞線程非阻塞事件循環
方法前綴無(get, savea 前綴(aget, asave
查詢遍歷for user in usersasync for user in users
並行查詢無法並行asyncio.gather()
性能串行執行並行執行
Django 版本任意版本4.1+

Q2: 什麼時候應該使用異步 ORM?

答案

應該使用

  1. 需要並行執行多個數據庫查詢
  2. I/O 密集型場景(API 調用 + 數據庫查詢)
  3. 高並發需求(> 1000 QPS)
  4. 已經在使用 ASGI 和異步視圖

不應該使用

  1. 單個簡單查詢(沒有並行優勢)
  2. 批量更新操作(目前不支持)
  3. 複雜的 ORM 操作(如聚合、註解)
  4. 使用 WSGI 服務器

Q3: 如何並行執行多個異步查詢?

答案

使用 asyncio.gather()

# 並行執行 3 個查詢
user, posts, comments = await asyncio.gather(
    User.objects.aget(id=1),
    Post.objects.filter(user_id=1).acount(),
    Comment.objects.filter(user_id=1).acount(),
)

注意

  • ✅ 適合獨立的查詢(不互相依賴)
  • ❌ 不適合有依賴關係的查詢(需要串行)

Q4: 異步 ORM 支持事務嗎?

答案

支持(Django 4.2+):

from django.db import transaction

async def transfer(from_id, to_id, amount):
    async with transaction.atomic():
        from_user = await User.objects.aget(id=from_id)
        to_user = await User.objects.aget(id=to_id)

        from_user.balance -= amount
        await from_user.asave()

        to_user.balance += amount
        await to_user.asave()

限制

  • 🟡 部分數據庫驅動可能不完全支持
  • 🟡 嵌套事務支持有限

Q5: 為什麼單個異步查詢比同步慢?

答案

因為異步有額外開銷

  • 協程創建和調度
  • 事件循環管理
  • 上下文切換
同步查詢:
發起查詢 → 等待結果 → 返回
耗時:50ms

異步查詢:
創建協程 → 發起查詢 → 掛起 → 恢復 → 返回
耗時:50ms + 協程開銷(1-2ms)

結論

  • 單個查詢:同步更快
  • 多個查詢:異步更快(並行執行)

10. 總結

10.1 核心要點

  1. 異步 ORM

    • Django 4.1+ 完整支持
    • 方法前綴 aaget(), asave(), acount()
    • 使用 async for 遍歷 AsyncQuerySet
  2. 性能優勢

    • 並行查詢:提升 50-80%
    • 高並發:吞吐量提升 10-50 倍
    • 不阻塞事件循環
  3. 最佳實踐

    • ✅ 多個獨立查詢 → 使用 asyncio.gather()
    • ✅ I/O 密集型 → 異步 ORM
    • ❌ 單個查詢 → 同步 ORM(更簡單)
    • ❌ 批量操作 → 使用 sync_to_async
  4. 限制

    • 需要異步數據庫驅動
    • 部分 ORM 功能不支持(批量更新、聚合)
    • 需要 Django 4.1+

10.2 快速決策表

場景推薦理由
多個並行查詢異步 ✅性能提升明顯
單個查詢同步 ✅更簡單,性能相當
複雜聚合同步 ✅異步支持有限
批量更新同步 ✅異步不支持
高並發場景異步 ✅吞吐量高

參考資料

  1. 官方文檔

  2. 數據庫驅動

  3. 深入閱讀


下一篇預告:12-4. asyncio 在 Django 中的使用 - 深入探討如何在 Django 中充分利用 asyncio 的強大功能

0%