Django 面試準備 04-4:CPU 使用率過高

深入診斷與優化 CPU 密集型性能問題

04-4. CPU 使用率過高

CPU 使用率過高會導致請求響應變慢、系統負載升高。本章將深入探討 CPU 性能問題的診斷與優化。


1. 什麼是 CPU 使用率過高?

定義

CPU 使用率過高 通常指:

  • 單核心 CPU 使用率 > 80%:該核心接近飽和
  • 整體 CPU 使用率 > 70%:系統負載過高
  • Load Average > CPU 核心數:有任務在等待 CPU
# 查看 CPU 使用率
top

# 輸出:
# %Cpu(s): 85.2 us,  2.3 sy,  0.0 ni, 12.5 id
#          ↑ 用戶空間  ↑ 系統調用  ↑ 空閒

# 或使用 htop(更直觀)
htop

正常 vs 異常

# ✅ 正常:
# - CPU 使用率:30-60%
# - Load Average:< CPU 核心數
# - 響應時間:< 100ms

# ❌ 異常:
# - CPU 使用率:> 80% 持續
# - Load Average:> CPU 核心數 × 1.5
# - 響應時間:> 1000ms
# - CPU Wait:> 10%(等待 I/O)

2. 為什麼會發生 CPU 使用率過高?

原因 1:無效率的演算法

# ❌ 錯誤:O(n²) 複雜度
def find_duplicates_slow(request):
    items = Item.objects.all()  # 10,000 筆

    duplicates = []
    for i, item1 in enumerate(items):
        for j, item2 in enumerate(items):
            if i != j and item1.name == item2.name:
                duplicates.append(item1)

    # 10,000 × 10,000 = 100,000,000 次比較
    # CPU 使用率 100%,耗時數分鐘
    return JsonResponse({'duplicates': len(duplicates)})

# ✅ 正確:O(n) 複雜度
def find_duplicates_fast(request):
    items = Item.objects.values_list('name', flat=True)

    # 使用集合,只遍歷一次
    seen = set()
    duplicates = []

    for name in items:
        if name in seen:
            duplicates.append(name)
        seen.add(name)

    # 10,000 次操作,幾毫秒完成
    return JsonResponse({'duplicates': len(duplicates)})

原因 2:過多的正則表達式

# ❌ 錯誤:每次請求都編譯正則
import re

def validate_email(request):
    email = request.POST['email']

    # 每次都編譯正則(CPU 密集)
    if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
        return JsonResponse({'valid': True})

    return JsonResponse({'valid': False})

# ✅ 正確:預先編譯正則
import re

EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

def validate_email_fast(request):
    email = request.POST['email']

    if EMAIL_PATTERN.match(email):
        return JsonResponse({'valid': True})

    return JsonResponse({'valid': False})

原因 3:序列化大量資料

# ❌ 錯誤:序列化大量複雜物件
from rest_framework import serializers

class UserSerializer(serializers.ModelSerializer):
    posts = serializers.SerializerMethodField()
    followers = serializers.SerializerMethodField()

    def get_posts(self, obj):
        # 每個用戶都查詢一次 posts(N+1 問題)
        return obj.posts.count()

    def get_followers(self, obj):
        # 每個用戶都查詢一次 followers(N+1 問題)
        return obj.followers.count()

    class Meta:
        model = User
        fields = ['id', 'name', 'posts', 'followers']

def list_users(request):
    users = User.objects.all()  # 10,000 筆
    serializer = UserSerializer(users, many=True)

    # 10,000 個用戶 × 2 次查詢 = 20,000 次查詢
    # CPU 和資料庫都爆炸
    return JsonResponse(serializer.data, safe=False)

# ✅ 正確:使用 annotate 預先計算
from django.db.models import Count

def list_users_fast(request):
    users = User.objects.annotate(
        post_count=Count('posts'),
        follower_count=Count('followers')
    ).values('id', 'name', 'post_count', 'follower_count')

    # 只有 1 次查詢,直接返回字典
    return JsonResponse(list(users), safe=False)

原因 4:模板渲染複雜

<!-- ❌ 錯誤:模板中執行複雜邏輯 -->
{% for user in users %}
  <div>
    {{ user.name }}
    <!-- 每次都呼叫函數計算 -->
    Posts: {% for post in user.posts.all %}...{% endfor %}
    Followers: {{ user.followers.count }}
  </div>
{% endfor %}

