Django 面試準備 05-2:面試常見問題(進階)
性能優化、故障排查與架構設計進階問題
目錄
05-2. 面試常見問題(進階)
本章整理 Gunicorn 相關的進階面試問題,涵蓋性能優化、問題排查、架構設計等高級主題。
1. 性能優化
Q1:如何優化 Gunicorn 的性能?請列舉 5 個以上的優化方法。
標準答案:
8 大優化方法:
1. 選擇正確的 Worker 類型
# gunicorn.conf.py
# I/O 密集型 → Uvicorn Worker
workers = 4
worker_class = 'uvicorn.workers.UvicornWorker'
# CPU 密集型 → Sync Worker
workers = 9 # (2 × CPU) + 1
worker_class = 'sync'
# 混合型 → Gthread Worker
workers = 4
threads = 10
worker_class = 'gthread'2. 啟用 preload_app 節省記憶體
preload_app = True # 記憶體節省 50-70%
def post_fork(server, worker):
# Worker Fork 後重新初始化資料庫連接
from django.db import connections
connections.close_all()3. 配置資料庫連接池
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'CONN_MAX_AGE': 600, # 連接重用 10 分鐘
'CONN_HEALTH_CHECKS': True, # 連接健康檢查
}
}4. 使用 max_requests 定期重啟 Worker
# 防止記憶體洩漏累積
max_requests = 1000
max_requests_jitter = 50 # 隨機誤差5. 調整 Worker 連接數限制
# Async Worker
worker_connections = 1000 # 每個 Worker 最多 1000 個並發
# Sync Worker
workers = 9 # 增加 Worker 數量提高並發6. 設置合理的 Keepalive
keepalive = 5 # HTTP Keep-Alive,減少連接開銷
# 配合 Nginx:
# proxy_http_version 1.1;
# proxy_set_header Connection "";7. 優化 Timeout 設置
timeout = 30 # 根據實際情況調整
graceful_timeout = 30
# 監控慢請求,優化程式碼而不是增加 timeout8. 使用 Worker 臨時文件目錄
# 使用記憶體文件系統提升性能
worker_tmp_dir = '/dev/shm' # tmpfs,速度更快完整配置示例:
# gunicorn.conf.py - 性能優化完整版
import multiprocessing
# Worker 配置
workers = multiprocessing.cpu_count()
worker_class = 'uvicorn.workers.UvicornWorker'
worker_connections = 1000
# 預加載
preload_app = True
# 定期重啟
max_requests = 1000
max_requests_jitter = 50
# 超時
timeout = 30
graceful_timeout = 30
keepalive = 5
# 臨時文件
worker_tmp_dir = '/dev/shm'
# 日誌
accesslog = '-'
errorlog = '-'
loglevel = 'info'
# 鉤子
def post_fork(server, worker):
from django.db import connections
connections.close_all()延伸問題:如何監控這些優化是否生效?
答:
- CPU 使用率:
top/htop - 記憶體使用:
ps aux/free -m - 請求響應時間:Nginx access log、Django Silk
- Worker 狀態:
ps aux | grep gunicorn - 資料庫連接:
SELECT count(*) FROM pg_stat_activity
Q2:在高並發場景下,如何避免資料庫連接數不足?
標準答案:
問題:
# 場景:電商促銷活動
# - Gunicorn: 4 workers × 10 threads = 40 併發
# - 每個請求需要 1-2 個資料庫連接
# - 實際需要:40 × 2 = 80 個連接
# - PostgreSQL max_connections = 100
# - 加上其他服務(Celery、監控)= 超過 100!
# 錯誤:
django.db.utils.OperationalError: FATAL: sorry, too many clients already解決方案:
方案 1:計算並配置正確的連接數
# 1. 計算需要的連接數
workers = 4
threads = 10
connections_per_request = 2 # 平均每個請求的連接數
total_connections = workers × threads × connections_per_request
# = 4 × 10 × 2 = 80
# 2. 設置資料庫最大連接數(留 50% 餘量)
max_connections = total_connections × 1.5 = 120
# 3. PostgreSQL 配置
# postgresql.conf
# max_connections = 200 # 增加到 200方案 2:使用 PgBouncer 連接池
# 安裝 PgBouncer
sudo apt-get install pgbouncer
# 配置 /etc/pgbouncer/pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
pool_mode = transaction # 事務級連接池
max_client_conn = 1000 # 應用層最多 1000 個連接
default_pool_size = 25 # 資料庫實際只需 25 個連接# Django 配置連接到 PgBouncer
DATABASES = {
'default': {
'HOST': '127.0.0.1',
'PORT': '6432', # PgBouncer 端口
'CONN_MAX_AGE': 0, # 使用 PgBouncer 時設為 0
}
}
# 架構:
# Django (1000 connections) → PgBouncer → PostgreSQL (25 connections)方案 3:優化查詢,減少連接占用時間
# ❌ 錯誤:長時間持有連接
@transaction.atomic
def bad_process(request):
user = User.objects.select_for_update().get(id=1)
time.sleep(10) # 持有連接 10 秒
user.save()
# ✅ 正確:縮短事務範圍
def good_process(request):
# 先處理業務邏輯(不持有連接)
data = process_business_logic()
# 快速更新資料庫
with transaction.atomic():
user = User.objects.select_for_update().get(id=1)
user.data = data
user.save()方案 4:使用 CONN_MAX_AGE 重用連接
# settings.py
DATABASES = {
'default': {
'CONN_MAX_AGE': 600, # 連接重用 10 分鐘
'CONN_HEALTH_CHECKS': True,
}
}
# 好處:
# - 減少建立/關閉連接的開銷
# - 但不會占用過多連接(有過期時間)方案 5:使用讀寫分離
# settings.py
DATABASES = {
'default': { # 寫入
'HOST': 'master.db.example.com',
'CONN_MAX_AGE': 600,
},
'replica': { # 讀取
'HOST': 'replica.db.example.com',
'CONN_MAX_AGE': 600,
},
}
# 路由
class DatabaseRouter:
def db_for_read(self, model, **hints):
return 'replica'
def db_for_write(self, model, **hints):
return 'default'
# 負載分散,減少主庫連接壓力延伸問題:PgBouncer 的三種模式有什麼區別?
答:
| 模式 | 說明 | 適用場景 |
|---|---|---|
| session | 連接綁定整個會話 | 使用臨時表、預處理語句 |
| transaction | 連接綁定單個事務 | ✅ 推薦,適合 Django |
| statement | 連接綁定單個 SQL | 不支援事務 |
Q3:如何設計一個支持 10,000 QPS 的 Django 應用架構?
標準答案:
完整架構設計:
Internet
↓
┌─────────────────┐
│ Cloudflare │ ← DDoS 防護、CDN、SSL
└─────────────────┘
↓
┌─────────────────┐
│ Load Balancer │ ← AWS ELB / Nginx
│ (多台服務器) │
└─────────────────┘
↓ ↓
┌──────────┴────────┴──────────┐
↓ ↓
┌──────────────┐ ┌──────────────┐
│ Server 1 │ │ Server 2 │
│ │ │ │
│ Nginx │ │ Nginx │
│ ↓ │ │ ↓ │
│ Gunicorn │ │ Gunicorn │
│ (8 workers) │ │ (8 workers) │
│ ↓ │ │ ↓ │
│ Django │ │ Django │
└──────────────┘ └──────────────┘
↓ ↓
└───────────┬───────────────────┘
↓
┌───────────────────────┐
│ Redis │ ← 快取、Session
│ (Redis Cluster) │
└───────────────────────┘
↓
┌───────────────────────┐
│ PostgreSQL │ ← 主從複製、讀寫分離
│ Master + 2 Replicas │
└───────────────────────┘
↓
┌───────────────────────┐
│ Celery │ ← 異步任務
│ (10 workers) │
└───────────────────────┘各層配置:
1. Load Balancer(負載均衡器)
# Nginx Load Balancer 配置
upstream django_backend {
least_conn; # 最少連接算法
server 10.0.1.10:80 max_fails=3 fail_timeout=30s;
server 10.0.1.11:80 max_fails=3 fail_timeout=30s;
server 10.0.1.12:80 max_fails=3 fail_timeout=30s;
server 10.0.1.13:80 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
location / {
proxy_pass http://django_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}2. 單台服務器配置
# gunicorn.conf.py
import multiprocessing
# 8 核 CPU 服務器
workers = 8
worker_class = 'uvicorn.workers.UvicornWorker'
worker_connections = 1000
# 每台服務器可以處理:
# 8 workers × 1000 connections = 8,000 併發
# 4 台服務器:
# 4 × 8,000 = 32,000 併發 ✅ 遠超 10,000 QPS
preload_app = True
max_requests = 1000
max_requests_jitter = 50
timeout = 30
keepalive = 53. Redis 快取配置
# settings.py
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://10.0.2.10:6379/0', # Redis 節點 1
'redis://10.0.2.11:6379/0', # Redis 節點 2
'redis://10.0.2.12:6379/0', # Redis 節點 3
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True,
}
}
}
}
# 快取策略
from django.views.decorators.cache import cache_page
@cache_page(300) # 快取 5 分鐘
def product_list(request):
# 熱門商品列表
products = Product.objects.filter(is_hot=True)[:100]
return render(request, 'products.html', {'products': products})4. 資料庫讀寫分離
# settings.py
DATABASES = {
'default': { # 主庫(寫)
'ENGINE': 'django.db.backends.postgresql',
'HOST': 'master.db.internal',
'CONN_MAX_AGE': 600,
},
'replica_1': { # 從庫 1(讀)
'ENGINE': 'django.db.backends.postgresql',
'HOST': 'replica1.db.internal',
'CONN_MAX_AGE': 600,
},
'replica_2': { # 從庫 2(讀)
'ENGINE': 'django.db.backends.postgresql',
'HOST': 'replica2.db.internal',
'CONN_MAX_AGE': 600,
},
}
# 自動路由
class ReplicaRouter:
def db_for_read(self, model, **hints):
import random
return random.choice(['replica_1', 'replica_2'])
def db_for_write(self, model, **hints):
return 'default'
DATABASE_ROUTERS = ['myapp.routers.ReplicaRouter']5. 性能優化
# views.py - 優化查詢
from django.db.models import Prefetch
def order_list(request):
# ✅ 使用 select_related 和 prefetch_related
orders = Order.objects.select_related('user') \
.prefetch_related(
Prefetch('items',
queryset=OrderItem.objects.select_related('product'))
)[:100]
# ✅ 使用 only() 只載入需要的欄位
orders = orders.only('id', 'created_at', 'total', 'user__name')
# ✅ 使用 iterator() 處理大量資料
for order in orders.iterator(chunk_size=100):
process_order(order)性能指標:
# 單台服務器性能:
# - 8 Async Workers
# - 每個 Worker 1000 併發
# - 平均響應時間:50ms
# - QPS = 8 × 1000 / 0.05 = 160,000 QPS(理論值)
# 實際考慮:
# - 資料庫查詢:10ms
# - Redis 查詢:2ms
# - 業務邏輯:10ms
# - 總時間:22ms
# - QPS = 8 × 1000 / 0.022 ≈ 36,000 QPS
# 4 台服務器:
# 36,000 × 4 = 144,000 QPS ✅ 遠超 10,000 QPS 目標延伸問題:如果流量突然增加 10 倍怎麼辦?
答:
- 水平擴展:增加服務器數量(Auto Scaling)
- 垂直擴展:升級服務器配置(更多 CPU、記憶體)
- 啟用限流:Nginx
limit_req模組 - 降級策略:關閉非核心功能、返回快取資料
- CDN 加速:靜態資源、API 響應都走 CDN
2. 故障排查
Q4:生產環境突然出現大量 502 錯誤,如何排查?
標準答案:
排查流程(5 分鐘內定位):
步驟 1:檢查 Gunicorn 是否運行
# 檢查進程
ps aux | grep gunicorn
# 如果沒有任何輸出 → Gunicorn 已停止
# 原因:OOM、崩潰、被殺死
# 檢查 Systemd 狀態
systemctl status gunicorn
# 查看最近的日誌
journalctl -u gunicorn -n 50 --no-pager步驟 2:檢查 Worker 是否超時
# 查看 Gunicorn 錯誤日誌
tail -f /var/log/gunicorn/error.log
# 典型的超時日誌:
# [CRITICAL] WORKER TIMEOUT (pid:12345)
# [WARNING] Worker with pid 12345 was terminated due to signal 9
# → Worker 處理請求超過 timeout,被強制殺死步驟 3:檢查系統資源
# CPU 使用率
top
# 記憶體使用
free -m
# 磁碟空間
df -h
# 如果記憶體不足 → OOM Killer 可能殺死進程
dmesg | grep -i "out of memory"步驟 4:檢查資料庫連接
# PostgreSQL:查看當前連接數
psql -c "SELECT count(*) FROM pg_stat_activity;"
# 查看最大連接數
psql -c "SHOW max_connections;"
# 如果連接數 ≈ max_connections → 連接池耗盡步驟 5:檢查 Nginx 日誌
# 查看 Nginx 錯誤日誌
tail -f /var/log/nginx/error.log
# 常見 502 錯誤原因:
# 1. connect() failed (111: Connection refused)
# → Gunicorn 未運行
# 2. upstream timed out (110: Connection timed out)
# → Worker 超時
# 3. no live upstreams while connecting to upstream
# → 所有 Worker 都崩潰了常見原因與解決方案:
| 原因 | 症狀 | 解決方案 |
|---|---|---|
| Worker 超時 | 日誌顯示 WORKER TIMEOUT | 優化慢查詢、增加 timeout |
| 記憶體不足 | OOM Killer 日誌 | 增加記憶體、減少 Worker 數量 |
| 資料庫連接數不足 | OperationalError | 增加 max_connections、使用 PgBouncer |
| Worker 崩潰 | Worker 反覆重啟 | 檢查程式碼 bug、查看 Sentry |
| 磁碟空間滿 | 無法寫日誌 | 清理磁碟、配置日誌輪轉 |
快速恢復:
# 1. 重啟 Gunicorn(臨時解決)
systemctl restart gunicorn
# 2. 如果無法重啟,強制殺死並重啟
pkill -9 gunicorn
systemctl start gunicorn
# 3. 檢查是否恢復
curl -I http://localhost:8000/health/延伸問題:如何預防 502 錯誤?
答:
- 監控告警:CPU、記憶體、Worker 狀態
- 健康檢查:Nginx
health_check、Load Balancer - 限流:防止流量突增
- 熔斷:外部 API 超時自動降級
- 定期重啟:
max_requests防止記憶體洩漏累積
Q5:如何診斷和解決記憶體洩漏問題?
標準答案:
診斷流程:
步驟 1:確認記憶體洩漏
# 監控 Worker 記憶體使用(每 5 秒)
watch -n 5 'ps aux | grep gunicorn | grep worker'
# 輸出:
# www 12345 2.3 5.2 250000 210000 ← 剛啟動:200MB
# 等待 10 分鐘...
# www 12345 2.1 8.5 350000 340000 ← 增長到:340MB
# 再等待 10 分鐘...
# www 12345 2.3 12.1 450000 480000 ← 繼續增長:480MB
# 確認記憶體洩漏!步驟 2:使用 tracemalloc 追蹤
# middleware.py
import tracemalloc
import logging
logger = logging.getLogger(__name__)
class MemoryLeakDetector:
def __init__(self, get_response):
self.get_response = get_response
self.request_count = 0
tracemalloc.start()
def __call__(self, request):
self.request_count += 1
response = self.get_response(request)
# 每 100 個請求檢查一次
if self.request_count % 100 == 0:
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
logger.warning(f"Top 10 memory allocations after {self.request_count} requests:")
for stat in top_stats[:10]:
logger.warning(f"{stat}")
return response步驟 3:找出洩漏來源
# 常見洩漏來源:
# 1. 全局變量累積
REQUEST_LOG = [] # ❌ 持續增長
def log_request(request):
REQUEST_LOG.append(request.path) # 永不清空
# 2. 快取沒有過期時間
USER_CACHE = {} # ❌ 永久儲存
def get_user(user_id):
if user_id not in USER_CACHE:
USER_CACHE[user_id] = User.objects.get(id=user_id)
return USER_CACHE[user_id]
# 3. ORM QuerySet 快取
def process_users():
users = User.objects.all() # ❌ 快取所有用戶到記憶體
for user in users:
process(user)步驟 4:修復洩漏
# 修復方案:
# 1. 移除全局累積
# REQUEST_LOG = [] # ❌ 刪除
import logging
logger = logging.getLogger(__name__)
def log_request(request):
logger.info(f"Request: {request.path}") # ✅ 使用 logging
# 2. 使用外部快取 + 過期時間
from django.core.cache import cache
def get_user(user_id):
cache_key = f'user_{user_id}'
user = cache.get(cache_key)
if user is None:
user = User.objects.get(id=user_id)
cache.set(cache_key, user, timeout=300) # ✅ 5 分鐘過期
return user
# 3. 使用 iterator() 避免快取
def process_users():
users = User.objects.all().iterator(chunk_size=100) # ✅ 逐批處理
for user in users:
process(user)步驟 5:配置自動重啟
# gunicorn.conf.py
# 即使有小量洩漏,也會定期釋放
max_requests = 1000 # 處理 1000 個請求後重啟
max_requests_jitter = 50延伸問題:如何預防記憶體洩漏?
答:
- 代碼審查:檢查全局變量、快取使用
- 單元測試:測試記憶體使用
- 監控告警:Worker 記憶體 > 500MB 告警
- 定期重啟:
max_requests作為安全網 - 使用外部存儲:Redis 代替內存快取
3. 架構設計
Q6:Django + Gunicorn + Celery 的完整架構是什麼?各部分如何協作?
標準答案:
完整架構圖:
用戶請求
↓
┌────────────────┐
│ Nginx │ ← 反向代理、靜態文件
└────────────────┘
↓
┌────────────────┐
│ Gunicorn │ ← HTTP 請求處理
│ (ASGI Worker) │
└────────────────┘
↓
┌────────────────┐
│ Django │ ← Web 應用邏輯
└────────────────┘
↓ ↓
┌──────────┴───┬───┴──────────┐
↓ ↓ ↓
┌──────────┐ ┌────────────┐ ┌──────────────┐
│PostgreSQL│ │ Redis │ │ Redis Queue │
│ (數據) │ │ (快取) │ │ (消息隊列) │
└──────────┘ └────────────┘ └──────────────┘
↓
┌────────────────┐
│ Celery │ ← 異步任務處理
│ (Workers) │
└────────────────┘各組件職責:
1. Nginx
# nginx.conf
server {
listen 80;
server_name example.com;
# 靜態文件
location /static/ {
alias /var/www/static/;
expires 30d;
}
# 媒體文件
location /media/ {
alias /var/www/media/;
}
# 動態請求轉發到 Gunicorn
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}2. Gunicorn
# gunicorn.conf.py
workers = 4
worker_class = 'uvicorn.workers.UvicornWorker'
# 處理 HTTP 請求
# - I/O 密集型請求(資料庫查詢、API 呼叫)
# - 即時響應的請求
# - 不處理長時間運行的任務(交給 Celery)3. Django
# views.py
from celery import shared_task
def create_order(request):
# 1. 即時處理:創建訂單
order = Order.objects.create(
user=request.user,
total=100
)
# 2. 異步處理:發送郵件(交給 Celery)
send_order_email.delay(order.id)
# 3. 異步處理:生成 PDF 發票(交給 Celery)
generate_invoice.delay(order.id)
# 4. 立即返回(不等待異步任務完成)
return JsonResponse({
'order_id': order.id,
'status': 'created'
})4. Celery
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_order_email(order_id):
"""異步發送訂單郵件"""
order = Order.objects.get(id=order_id)
send_mail(
subject=f'訂單 {order.id} 已創建',
message=f'您的訂單總金額:${order.total}',
from_email='noreply@example.com',
recipient_list=[order.user.email],
)
@shared_task
def generate_invoice(order_id):
"""異步生成 PDF 發票"""
order = Order.objects.get(id=order_id)
# CPU 密集:生成 PDF(10 秒)
pdf = create_pdf(order)
# 上傳到 S3
upload_to_s3(pdf, f'invoices/{order.id}.pdf')5. Redis
# settings.py
# 用途 1:Django 快取
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/0',
}
}
# 用途 2:Session 存儲
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
# 用途 3:Celery 消息隊列
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'協作流程示例:
# 場景:用戶下單
# 1. 用戶提交訂單
POST /api/orders/
# 2. Nginx 接收請求
# - 檢查靜態文件?否 → 轉發到 Gunicorn
# 3. Gunicorn Worker 接收請求
# - 使用事件循環處理
# - 不阻塞其他請求
# 4. Django 處理業務邏輯
order = Order.objects.create(...) # 寫入 PostgreSQL
# 5. 檢查 Redis 快取
user = cache.get(f'user_{user_id}') # 讀取 Redis
# 6. 發送異步任務到 Celery
send_order_email.delay(order.id) # 寫入 Redis Queue
# 7. 立即返回響應給用戶
return JsonResponse({'order_id': order.id})
# 用戶等待時間:< 100ms
# 8. Celery Worker 從 Redis 讀取任務
# - 在背景執行 send_order_email
# - 發送郵件(2 秒)
# - 用戶不需要等待延伸問題:為什麼需要 Celery?Gunicorn 不能處理異步任務嗎?
答:
- Gunicorn 適合處理 短時間、需要即時響應 的請求
- Celery 適合處理 長時間、不需要即時響應 的任務
- 分離後:
- Gunicorn Worker 不會被長任務阻塞
- 提高用戶響應速度
- 可以獨立擴展(增加 Celery Worker)
小結
本章涵蓋了 Gunicorn 面試的進階問題:
性能優化:
- 8 大優化方法(Worker 選擇、preload、連接池等)
- 避免資料庫連接數不足
- 設計高並發架構(10,000 QPS)
故障排查:
- 502 錯誤快速定位(5 分鐘內)
- 記憶體洩漏診斷與修復
架構設計:
- Django + Gunicorn + Celery 完整架構
- 各組件職責與協作流程
答題技巧:
- 展示系統思維(不只是單點優化)
- 說明實際經驗(遇到過什麼問題、如何解決)
- 提供監控方案(如何驗證優化效果)
- 考慮權衡取捨(沒有完美方案,只有最適合的)
記住:進階問題不只是技術,更是工程思維和實戰經驗!