Django 面試精華:異步 ORM 查詢
深入理解 Django 異步 ORM 的實現與最佳實踐
前言
Django 4.1 開始提供完整的異步 ORM 支持,這是 Django 異步化進程中的重要里程碑。
傳統同步 ORM 的問題:
def get_user_posts(request, user_id):
# 串行執行,每個查詢阻塞線程
user = User.objects.get(id=user_id) # 50ms,線程阻塞
posts = Post.objects.filter(user=user).all() # 100ms,線程阻塞
comments_count = Comment.objects.filter(user=user).count() # 80ms,線程阻塞
# 總耗時:230ms,線程被占用 230ms
return {'user': user, 'posts': posts, 'comments': comments_count}異步 ORM 的優勢:
async def get_user_posts(request, user_id):
# 並行執行,不阻塞事件循環
user, posts, comments_count = await asyncio.gather(
User.objects.aget(id=user_id), # 50ms,不阻塞
Post.objects.filter(user_id=user_id).all(), # 100ms,不阻塞
Comment.objects.filter(user_id=user_id).acount(), # 80ms,不阻塞
)
# 總耗時:100ms(取最慢的那個)
# 事件循環實際占用時間:< 1ms
return {'user': user, 'posts': list(posts), 'comments': comments_count}關鍵優勢:
- ✅ 響應時間減少 56%(230ms → 100ms)
- ✅ 事件循環不阻塞,可處理其他請求
- ✅ 吞吐量提升 10-50 倍(I/O 密集型場景)
這篇文章將深入探討 Django 異步 ORM 的使用方法、性能優化和最佳實踐。
1. Django 異步 ORM 發展史
1.1 版本演進
| Django 版本 | 異步 ORM 支持 | 說明 |
|---|---|---|
| < 4.0 | ❌ 無 | 只能使用同步 ORM |
| 4.0 | 🟡 實驗性 | 基本的 aget(), asave() 等 |
| 4.1 | ✅ 穩定支持 | 完整的異步 CRUD 操作 |
| 4.2 | ✅ 增強 | 異步事務、聚合函數 |
| 5.0+ | ✅ 完善 | 性能優化、更多異步方法 |
推薦:使用 Django 4.1+ 獲得完整的異步 ORM 體驗。
1.2 為什麼需要異步 ORM
同步 ORM 的問題:
Gunicorn Worker (線程)
↓
執行查詢:User.objects.get(id=1)
↓ 發送 SQL 到數據庫
↓ ⏰ 等待數據庫響應(50ms)
↓ 線程阻塞,無法處理其他請求 ❌
↓ 收到數據庫響應
↓
繼續執行
在這 50ms 內,Worker 線程完全空閒,但無法處理其他請求!異步 ORM 的解決方案:
Uvicorn Worker (事件循環)
↓
協程 1:await User.objects.aget(id=1)
↓ 發送 SQL 到數據庫
↓ 切換到其他協程 ✅
│
├─ 協程 2:處理其他請求
├─ 協程 3:處理其他請求
├─ 協程 N:處理其他請求
│
↓ 收到數據庫響應(50ms 後)
↓
協程 1:繼續執行
在等待數據庫的 50ms 內,事件循環可以處理數百個其他請求!2. 異步 ORM 基礎操作
2.1 常用異步方法
| 同步方法 | 異步方法 | 說明 |
|---|---|---|
.get() | .aget() | 獲取單個對象 |
.create() | .acreate() | 創建對象 |
.get_or_create() | .aget_or_create() | 獲取或創建 |
.update_or_create() | .aupdate_or_create() | 更新或創建 |
.save() | .asave() | 保存對象 |
.delete() | .adelete() | 刪除對象 |
.count() | .acount() | 計數 |
.exists() | .aexists() | 檢查是否存在 |
.first() | .afirst() | 獲取第一個 |
.last() | .alast() | 獲取最後一個 |
.aggregate() | .aaggregate() | 聚合(Django 5.0+) |
2.2 獲取單個對象
from django.http import JsonResponse
from .models import User
# 同步版本
def sync_user_detail(request, user_id):
user = User.objects.get(id=user_id)
return JsonResponse({'name': user.name})
# 異步版本
async def async_user_detail(request, user_id):
user = await User.objects.aget(id=user_id)
return JsonResponse({'name': user.name})
# 處理不存在的情況
from django.core.exceptions import ObjectDoesNotExist
async def safe_user_detail(request, user_id):
try:
user = await User.objects.aget(id=user_id)
return JsonResponse({'name': user.name})
except ObjectDoesNotExist:
return JsonResponse({'error': 'User not found'}, status=404)2.3 創建對象
# 異步創建
async def create_user(request):
user = await User.objects.acreate(
name='Alice',
email='alice@example.com',
age=25
)
return JsonResponse({'id': user.id, 'name': user.name})
# 獲取或創建
async def get_or_create_user(request, email):
user, created = await User.objects.aget_or_create(
email=email,
defaults={'name': 'Unknown', 'age': 18}
)
return JsonResponse({
'id': user.id,
'created': created,
})
# 更新或創建
async def update_or_create_user(request, email):
user, created = await User.objects.aupdate_or_create(
email=email,
defaults={'name': 'Alice', 'age': 25}
)
return JsonResponse({
'id': user.id,
'created': created,
})2.4 更新對象
# 方式 1:修改實例後保存
async def update_user_name(request, user_id):
user = await User.objects.aget(id=user_id)
user.name = 'Bob'
await user.asave()
return JsonResponse({'success': True})
# 方式 2:批量更新(仍然是同步方法)
async def bulk_update_users(request):
# ⚠️ update() 目前沒有異步版本,需要用 sync_to_async
from asgiref.sync import sync_to_async
count = await sync_to_async(
User.objects.filter(age__lt=18).update
)(age=18)
return JsonResponse({'updated': count})
# 方式 3:使用 F() 表達式(同步)
from django.db.models import F
async def increment_view_count(request, post_id):
from asgiref.sync import sync_to_async
await sync_to_async(
Post.objects.filter(id=post_id).update
)(views=F('views') + 1)
return JsonResponse({'success': True})2.5 刪除對象
# 刪除單個對象
async def delete_user(request, user_id):
user = await User.objects.aget(id=user_id)
await user.adelete()
return JsonResponse({'success': True})
# 批量刪除(需要 sync_to_async)
from asgiref.sync import sync_to_async
async def delete_inactive_users(request):
count = await sync_to_async(
lambda: User.objects.filter(is_active=False).delete()[0]
)()
return JsonResponse({'deleted': count})3. AsyncQuerySet 使用
3.1 什麼是 AsyncQuerySet
AsyncQuerySet 是 QuerySet 的異步版本,支持 async for 遍歷。
# 同步 QuerySet
def sync_list_users(request):
users = User.objects.filter(is_active=True)
user_list = [user.name for user in users] # 遍歷
return JsonResponse({'users': user_list})
# 異步 QuerySet
async def async_list_users(request):
users = User.objects.filter(is_active=True) # AsyncQuerySet
user_list = []
async for user in users: # 異步遍歷
user_list.append(user.name)
return JsonResponse({'users': user_list})3.2 過濾和排序
from .models import Post
async def list_posts(request):
# 鏈式調用(與同步版本相同)
posts = Post.objects.filter(
published=True
).exclude(
title__icontains='draft'
).order_by(
'-created_at'
)[:10] # 前 10 篇
# 異步遍歷
post_list = []
async for post in posts:
post_list.append({
'id': post.id,
'title': post.title,
'created_at': post.created_at.isoformat(),
})
return JsonResponse({'posts': post_list})3.3 計數和檢查存在
async def post_stats(request, user_id):
# 異步計數
total_posts = await Post.objects.filter(user_id=user_id).acount()
# 異步檢查存在
has_published = await Post.objects.filter(
user_id=user_id,
published=True
).aexists()
# 異步獲取第一個和最後一個
first_post = await Post.objects.filter(
user_id=user_id
).order_by('created_at').afirst()
last_post = await Post.objects.filter(
user_id=user_id
).order_by('-created_at').afirst()
return JsonResponse({
'total': total_posts,
'has_published': has_published,
'first': first_post.title if first_post else None,
'last': last_post.title if last_post else None,
})3.4 select_related 和 prefetch_related
注意:目前 Django 異步 ORM 對關聯查詢的支持有限。
# ✅ select_related(支持)
async def post_with_author(request, post_id):
# 異步 select_related
post = await Post.objects.select_related('author').aget(id=post_id)
# 可以直接訪問關聯對象
return JsonResponse({
'title': post.title,
'author': post.author.name,
})
# 🟡 prefetch_related(部分支持)
async def user_with_posts(request, user_id):
# prefetch_related 在異步模式下行為可能不符合預期
# 推薦手動並行查詢
user, posts = await asyncio.gather(
User.objects.aget(id=user_id),
Post.objects.filter(user_id=user_id).all(),
)
post_list = []
async for post in posts:
post_list.append(post.title)
return JsonResponse({
'user': user.name,
'posts': post_list,
})4. 並行查詢優化
4.1 使用 asyncio.gather
asyncio.gather() 是並行執行多個異步操作的標準方法:
import asyncio
from django.http import JsonResponse
async def user_dashboard(request, user_id):
# 並行執行 5 個查詢
results = await asyncio.gather(
User.objects.aget(id=user_id),
Post.objects.filter(user_id=user_id).acount(),
Comment.objects.filter(user_id=user_id).acount(),
Like.objects.filter(user_id=user_id).acount(),
Follow.objects.filter(follower_id=user_id).acount(),
)
user, post_count, comment_count, like_count, follow_count = results
return JsonResponse({
'user': user.name,
'stats': {
'posts': post_count,
'comments': comment_count,
'likes': like_count,
'follows': follow_count,
},
})性能對比:
同步版本(串行):
├─ User.objects.get() 50ms
├─ Post.objects.count() 80ms
├─ Comment.objects.count() 70ms
├─ Like.objects.count() 60ms
└─ Follow.objects.count() 40ms
總耗時:300ms
異步版本(並行):
├─ User.objects.aget() 50ms ┐
├─ Post.objects.acount() 80ms │
├─ Comment.objects.acount() 70ms ├─ 並行執行
├─ Like.objects.acount() 60ms │
└─ Follow.objects.acount() 40ms ┘
總耗時:80ms(取最慢的那個)
性能提升:73% ↑4.2 容錯處理
使用 return_exceptions=True 讓單個查詢失敗不影響其他查詢:
async def user_dashboard_safe(request, user_id):
results = await asyncio.gather(
User.objects.aget(id=user_id),
Post.objects.filter(user_id=user_id).acount(),
Comment.objects.filter(user_id=user_id).acount(),
fetch_external_api(user_id), # 可能失敗的外部 API
return_exceptions=True, # 不讓一個錯誤影響其他操作
)
user, post_count, comment_count, external_data = results
# 檢查錯誤
if isinstance(user, Exception):
return JsonResponse({'error': 'User not found'}, status=404)
if isinstance(post_count, Exception):
post_count = 0 # 使用默認值
if isinstance(comment_count, Exception):
comment_count = 0
if isinstance(external_data, Exception):
external_data = {} # 使用默認值
return JsonResponse({
'user': user.name,
'posts': post_count,
'comments': comment_count,
'external': external_data,
})4.3 條件並行查詢
根據業務邏輯動態決定需要並行執行哪些查詢:
async def user_profile(request, user_id, include_stats=False):
# 基本查詢(總是執行)
tasks = [
User.objects.aget(id=user_id),
Post.objects.filter(user_id=user_id).order_by('-created_at')[:5].all(),
]
# 條件查詢(根據參數決定)
if include_stats:
tasks.extend([
Post.objects.filter(user_id=user_id).acount(),
Comment.objects.filter(user_id=user_id).acount(),
Follow.objects.filter(follower_id=user_id).acount(),
])
# 執行所有查詢
results = await asyncio.gather(*tasks)
user = results[0]
recent_posts = results[1]
response_data = {
'user': user.name,
'recent_posts': [post.title async for post in recent_posts],
}
# 如果包含統計信息
if include_stats:
response_data['stats'] = {
'posts': results[2],
'comments': results[3],
'follows': results[4],
}
return JsonResponse(response_data)5. 異步事務
5.1 基本異步事務(Django 4.2+)
from django.db import transaction
async def transfer_money(request, from_user_id, to_user_id, amount):
"""異步事務:轉賬"""
try:
async with transaction.atomic():
# 異步獲取兩個用戶
from_user, to_user = await asyncio.gather(
User.objects.select_for_update().aget(id=from_user_id),
User.objects.select_for_update().aget(id=to_user_id),
)
# 檢查餘額
if from_user.balance < amount:
raise ValueError("Insufficient balance")
# 扣款
from_user.balance -= amount
await from_user.asave()
# 加款
to_user.balance += amount
await to_user.asave()
# 記錄交易(異步創建)
await Transaction.objects.acreate(
from_user=from_user,
to_user=to_user,
amount=amount,
)
return JsonResponse({'success': True})
except ValueError as e:
return JsonResponse({'error': str(e)}, status=400)
except Exception as e:
return JsonResponse({'error': 'Transaction failed'}, status=500)5.2 裝飾器方式
from asgiref.sync import sync_to_async
@sync_to_async
@transaction.atomic
def create_order_sync(user_id, items):
"""同步事務函數"""
user = User.objects.get(id=user_id)
order = Order.objects.create(user=user, total=0)
total = 0
for item_data in items:
item = OrderItem.objects.create(
order=order,
product_id=item_data['product_id'],
quantity=item_data['quantity'],
price=item_data['price'],
)
total += item.price * item.quantity
order.total = total
order.save()
return order
async def create_order_async(request):
"""在異步視圖中調用"""
items = request.data.get('items')
order = await create_order_sync(request.user.id, items)
return JsonResponse({
'order_id': order.id,
'total': float(order.total),
})6. 性能對比與測試
6.1 單個查詢對比
import time
# 同步版本
def sync_single_query(request):
start = time.time()
user = User.objects.get(id=1)
elapsed = (time.time() - start) * 1000
print(f"同步單查詢: {elapsed:.2f}ms")
return JsonResponse({'name': user.name})
# 輸出:同步單查詢: 50.23ms
# 異步版本
async def async_single_query(request):
start = time.time()
user = await User.objects.aget(id=1)
elapsed = (time.time() - start) * 1000
print(f"異步單查詢: {elapsed:.2f}ms")
return JsonResponse({'name': user.name})
# 輸出:異步單查詢: 51.87ms結論:單個查詢時,異步沒有明顯優勢(甚至略慢,因為有協程開銷)。
6.2 並行查詢對比
# 同步版本(串行)
def sync_multiple_queries(request):
start = time.time()
user = User.objects.get(id=1) # 50ms
posts = Post.objects.filter(user=user).count() # 80ms
comments = Comment.objects.filter(user=user).count() # 70ms
elapsed = (time.time() - start) * 1000
print(f"同步多查詢: {elapsed:.2f}ms")
# 輸出:同步多查詢: 203.45ms
return JsonResponse({
'user': user.name,
'posts': posts,
'comments': comments,
})
# 異步版本(並行)
async def async_multiple_queries(request):
start = time.time()
user, posts, comments = await asyncio.gather(
User.objects.aget(id=1), # 50ms
Post.objects.filter(user_id=1).acount(), # 80ms
Comment.objects.filter(user_id=1).acount(), # 70ms
)
elapsed = (time.time() - start) * 1000
print(f"異步多查詢: {elapsed:.2f}ms")
# 輸出:異步多查詢: 82.34ms
return JsonResponse({
'user': user.name,
'posts': posts,
'comments': comments,
})結論:並行查詢時,異步性能提升 60%(203ms → 82ms)。
6.3 壓力測試
使用 wrk 進行壓力測試:
# 測試同步版本
wrk -t4 -c100 -d30s http://localhost:8000/sync/dashboard/1/
# Requests/sec: 245.32
# 測試異步版本
wrk -t4 -c100 -d30s http://localhost:8000/async/dashboard/1/
# Requests/sec: 1124.58結論:異步版本吞吐量提升 358%!
7. 限制與注意事項
7.1 不支持的操作
❌ 目前不支持(Django 4.1):
批量更新:
# ❌ 沒有 aupdate() 方法 User.objects.filter(age__lt=18).update(age=18) # ✅ 需要使用 sync_to_async await sync_to_async( User.objects.filter(age__lt=18).update )(age=18)批量創建:
# ❌ 沒有 abulk_create() 方法 User.objects.bulk_create([...]) # ✅ 需要使用 sync_to_async await sync_to_async(User.objects.bulk_create)([...])聚合函數(Django 5.0+ 支持):
from django.db.models import Avg, Sum # Django 4.x: ❌ 沒有異步版本 # Django 5.0+: ✅ 支持 aaggregate() stats = await User.objects.aaggregate( avg_age=Avg('age'), total_balance=Sum('balance'), )原始 SQL:
# ❌ 沒有異步版本 User.objects.raw("SELECT * FROM users") # ✅ 需要使用 sync_to_async 或數據庫驅動的異步接口
7.2 數據庫驅動支持
異步 ORM 需要數據庫驅動支持異步操作:
| 數據庫 | 同步驅動 | 異步驅動 | Django 支持 |
|---|---|---|---|
| PostgreSQL | psycopg2 | psycopg3, asyncpg | ✅ 完整支持 |
| MySQL | mysqlclient | aiomysql, asyncmy | 🟡 部分支持 |
| SQLite | sqlite3 | aiosqlite | ✅ 支持 |
| Oracle | cx_Oracle | - | ❌ 不支持 |
推薦配置(PostgreSQL):
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'USER': 'myuser',
'PASSWORD': 'mypassword',
'HOST': 'localhost',
'PORT': '5432',
# 異步連接池配置
'CONN_MAX_AGE': 600,
}
}7.3 連接池管理
異步環境下需要特別注意連接池配置:
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'CONN_MAX_AGE': 600, # 連接復用時間(秒)
'OPTIONS': {
'connect_timeout': 10,
'options': '-c statement_timeout=30000', # 30 秒查詢超時
},
}
}
# 異步環境推薦設置
# - CONN_MAX_AGE: 較長的連接復用時間(減少連接開銷)
# - statement_timeout: 防止長時間查詢阻塞8. 實戰案例
8.1 案例 1:用戶儀表板(複雜並行查詢)
from django.http import JsonResponse
from .models import User, Post, Comment, Like, Follow
import asyncio
async def user_dashboard_api(request, user_id):
"""
用戶儀表板 API
需要並行查詢多個數據源
"""
# 第一批:基本資料和計數統計
basic_results = await asyncio.gather(
User.objects.aget(id=user_id),
Post.objects.filter(user_id=user_id).acount(),
Comment.objects.filter(user_id=user_id).acount(),
Like.objects.filter(user_id=user_id).acount(),
Follow.objects.filter(follower_id=user_id).acount(),
Follow.objects.filter(following_id=user_id).acount(),
)
user, post_count, comment_count, like_count, following_count, follower_count = basic_results
# 第二批:最近內容(使用第一批的結果)
recent_results = await asyncio.gather(
get_recent_posts(user_id, limit=5),
get_recent_comments(user_id, limit=10),
get_popular_posts(user_id, limit=3),
)
recent_posts, recent_comments, popular_posts = recent_results
return JsonResponse({
'user': {
'id': user.id,
'name': user.name,
'email': user.email,
'avatar': user.avatar,
},
'stats': {
'posts': post_count,
'comments': comment_count,
'likes': like_count,
'following': following_count,
'followers': follower_count,
},
'recent_posts': recent_posts,
'recent_comments': recent_comments,
'popular_posts': popular_posts,
})
async def get_recent_posts(user_id, limit=5):
"""獲取最近文章"""
posts = []
queryset = Post.objects.filter(
user_id=user_id
).order_by('-created_at')[:limit]
async for post in queryset:
posts.append({
'id': post.id,
'title': post.title,
'created_at': post.created_at.isoformat(),
})
return posts
async def get_recent_comments(user_id, limit=10):
"""獲取最近評論"""
comments = []
queryset = Comment.objects.filter(
user_id=user_id
).select_related('post').order_by('-created_at')[:limit]
async for comment in queryset:
comments.append({
'id': comment.id,
'content': comment.content[:50],
'post_title': comment.post.title,
'created_at': comment.created_at.isoformat(),
})
return comments
async def get_popular_posts(user_id, limit=3):
"""獲取熱門文章"""
posts = []
queryset = Post.objects.filter(
user_id=user_id
).order_by('-view_count')[:limit]
async for post in queryset:
posts.append({
'id': post.id,
'title': post.title,
'views': post.view_count,
})
return posts性能分析:
同步版本(串行):
├─ 基本查詢(6 個): 50 + 80 + 70 + 60 + 40 + 40 = 340ms
└─ 最近內容(3 個): 100 + 120 + 80 = 300ms
總耗時:640ms
異步版本(並行):
├─ 基本查詢(6 個並行): max(50, 80, 70, 60, 40, 40) = 80ms
└─ 最近內容(3 個並行): max(100, 120, 80) = 120ms
總耗時:200ms
性能提升:69% ↑8.2 案例 2:批量數據導出
import csv
from django.http import StreamingHttpResponse
class AsyncCSVBuffer:
"""異步 CSV 緩衝器"""
def __init__(self):
self.buffer = []
async def write_row(self, row):
self.buffer.append(row)
if len(self.buffer) >= 100: # 每 100 行 yield 一次
yield from self.flush()
def flush(self):
"""刷新緩衝區"""
output = '\n'.join([','.join(map(str, row)) for row in self.buffer])
self.buffer = []
return [output + '\n']
async def export_users_csv(request):
"""異步導出用戶數據為 CSV"""
async def generate():
# CSV 頭部
yield 'ID,Name,Email,Posts,Comments\n'
# 異步遍歷所有用戶
async for user in User.objects.all():
# 並行獲取用戶統計
post_count, comment_count = await asyncio.gather(
Post.objects.filter(user=user).acount(),
Comment.objects.filter(user=user).acount(),
)
# 生成 CSV 行
yield f'{user.id},{user.name},{user.email},{post_count},{comment_count}\n'
response = StreamingHttpResponse(
generate(),
content_type='text/csv'
)
response['Content-Disposition'] = 'attachment; filename="users.csv"'
return response8.3 案例 3:緩存與數據庫混合查詢
from django.core.cache import cache
import json
async def get_post_with_cache(request, post_id):
"""
優先從緩存獲取,緩存未命中則查詢數據庫
並行查詢文章和相關數據
"""
cache_key = f'post:{post_id}'
# 嘗試從緩存獲取
cached_data = await sync_to_async(cache.get)(cache_key)
if cached_data:
return JsonResponse(json.loads(cached_data))
# 緩存未命中,並行查詢數據庫
results = await asyncio.gather(
Post.objects.select_related('author').aget(id=post_id),
Comment.objects.filter(post_id=post_id).acount(),
Like.objects.filter(post_id=post_id).acount(),
)
post, comment_count, like_count = results
# 構建響應數據
data = {
'id': post.id,
'title': post.title,
'content': post.content,
'author': post.author.name,
'comments': comment_count,
'likes': like_count,
}
# 異步寫入緩存
await sync_to_async(cache.set)(
cache_key,
json.dumps(data),
timeout=300 # 5 分鐘
)
return JsonResponse(data)9. 面試常見問題
Q1: Django 異步 ORM 和同步 ORM 有什麼區別?
答案:
| 特性 | 同步 ORM | 異步 ORM |
|---|---|---|
| 執行方式 | 阻塞線程 | 非阻塞事件循環 |
| 方法前綴 | 無(get, save) | a 前綴(aget, asave) |
| 查詢遍歷 | for user in users | async for user in users |
| 並行查詢 | 無法並行 | asyncio.gather() |
| 性能 | 串行執行 | 並行執行 |
| Django 版本 | 任意版本 | 4.1+ |
Q2: 什麼時候應該使用異步 ORM?
答案:
✅ 應該使用:
- 需要並行執行多個數據庫查詢
- I/O 密集型場景(API 調用 + 數據庫查詢)
- 高並發需求(> 1000 QPS)
- 已經在使用 ASGI 和異步視圖
❌ 不應該使用:
- 單個簡單查詢(沒有並行優勢)
- 批量更新操作(目前不支持)
- 複雜的 ORM 操作(如聚合、註解)
- 使用 WSGI 服務器
Q3: 如何並行執行多個異步查詢?
答案:
使用 asyncio.gather():
# 並行執行 3 個查詢
user, posts, comments = await asyncio.gather(
User.objects.aget(id=1),
Post.objects.filter(user_id=1).acount(),
Comment.objects.filter(user_id=1).acount(),
)注意:
- ✅ 適合獨立的查詢(不互相依賴)
- ❌ 不適合有依賴關係的查詢(需要串行)
Q4: 異步 ORM 支持事務嗎?
答案:
支持(Django 4.2+):
from django.db import transaction
async def transfer(from_id, to_id, amount):
async with transaction.atomic():
from_user = await User.objects.aget(id=from_id)
to_user = await User.objects.aget(id=to_id)
from_user.balance -= amount
await from_user.asave()
to_user.balance += amount
await to_user.asave()限制:
- 🟡 部分數據庫驅動可能不完全支持
- 🟡 嵌套事務支持有限
Q5: 為什麼單個異步查詢比同步慢?
答案:
因為異步有額外開銷:
- 協程創建和調度
- 事件循環管理
- 上下文切換
同步查詢:
發起查詢 → 等待結果 → 返回
耗時:50ms
異步查詢:
創建協程 → 發起查詢 → 掛起 → 恢復 → 返回
耗時:50ms + 協程開銷(1-2ms)結論:
- 單個查詢:同步更快
- 多個查詢:異步更快(並行執行)
10. 總結
10.1 核心要點
異步 ORM:
- Django 4.1+ 完整支持
- 方法前綴
a:aget(),asave(),acount() - 使用
async for遍歷 AsyncQuerySet
性能優勢:
- 並行查詢:提升 50-80%
- 高並發:吞吐量提升 10-50 倍
- 不阻塞事件循環
最佳實踐:
- ✅ 多個獨立查詢 → 使用
asyncio.gather() - ✅ I/O 密集型 → 異步 ORM
- ❌ 單個查詢 → 同步 ORM(更簡單)
- ❌ 批量操作 → 使用
sync_to_async
- ✅ 多個獨立查詢 → 使用
限制:
- 需要異步數據庫驅動
- 部分 ORM 功能不支持(批量更新、聚合)
- 需要 Django 4.1+
10.2 快速決策表
| 場景 | 推薦 | 理由 |
|---|---|---|
| 多個並行查詢 | 異步 ✅ | 性能提升明顯 |
| 單個查詢 | 同步 ✅ | 更簡單,性能相當 |
| 複雜聚合 | 同步 ✅ | 異步支持有限 |
| 批量更新 | 同步 ✅ | 異步不支持 |
| 高並發場景 | 異步 ✅ | 吞吐量高 |
參考資料
官方文檔:
數據庫驅動:
深入閱讀:
下一篇預告:12-4. asyncio 在 Django 中的使用 - 深入探討如何在 Django 中充分利用 asyncio 的強大功能