<!-- 10,000 個用戶 × 複雜計算 = CPU 爆炸 -->
# ✅ 正確:在 view 中預先計算
def list_users_template(request):
    users = User.objects.annotate(
        post_count=Count('posts'),
        follower_count=Count('followers')
    )

    # 模板只需要顯示,不需要計算
    return render(request, 'users.html', {'users': users})

原因 5:圖片/影片處理

# ❌ 錯誤:同步處理大型圖片
from PIL import Image

def process_image_sync(request):
    image = request.FILES['image']

    # 讀取 10MB 圖片
    img = Image.open(image)

    # CPU 密集:調整大小、濾鏡、壓縮
    img = img.resize((1920, 1080))  # 5 秒
    img = img.filter(ImageFilter.BLUR)  # 3 秒
    img.save('/tmp/output.jpg', quality=85)  # 2 秒

    # 總共 10 秒,CPU 100%
    return JsonResponse({'status': 'ok'})

# ✅ 正確:使用 Celery 異步處理
from celery import shared_task

@shared_task
def process_image_task(image_path):
    img = Image.open(image_path)
    img = img.resize((1920, 1080))
    img = img.filter(ImageFilter.BLUR)
    img.save('/tmp/output.jpg', quality=85)
    return '/tmp/output.jpg'

def upload_image(request):
    image = request.FILES['image']

    # 儲存到臨時位置
    temp_path = f'/tmp/{image.name}'
    with open(temp_path, 'wb') as f:
        f.write(image.read())

    # 異步處理
    task = process_image_task.delay(temp_path)

    return JsonResponse({
        'task_id': task.id,
        'status': 'processing'
    })

原因 6:JSON 解析大型資料

# ❌ 錯誤:解析 100MB JSON
import json

def parse_large_json(request):
    data = request.body  # 100MB JSON

    # 一次性解析整個 JSON(CPU 密集)
    parsed = json.loads(data)  # 5 秒,CPU 100%

    # 處理資料
    results = process_data(parsed)

    return JsonResponse({'results': results})

# ✅ 正確:使用流式解析
import ijson

def parse_large_json_stream(request):
    # 流式解析,逐項處理
    results = []

    for item in ijson.items(request, 'items.item'):
        results.append(process_item(item))

    # 記憶體占用低,CPU 分散
    return JsonResponse({'results': results})

3. 如何診斷 CPU 使用率過高?

步驟 1:找出 CPU 使用率最高的進程

# 使用 top 找出高 CPU 進程
top -o %CPU

# 輸出:
#   PID  USER  %CPU  %MEM  COMMAND
# 12345  www   95.2   3.1  gunicorn: worker [myapp]
# 12346  www   88.7   3.2  gunicorn: worker [myapp]

# 或使用 htop(更直觀)
htop

# 按 F6 選擇排序方式 → PERCENT_CPU

步驟 2:使用 py-spy 分析 Worker

# 安裝 py-spy
pip install py-spy

# 找出 Worker PID
ps aux | grep gunicorn

# 實時監控 Worker 在執行什麼
sudo py-spy top --pid 12345

# 輸出:(顯示最耗 CPU 的函數)
#   %Own   %Total  OwnTime  TotalTime  Function (filename:line)
#  45.00%  45.00%    4.50s      4.50s   find_duplicates (views.py:123)
#  25.00%  25.00%    2.50s      2.50s   re.match (re.py:234)
#  15.00%  15.00%    1.50s      1.50s   json.loads (json/__init__.py:345)

# 生成火焰圖(Flame Graph)
sudo py-spy record --pid 12345 --output profile.svg --duration 60

# 打開 profile.svg 查看函數調用棧

步驟 3:使用 cProfile 分析

# views.py
import cProfile
import pstats
from io import StringIO

def profile_view(request):
    """分析特定 view 的性能"""
    profiler = cProfile.Profile()

    # 開始分析
    profiler.enable()

    # 執行業務邏輯
    result = expensive_function()

    # 停止分析
    profiler.disable()

    # 輸出報告
    s = StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(20)  # 顯示前 20 個最耗時的函數

    return JsonResponse({
        'result': result,
        'profile': s.getvalue()
    })
# 輸出:
#    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
#      1000    2.500    0.003    5.000    0.005 views.py:123(find_duplicates)
#     10000    1.500    0.000    1.500    0.000 re.py:234(match)
#         1    0.800    0.800    0.800    0.800 json/__init__.py:345(loads)

