Django 面試準備 04-4:CPU 使用率過高
深入診斷與優化 CPU 密集型性能問題
目錄
04-4. CPU 使用率過高
CPU 使用率過高會導致請求響應變慢、系統負載升高。本章將深入探討 CPU 性能問題的診斷與優化。
1. 什麼是 CPU 使用率過高?
定義
CPU 使用率過高 通常指:
- 單核心 CPU 使用率 > 80%:該核心接近飽和
- 整體 CPU 使用率 > 70%:系統負載過高
- Load Average > CPU 核心數:有任務在等待 CPU
# 查看 CPU 使用率
top
# 輸出:
# %Cpu(s): 85.2 us, 2.3 sy, 0.0 ni, 12.5 id
# ↑ 用戶空間 ↑ 系統調用 ↑ 空閒
# 或使用 htop(更直觀)
htop正常 vs 異常
# ✅ 正常:
# - CPU 使用率:30-60%
# - Load Average:< CPU 核心數
# - 響應時間:< 100ms
# ❌ 異常:
# - CPU 使用率:> 80% 持續
# - Load Average:> CPU 核心數 × 1.5
# - 響應時間:> 1000ms
# - CPU Wait:> 10%(等待 I/O)2. 為什麼會發生 CPU 使用率過高?
原因 1:無效率的演算法
# ❌ 錯誤:O(n²) 複雜度
def find_duplicates_slow(request):
items = Item.objects.all() # 10,000 筆
duplicates = []
for i, item1 in enumerate(items):
for j, item2 in enumerate(items):
if i != j and item1.name == item2.name:
duplicates.append(item1)
# 10,000 × 10,000 = 100,000,000 次比較
# CPU 使用率 100%,耗時數分鐘
return JsonResponse({'duplicates': len(duplicates)})
# ✅ 正確:O(n) 複雜度
def find_duplicates_fast(request):
items = Item.objects.values_list('name', flat=True)
# 使用集合,只遍歷一次
seen = set()
duplicates = []
for name in items:
if name in seen:
duplicates.append(name)
seen.add(name)
# 10,000 次操作,幾毫秒完成
return JsonResponse({'duplicates': len(duplicates)})原因 2:過多的正則表達式
# ❌ 錯誤:每次請求都編譯正則
import re
def validate_email(request):
email = request.POST['email']
# 每次都編譯正則(CPU 密集)
if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return JsonResponse({'valid': True})
return JsonResponse({'valid': False})
# ✅ 正確:預先編譯正則
import re
EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
def validate_email_fast(request):
email = request.POST['email']
if EMAIL_PATTERN.match(email):
return JsonResponse({'valid': True})
return JsonResponse({'valid': False})原因 3:序列化大量資料
# ❌ 錯誤:序列化大量複雜物件
from rest_framework import serializers
class UserSerializer(serializers.ModelSerializer):
posts = serializers.SerializerMethodField()
followers = serializers.SerializerMethodField()
def get_posts(self, obj):
# 每個用戶都查詢一次 posts(N+1 問題)
return obj.posts.count()
def get_followers(self, obj):
# 每個用戶都查詢一次 followers(N+1 問題)
return obj.followers.count()
class Meta:
model = User
fields = ['id', 'name', 'posts', 'followers']
def list_users(request):
users = User.objects.all() # 10,000 筆
serializer = UserSerializer(users, many=True)
# 10,000 個用戶 × 2 次查詢 = 20,000 次查詢
# CPU 和資料庫都爆炸
return JsonResponse(serializer.data, safe=False)
# ✅ 正確:使用 annotate 預先計算
from django.db.models import Count
def list_users_fast(request):
users = User.objects.annotate(
post_count=Count('posts'),
follower_count=Count('followers')
).values('id', 'name', 'post_count', 'follower_count')
# 只有 1 次查詢,直接返回字典
return JsonResponse(list(users), safe=False)原因 4:模板渲染複雜
<!-- ❌ 錯誤:模板中執行複雜邏輯 -->
{% for user in users %}
<div>
{{ user.name }}
<!-- 每次都呼叫函數計算 -->
Posts: {% for post in user.posts.all %}...{% endfor %}
Followers: {{ user.followers.count }}
</div>
{% endfor %}
<!-- 10,000 個用戶 × 複雜計算 = CPU 爆炸 --># ✅ 正確:在 view 中預先計算
def list_users_template(request):
users = User.objects.annotate(
post_count=Count('posts'),
follower_count=Count('followers')
)
# 模板只需要顯示,不需要計算
return render(request, 'users.html', {'users': users})原因 5:圖片/影片處理
# ❌ 錯誤:同步處理大型圖片
from PIL import Image
def process_image_sync(request):
image = request.FILES['image']
# 讀取 10MB 圖片
img = Image.open(image)
# CPU 密集:調整大小、濾鏡、壓縮
img = img.resize((1920, 1080)) # 5 秒
img = img.filter(ImageFilter.BLUR) # 3 秒
img.save('/tmp/output.jpg', quality=85) # 2 秒
# 總共 10 秒,CPU 100%
return JsonResponse({'status': 'ok'})
# ✅ 正確:使用 Celery 異步處理
from celery import shared_task
@shared_task
def process_image_task(image_path):
img = Image.open(image_path)
img = img.resize((1920, 1080))
img = img.filter(ImageFilter.BLUR)
img.save('/tmp/output.jpg', quality=85)
return '/tmp/output.jpg'
def upload_image(request):
image = request.FILES['image']
# 儲存到臨時位置
temp_path = f'/tmp/{image.name}'
with open(temp_path, 'wb') as f:
f.write(image.read())
# 異步處理
task = process_image_task.delay(temp_path)
return JsonResponse({
'task_id': task.id,
'status': 'processing'
})原因 6:JSON 解析大型資料
# ❌ 錯誤:解析 100MB JSON
import json
def parse_large_json(request):
data = request.body # 100MB JSON
# 一次性解析整個 JSON(CPU 密集)
parsed = json.loads(data) # 5 秒,CPU 100%
# 處理資料
results = process_data(parsed)
return JsonResponse({'results': results})
# ✅ 正確:使用流式解析
import ijson
def parse_large_json_stream(request):
# 流式解析,逐項處理
results = []
for item in ijson.items(request, 'items.item'):
results.append(process_item(item))
# 記憶體占用低,CPU 分散
return JsonResponse({'results': results})3. 如何診斷 CPU 使用率過高?
步驟 1:找出 CPU 使用率最高的進程
# 使用 top 找出高 CPU 進程
top -o %CPU
# 輸出:
# PID USER %CPU %MEM COMMAND
# 12345 www 95.2 3.1 gunicorn: worker [myapp]
# 12346 www 88.7 3.2 gunicorn: worker [myapp]
# 或使用 htop(更直觀)
htop
# 按 F6 選擇排序方式 → PERCENT_CPU步驟 2:使用 py-spy 分析 Worker
# 安裝 py-spy
pip install py-spy
# 找出 Worker PID
ps aux | grep gunicorn
# 實時監控 Worker 在執行什麼
sudo py-spy top --pid 12345
# 輸出:(顯示最耗 CPU 的函數)
# %Own %Total OwnTime TotalTime Function (filename:line)
# 45.00% 45.00% 4.50s 4.50s find_duplicates (views.py:123)
# 25.00% 25.00% 2.50s 2.50s re.match (re.py:234)
# 15.00% 15.00% 1.50s 1.50s json.loads (json/__init__.py:345)
# 生成火焰圖(Flame Graph)
sudo py-spy record --pid 12345 --output profile.svg --duration 60
# 打開 profile.svg 查看函數調用棧步驟 3:使用 cProfile 分析
# views.py
import cProfile
import pstats
from io import StringIO
def profile_view(request):
"""分析特定 view 的性能"""
profiler = cProfile.Profile()
# 開始分析
profiler.enable()
# 執行業務邏輯
result = expensive_function()
# 停止分析
profiler.disable()
# 輸出報告
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(20) # 顯示前 20 個最耗時的函數
return JsonResponse({
'result': result,
'profile': s.getvalue()
})# 輸出:
# ncalls tottime percall cumtime percall filename:lineno(function)
# 1000 2.500 0.003 5.000 0.005 views.py:123(find_duplicates)
# 10000 1.500 0.000 1.500 0.000 re.py:234(match)
# 1 0.800 0.800 0.800 0.800 json/__init__.py:345(loads)步驟 4:使用 Django Debug Toolbar
# settings.py(開發環境)
if DEBUG:
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
# 可以看到:
# - SQL 查詢時間
# - 模板渲染時間
# - 信號處理時間
# - 快取命中率步驟 5:使用 Django Silk 記錄請求
# 安裝 Django Silk
pip install django-silk
# settings.py
INSTALLED_APPS += ['silk']
MIDDLEWARE += ['silk.middleware.SilkyMiddleware']
# urls.py
urlpatterns += [path('silk/', include('silk.urls', namespace='silk'))]
# 運行遷移
python manage.py migrate
# 訪問 http://localhost:8000/silk/
# 可以看到:
# - 每個請求的執行時間
# - SQL 查詢詳情
# - 慢請求排行4. 解決方案
方案 1:優化演算法
# ❌ O(n²) → ✅ O(n log n)
def sort_items_slow(request):
items = Item.objects.values_list('price', flat=True)
# 冒泡排序 O(n²)
for i in range(len(items)):
for j in range(len(items) - 1):
if items[j] > items[j + 1]:
items[j], items[j + 1] = items[j + 1], items[j]
return JsonResponse({'sorted': list(items)})
# ✅ 正確:使用內建排序 O(n log n)
def sort_items_fast(request):
items = Item.objects.values_list('price', flat=True)
# Python 內建排序(Timsort)
sorted_items = sorted(items)
return JsonResponse({'sorted': sorted_items})
# 或直接在資料庫排序
def sort_items_db(request):
items = Item.objects.order_by('price').values_list('price', flat=True)
return JsonResponse({'sorted': list(items)})方案 2:使用快取
# ✅ 使用 Redis 快取計算結果
from django.core.cache import cache
from django.views.decorators.cache import cache_page
@cache_page(300) # 快取 5 分鐘
def expensive_report(request):
cache_key = 'sales_report'
report = cache.get(cache_key)
if report is None:
# CPU 密集的計算
report = calculate_sales_report() # 10 秒
# 儲存到快取
cache.set(cache_key, report, timeout=300)
return JsonResponse(report)
# 快取命中率 90% → CPU 使用率降低 90%方案 3:使用 Celery 異步處理
# ✅ CPU 密集任務移到 Celery
from celery import shared_task
@shared_task
def generate_pdf_task(report_id):
"""CPU 密集:生成 PDF 報表"""
report = Report.objects.get(id=report_id)
# 生成 PDF(10 秒)
pdf = generate_pdf(report)
# 儲存到 S3
url = upload_to_s3(pdf)
# 發送郵件通知
send_email(report.user.email, url)
return url
def request_report(request):
report_id = request.POST['report_id']
# 異步生成
task = generate_pdf_task.delay(report_id)
return JsonResponse({
'task_id': task.id,
'message': '報表生成中,完成後將發送郵件'
})方案 4:增加 Worker 數量(適用於 CPU 密集)
# gunicorn.conf.py
import multiprocessing
# ✅ CPU 密集型:workers = CPU 核心數
workers = multiprocessing.cpu_count()
# 使用 sync worker(避免 GIL 問題)
worker_class = 'sync'
# 每個 worker 只處理一個請求
# CPU 負載平均分配到所有核心方案 5:使用資料庫索引
# ❌ 錯誤:全表掃描
def search_users(request):
keyword = request.GET['q']
# 沒有索引,全表掃描(CPU 密集)
users = User.objects.filter(name__icontains=keyword)
return JsonResponse({'users': list(users.values())})
# ✅ 正確:添加索引
class User(models.Model):
name = models.CharField(max_length=100, db_index=True) # 添加索引
class Meta:
indexes = [
models.Index(fields=['name']), # 或使用 Meta.indexes
]
# 查詢速度提升 100 倍,CPU 使用率降低方案 6:使用 select_related 和 prefetch_related
# ❌ 錯誤:N+1 查詢
def list_posts(request):
posts = Post.objects.all()
result = []
for post in posts:
result.append({
'title': post.title,
'author': post.author.name, # N+1
'comments': post.comments.count(), # N+1
})
# 1 + 1000 + 1000 = 2001 次查詢
# CPU 和資料庫都爆炸
# ✅ 正確:使用 select_related 和 prefetch_related
def list_posts_optimized(request):
posts = Post.objects.select_related('author') \
.prefetch_related('comments') \
.all()
result = []
for post in posts:
result.append({
'title': post.title,
'author': post.author.name, # 不會額外查詢
'comments': post.comments.count(), # 不會額外查詢
})
# 只有 2 次查詢(1 次 posts,1 次 comments)5. 最佳實踐
原則 1:設置 CPU 告警
# monitoring.py
import psutil
import logging
logger = logging.getLogger(__name__)
def check_cpu_usage():
"""檢查 CPU 使用率"""
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > 80:
logger.critical(f"High CPU usage: {cpu_percent}%")
# 發送告警(例如:發送到 Sentry)
return cpu_percent
# middleware.py
class CPUMonitorMiddleware:
def __call__(self, request):
cpu_before = psutil.cpu_percent()
response = self.get_response(request)
cpu_after = psutil.cpu_percent()
if cpu_after - cpu_before > 20:
logger.warning(
f"Request {request.path} caused CPU spike: "
f"{cpu_before}% → {cpu_after}%"
)
return response原則 2:定期性能測試
# 使用 locust 進行負載測試
pip install locust
# locustfile.py
from locust import HttpUser, task, between
class MyUser(HttpUser):
wait_time = between(1, 3)
@task
def expensive_endpoint(self):
self.client.get("/api/expensive-report/")
# 運行測試
locust -f locustfile.py --host=http://localhost:8000
# 訪問 http://localhost:8089
# 模擬 1000 個並發用戶,觀察 CPU 使用率原則 3:使用 APM 工具
# 使用 New Relic、Datadog、Sentry 等 APM 工具
# 安裝 New Relic
pip install newrelic
# newrelic.ini 配置
[newrelic]
app_name = My Django App
license_key = YOUR_LICENSE_KEY
# 啟動
NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program gunicorn myapp.wsgi
# 可以看到:
# - 每個 view 的 CPU 時間
# - 慢查詢排行
# - 外部 API 調用時間
# - 錯誤率和異常原則 4:使用 QuerySet 的 only() 和 defer()
# ✅ 只載入需要的欄位
def list_users_minimal(request):
# 只載入 id 和 name
users = User.objects.only('id', 'name')
return JsonResponse({
'users': list(users.values('id', 'name'))
})
# 或排除大型欄位
def list_users_without_bio(request):
# 排除 bio 欄位(可能很長)
users = User.objects.defer('bio')
return JsonResponse({
'users': list(users.values('id', 'name', 'email'))
})6. 實戰案例:報表生成導致 CPU 飆高
問題描述
# ❌ 問題:生成報表時 CPU 100%
def sales_report(request):
start_date = request.GET['start']
end_date = request.GET['end']
# 查詢 3 個月的訂單(100,000 筆)
orders = Order.objects.filter(
created_at__range=[start_date, end_date]
).select_related('user', 'product')
# CPU 密集:計算統計資料
stats = {
'total_sales': 0,
'total_revenue': 0,
'by_product': {},
'by_category': {},
'by_date': {},
}
# 遍歷 100,000 筆(CPU 密集)
for order in orders:
stats['total_sales'] += 1
stats['total_revenue'] += order.total
# 分組統計
product_name = order.product.name
stats['by_product'][product_name] = stats['by_product'].get(product_name, 0) + 1
# ... 更多統計
# 耗時 30 秒,CPU 100%
return JsonResponse(stats)解決方案
# ✅ 方案 1:使用資料庫聚合
from django.db.models import Count, Sum
from django.db.models.functions import TruncDate
def sales_report_optimized(request):
start_date = request.GET['start']
end_date = request.GET['end']
# 在資料庫中計算聚合(快得多)
stats = Order.objects.filter(
created_at__range=[start_date, end_date]
).aggregate(
total_sales=Count('id'),
total_revenue=Sum('total')
)
# 按產品分組
by_product = Order.objects.filter(
created_at__range=[start_date, end_date]
).values('product__name').annotate(
count=Count('id'),
revenue=Sum('total')
)
# 按日期分組
by_date = Order.objects.filter(
created_at__range=[start_date, end_date]
).annotate(
date=TruncDate('created_at')
).values('date').annotate(
count=Count('id'),
revenue=Sum('total')
)
# 耗時 < 1 秒,CPU < 20%
return JsonResponse({
'total_sales': stats['total_sales'],
'total_revenue': float(stats['total_revenue'] or 0),
'by_product': list(by_product),
'by_date': list(by_date),
})
# ✅ 方案 2:快取結果
from django.core.cache import cache
def sales_report_cached(request):
start_date = request.GET['start']
end_date = request.GET['end']
cache_key = f'sales_report_{start_date}_{end_date}'
# 先檢查快取
report = cache.get(cache_key)
if report is None:
# 快取未命中,計算報表
report = calculate_report(start_date, end_date)
# 儲存到快取,TTL 1 小時
cache.set(cache_key, report, timeout=3600)
return JsonResponse(report)
# ✅ 方案 3:異步生成
from celery import shared_task
@shared_task
def generate_sales_report_task(user_email, start_date, end_date):
report = calculate_report(start_date, end_date)
# 儲存到 S3
file_url = save_report_to_s3(report)
# 發送郵件
send_email(user_email, file_url)
def request_sales_report(request):
start_date = request.GET['start']
end_date = request.GET['end']
# 異步生成
task = generate_sales_report_task.delay(
request.user.email,
start_date,
end_date
)
return JsonResponse({
'task_id': task.id,
'message': '報表生成中,完成後將發送郵件'
})面試常見問題
Q1:如何診斷 Django 應用的 CPU 瓶頸?
答案:
使用三種工具:
py-spy:實時監控 Worker
sudo py-spy top --pid <worker_pid>cProfile:分析特定函數
import cProfile cProfile.run('my_function()')Django Debug Toolbar / Silk:查看每個請求的詳細資訊
分析重點:
- 哪個函數最耗 CPU?
- 是演算法問題還是資料量問題?
- 是否有 N+1 查詢?
Q2:為什麼 CPU 密集型任務不適合使用 Async Worker?
答案:
因為 Python GIL(Global Interpreter Lock):
# GIL 限制:
# - 同一時間只有一個線程執行 Python bytecode
# - Async/Thread 只能利用 I/O 等待時間
# - CPU 密集任務無法真正並行
# 示例:
import asyncio
async def cpu_task():
# CPU 密集計算
result = sum(range(10000000))
return result
# 即使使用 async,仍然是串行執行
# 無法利用多核心 CPU正確做法: 使用多進程(Sync Worker)或 Celery。
Q3:如何優化 N+1 查詢問題?
答案:
使用 select_related 和 prefetch_related:
# ❌ N+1 查詢
posts = Post.objects.all()
for post in posts:
print(post.author.name) # 每次都查詢
# ✅ select_related(ForeignKey、OneToOneField)
posts = Post.objects.select_related('author')
for post in posts:
print(post.author.name) # 不會額外查詢
# ✅ prefetch_related(ManyToManyField、反向 ForeignKey)
posts = Post.objects.prefetch_related('comments')
for post in posts:
print(post.comments.count()) # 不會額外查詢差異:
select_related:使用 JOIN,單次查詢prefetch_related:使用 IN 查詢,兩次查詢
Q4:Load Average 是什麼?如何判斷系統負載?
答案:
Load Average 是等待 CPU 執行的平均進程數量:
uptime
# 輸出:load average: 2.50, 1.80, 1.20
# ↑ ↑ ↑
# 1分鐘 5分鐘 15分鐘判斷:
- Load < CPU 核心數:✅ 正常
- Load = CPU 核心數:⚠️ 接近飽和
- Load > CPU 核心數:❌ 過載,有進程在等待
例如:
- 4 核 CPU
- Load Average: 8.0 → 有 4 個進程在等待 CPU
小結
CPU 使用率過高問題的處理原則:
- 診斷工具:py-spy、cProfile、Django Silk
- 優化演算法:O(n²) → O(n log n)
- 使用快取:減少重複計算
- 異步處理:CPU 密集任務移到 Celery
- 資料庫優化:添加索引、使用聚合、避免 N+1 查詢
- 增加 Worker:CPU 密集型使用 Sync Worker
- 監控告警:CPU > 80% 立即告警
記住:CPU 是最寶貴的資源,要用在刀口上!