步驟 4:使用 Django Debug Toolbar

# settings.py(開發環境)
if DEBUG:
    INSTALLED_APPS += ['debug_toolbar']
    MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']

# 可以看到:
# - SQL 查詢時間
# - 模板渲染時間
# - 信號處理時間
# - 快取命中率

步驟 5:使用 Django Silk 記錄請求

# 安裝 Django Silk
pip install django-silk

# settings.py
INSTALLED_APPS += ['silk']
MIDDLEWARE += ['silk.middleware.SilkyMiddleware']

# urls.py
urlpatterns += [path('silk/', include('silk.urls', namespace='silk'))]

# 運行遷移
python manage.py migrate

# 訪問 http://localhost:8000/silk/
# 可以看到:
# - 每個請求的執行時間
# - SQL 查詢詳情
# - 慢請求排行

4. 解決方案

方案 1:優化演算法

# ❌ O(n²) → ✅ O(n log n)
def sort_items_slow(request):
    items = Item.objects.values_list('price', flat=True)

    # 冒泡排序 O(n²)
    for i in range(len(items)):
        for j in range(len(items) - 1):
            if items[j] > items[j + 1]:
                items[j], items[j + 1] = items[j + 1], items[j]

    return JsonResponse({'sorted': list(items)})

# ✅ 正確:使用內建排序 O(n log n)
def sort_items_fast(request):
    items = Item.objects.values_list('price', flat=True)

    # Python 內建排序(Timsort)
    sorted_items = sorted(items)

    return JsonResponse({'sorted': sorted_items})

# 或直接在資料庫排序
def sort_items_db(request):
    items = Item.objects.order_by('price').values_list('price', flat=True)
    return JsonResponse({'sorted': list(items)})

方案 2:使用快取

# ✅ 使用 Redis 快取計算結果
from django.core.cache import cache
from django.views.decorators.cache import cache_page

@cache_page(300)  # 快取 5 分鐘
def expensive_report(request):
    cache_key = 'sales_report'

    report = cache.get(cache_key)

    if report is None:
        # CPU 密集的計算
        report = calculate_sales_report()  # 10 秒

        # 儲存到快取
        cache.set(cache_key, report, timeout=300)

    return JsonResponse(report)

# 快取命中率 90% → CPU 使用率降低 90%

方案 3:使用 Celery 異步處理

# ✅ CPU 密集任務移到 Celery
from celery import shared_task

@shared_task
def generate_pdf_task(report_id):
    """CPU 密集:生成 PDF 報表"""
    report = Report.objects.get(id=report_id)

    # 生成 PDF(10 秒)
    pdf = generate_pdf(report)

    # 儲存到 S3
    url = upload_to_s3(pdf)

    # 發送郵件通知
    send_email(report.user.email, url)

    return url

def request_report(request):
    report_id = request.POST['report_id']

    # 異步生成
    task = generate_pdf_task.delay(report_id)

    return JsonResponse({
        'task_id': task.id,
        'message': '報表生成中,完成後將發送郵件'
    })

方案 4:增加 Worker 數量(適用於 CPU 密集)

# gunicorn.conf.py
import multiprocessing

# ✅ CPU 密集型:workers = CPU 核心數
workers = multiprocessing.cpu_count()

# 使用 sync worker(避免 GIL 問題)
worker_class = 'sync'

# 每個 worker 只處理一個請求
# CPU 負載平均分配到所有核心

方案 5:使用資料庫索引

# ❌ 錯誤:全表掃描
def search_users(request):
    keyword = request.GET['q']

    # 沒有索引,全表掃描(CPU 密集)
    users = User.objects.filter(name__icontains=keyword)

    return JsonResponse({'users': list(users.values())})

# ✅ 正確:添加索引
class User(models.Model):
    name = models.CharField(max_length=100, db_index=True)  # 添加索引

    class Meta:
        indexes = [
            models.Index(fields=['name']),  # 或使用 Meta.indexes
        ]

# 查詢速度提升 100 倍,CPU 使用率降低
# ❌ 錯誤:N+1 查詢
def list_posts(request):
    posts = Post.objects.all()

    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.name,  # N+1
            'comments': post.comments.count(),  # N+1
        })

    # 1 + 1000 + 1000 = 2001 次查詢
    # CPU 和資料庫都爆炸

# ✅ 正確:使用 select_related 和 prefetch_related
def list_posts_optimized(request):
    posts = Post.objects.select_related('author') \
                        .prefetch_related('comments') \
                        .all()

    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author': post.author.name,  # 不會額外查詢
            'comments': post.comments.count(),  # 不會額外查詢
        })

    # 只有 2 次查詢(1 次 posts,1 次 comments)

5. 最佳實踐

原則 1:設置 CPU 告警

# monitoring.py
import psutil
import logging

logger = logging.getLogger(__name__)

def check_cpu_usage():
    """檢查 CPU 使用率"""
    cpu_percent = psutil.cpu_percent(interval=1)

    if cpu_percent > 80:
        logger.critical(f"High CPU usage: {cpu_percent}%")
        # 發送告警(例如:發送到 Sentry)

    return cpu_percent

# middleware.py
class CPUMonitorMiddleware:
    def __call__(self, request):
        cpu_before = psutil.cpu_percent()

        response = self.get_response(request)

        cpu_after = psutil.cpu_percent()

        if cpu_after - cpu_before > 20:
            logger.warning(
                f"Request {request.path} caused CPU spike: "
                f"{cpu_before}% → {cpu_after}%"
            )

        return response

原則 2:定期性能測試

# 使用 locust 進行負載測試
pip install locust

# locustfile.py
from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def expensive_endpoint(self):
        self.client.get("/api/expensive-report/")

# 運行測試
locust -f locustfile.py --host=http://localhost:8000

# 訪問 http://localhost:8089
# 模擬 1000 個並發用戶,觀察 CPU 使用率

原則 3:使用 APM 工具

# 使用 New Relic、Datadog、Sentry 等 APM 工具
# 安裝 New Relic
pip install newrelic

# newrelic.ini 配置
[newrelic]
app_name = My Django App
license_key = YOUR_LICENSE_KEY

# 啟動
NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program gunicorn myapp.wsgi

# 可以看到:
# - 每個 view 的 CPU 時間
# - 慢查詢排行
# - 外部 API 調用時間
# - 錯誤率和異常

原則 4:使用 QuerySet 的 only() 和 defer()

# ✅ 只載入需要的欄位
def list_users_minimal(request):
    # 只載入 id 和 name
    users = User.objects.only('id', 'name')

    return JsonResponse({
        'users': list(users.values('id', 'name'))
    })

# 或排除大型欄位
def list_users_without_bio(request):
    # 排除 bio 欄位(可能很長)
    users = User.objects.defer('bio')

    return JsonResponse({
        'users': list(users.values('id', 'name', 'email'))
    })

6. 實戰案例:報表生成導致 CPU 飆高

問題描述

# ❌ 問題:生成報表時 CPU 100%
def sales_report(request):
    start_date = request.GET['start']
    end_date = request.GET['end']

    # 查詢 3 個月的訂單(100,000 筆)
    orders = Order.objects.filter(
        created_at__range=[start_date, end_date]
    ).select_related('user', 'product')

    # CPU 密集:計算統計資料
    stats = {
        'total_sales': 0,
        'total_revenue': 0,
        'by_product': {},
        'by_category': {},
        'by_date': {},
    }

    # 遍歷 100,000 筆(CPU 密集)
    for order in orders:
        stats['total_sales'] += 1
        stats['total_revenue'] += order.total

        # 分組統計
        product_name = order.product.name
        stats['by_product'][product_name] = stats['by_product'].get(product_name, 0) + 1

        # ... 更多統計

    # 耗時 30 秒,CPU 100%
    return JsonResponse(stats)

解決方案

# ✅ 方案 1:使用資料庫聚合
from django.db.models import Count, Sum
from django.db.models.functions import TruncDate

def sales_report_optimized(request):
    start_date = request.GET['start']
    end_date = request.GET['end']

    # 在資料庫中計算聚合(快得多)
    stats = Order.objects.filter(
        created_at__range=[start_date, end_date]
    ).aggregate(
        total_sales=Count('id'),
        total_revenue=Sum('total')
    )

    # 按產品分組
    by_product = Order.objects.filter(
        created_at__range=[start_date, end_date]
    ).values('product__name').annotate(
        count=Count('id'),
        revenue=Sum('total')
    )

    # 按日期分組
    by_date = Order.objects.filter(
        created_at__range=[start_date, end_date]
    ).annotate(
        date=TruncDate('created_at')
    ).values('date').annotate(
        count=Count('id'),
        revenue=Sum('total')
    )

    # 耗時 < 1 秒,CPU < 20%
    return JsonResponse({
        'total_sales': stats['total_sales'],
        'total_revenue': float(stats['total_revenue'] or 0),
        'by_product': list(by_product),
        'by_date': list(by_date),
    })

# ✅ 方案 2:快取結果
from django.core.cache import cache

def sales_report_cached(request):
    start_date = request.GET['start']
    end_date = request.GET['end']

    cache_key = f'sales_report_{start_date}_{end_date}'

    # 先檢查快取
    report = cache.get(cache_key)

    if report is None:
        # 快取未命中,計算報表
        report = calculate_report(start_date, end_date)

        # 儲存到快取,TTL 1 小時
        cache.set(cache_key, report, timeout=3600)

    return JsonResponse(report)

# ✅ 方案 3:異步生成
from celery import shared_task

@shared_task
def generate_sales_report_task(user_email, start_date, end_date):
    report = calculate_report(start_date, end_date)

    # 儲存到 S3
    file_url = save_report_to_s3(report)

    # 發送郵件
    send_email(user_email, file_url)

def request_sales_report(request):
    start_date = request.GET['start']
    end_date = request.GET['end']

    # 異步生成
    task = generate_sales_report_task.delay(
        request.user.email,
        start_date,
        end_date
    )

    return JsonResponse({
        'task_id': task.id,
        'message': '報表生成中,完成後將發送郵件'
    })

面試常見問題

Q1:如何診斷 Django 應用的 CPU 瓶頸?

答案:

使用三種工具:

  1. py-spy:實時監控 Worker

    sudo py-spy top --pid <worker_pid>
  2. cProfile:分析特定函數

    import cProfile
    cProfile.run('my_function()')
  3. Django Debug Toolbar / Silk:查看每個請求的詳細資訊

分析重點:

  • 哪個函數最耗 CPU?
  • 是演算法問題還是資料量問題?
  • 是否有 N+1 查詢?

Q2:為什麼 CPU 密集型任務不適合使用 Async Worker?

答案:

因為 Python GIL(Global Interpreter Lock)

# GIL 限制:
# - 同一時間只有一個線程執行 Python bytecode
# - Async/Thread 只能利用 I/O 等待時間
# - CPU 密集任務無法真正並行

# 示例:
import asyncio

async def cpu_task():
    # CPU 密集計算
    result = sum(range(10000000))
    return result

# 即使使用 async,仍然是串行執行
# 無法利用多核心 CPU

正確做法: 使用多進程(Sync Worker)或 Celery。


Q3:如何優化 N+1 查詢問題?

答案:

使用 select_relatedprefetch_related

# ❌ N+1 查詢
posts = Post.objects.all()
for post in posts:
    print(post.author.name)  # 每次都查詢

# ✅ select_related(ForeignKey、OneToOneField)
posts = Post.objects.select_related('author')
for post in posts:
    print(post.author.name)  # 不會額外查詢

# ✅ prefetch_related(ManyToManyField、反向 ForeignKey)
posts = Post.objects.prefetch_related('comments')
for post in posts:
    print(post.comments.count())  # 不會額外查詢

差異:

  • select_related:使用 JOIN,單次查詢
  • prefetch_related:使用 IN 查詢,兩次查詢

Q4:Load Average 是什麼?如何判斷系統負載?

答案:

Load Average 是等待 CPU 執行的平均進程數量:

uptime
# 輸出:load average: 2.50, 1.80, 1.20
#                      ↑     ↑     ↑
#                    1分鐘  5分鐘  15分鐘

判斷:

  • Load < CPU 核心數:✅ 正常
  • Load = CPU 核心數:⚠️ 接近飽和
  • Load > CPU 核心數:❌ 過載,有進程在等待

例如:

  • 4 核 CPU
  • Load Average: 8.0 → 有 4 個進程在等待 CPU

小結

CPU 使用率過高問題的處理原則:

  1. 診斷工具:py-spy、cProfile、Django Silk
  2. 優化演算法:O(n²) → O(n log n)
  3. 使用快取:減少重複計算
  4. 異步處理:CPU 密集任務移到 Celery
  5. 資料庫優化:添加索引、使用聚合、避免 N+1 查詢
  6. 增加 Worker:CPU 密集型使用 Sync Worker
  7. 監控告警:CPU > 80% 立即告警

記住:CPU 是最寶貴的資源,要用在刀口上!

0%