目錄
02-7. 何時用 Thread?何時用 Process?決策指南
⏱️ 閱讀時間: 10 分鐘 🎯 難度: ⭐⭐ (簡單)
🎯 本篇重點
提供清晰的決策樹和實戰案例,幫助你快速判斷何時使用 Thread、何時使用 Process。
🤔 快速決策流程圖
開始:我需要併發處理
↓
┌──────────────────────────┐
│ 任務類型是什麼? │
└──────────────────────────┘
↓
├─ I/O 密集型(網路、檔案、資料庫)
│ → 使用 Multi-threading
│ └─ 需要大量併發(>1000)?
│ ├─ 是 → 使用 asyncio
│ └─ 否 → 使用 threading
│
└─ CPU 密集型(計算、資料處理)
→ 使用 Multi-processing
└─ 需要數據共享?
├─ 否 → 使用 Pool
└─ 是 → 使用 Process + Manager/Value📊 決策矩陣
情境 1:任務類型
| 任務類型 | 推薦方案 | 原因 |
|---|---|---|
| 網路請求 | Thread | I/O 等待時釋放 GIL |
| 檔案讀寫 | Thread | I/O 操作 |
| 資料庫查詢 | Thread | I/O 等待 |
| 數學計算 | Process | 需要 CPU 並行 |
| 影像處理 | Process | CPU 密集 |
| 資料分析 | Process | CPU 密集 |
| 大量併發 I/O | asyncio | 最高效 |
情境 2:程式特性
| 需求 | Thread | Process |
|---|---|---|
| 需要共享大量資料 | ✅ 推薦 | ❌ 成本高 |
| 需要完全隔離 | ❌ 不行 | ✅ 推薦 |
| 需要快速創建 | ✅ 推薦 | ❌ 較慢 |
| 需要穩定性 | ❌ 一崩全崩 | ✅ 隔離 |
| 需要利用多核 | ❌ GIL 限制 | ✅ 可以 |
| 數量需要很多 | ✅ 可以 | ❌ 受限 |
🔍 實戰案例分析
案例 1:網路爬蟲
需求:
- 爬取 1000 個網頁
- 每個請求需要 1-2 秒
- 需要解析 HTML
分析:
# ✅ 推薦:Multi-threading
from threading import Thread
import requests
def crawl(url):
response = requests.get(url) # I/O 密集
# 簡單的 HTML 解析(不太耗 CPU)
return parse_html(response.text)
# Thread 方案
threads = [Thread(target=crawl, args=(url,)) for url in urls[:1000]]
for t in threads[:50]: # 每次 50 個並發
t.start()為什麼不用 Process?
- 創建 1000 個 Process 成本太高
- 網路請求是 I/O 等待,Thread 足夠
為什麼不用 asyncio?
- 如果使用
requests,無法用 asyncio - 若用
aiohttp,asyncio 更佳
最佳方案:
# 🏆 最佳:Thread Pool
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(crawl, urls)案例 2:影像批量處理
需求:
- 處理 10000 張圖片
- 每張需要壓縮、濾鏡、轉檔
- 單張處理需要 0.5 秒(CPU 密集)
分析:
# ✅ 推薦:Multi-processing
from multiprocessing import Pool
from PIL import Image
def process_image(image_path):
img = Image.open(image_path)
# CPU 密集運算
img = img.resize((800, 600))
img = apply_filter(img)
img.save(output_path, quality=85)
# Process Pool 方案
with Pool(processes=8) as pool: # 使用 8 核 CPU
pool.map(process_image, image_paths)為什麼不用 Thread?
- 影像處理是 CPU 密集
- GIL 限制,Thread 無法並行
- 測試結果:Thread 10 秒 vs Process 2 秒
成本對比:
# ❌ Thread 版本(慢)
# 10000 張 × 0.5 秒 ÷ 1 核 = 5000 秒
# ✅ Process 版本(快)
# 10000 張 × 0.5 秒 ÷ 8 核 = 625 秒案例 3:Web API 伺服器
需求:
- 處理 HTTP 請求
- 查詢資料庫
- 返回 JSON 結果
- 需要高併發
分析:
# 🏆 混合方案:Process + Thread
# Gunicorn 配置
gunicorn app:app \
--workers 4 \ # 4 個 Process(利用 4 核 CPU)
--threads 10 \ # 每個 Process 10 個 Thread
--worker-class gthread架構圖:
Gunicorn Master
├─ Worker Process 1 (CPU core 1)
│ ├─ Thread 1 → 處理請求 A (查詢資料庫)
│ ├─ Thread 2 → 處理請求 B (查詢資料庫)
│ └─ ... (10 threads)
│
├─ Worker Process 2 (CPU core 2)
│ └─ ... (10 threads)
│
├─ Worker Process 3 (CPU core 3)
│ └─ ... (10 threads)
│
└─ Worker Process 4 (CPU core 4)
└─ ... (10 threads)
總共:4 Process × 10 Thread = 40 個併發連線為什麼用 Process?
- 利用多核 CPU
- 隔離性:一個 Worker 崩潰不影響其他
為什麼用 Thread?
- 資料庫查詢是 I/O 等待
- Thread 可以在 I/O 期間處理其他請求
案例 4:機器學習模型訓練
需求:
- 訓練多個 ML 模型
- 每個模型需要大量 CPU 計算
- 模型之間不需要通訊
分析:
# ✅ 推薦:Multi-processing
from multiprocessing import Process
import numpy as np
from sklearn.ensemble import RandomForestClassifier
def train_model(data, params):
X, y = data
model = RandomForestClassifier(**params)
model.fit(X, y) # CPU 密集
return model
# Process 方案
processes = []
for params in model_params:
p = Process(target=train_model, args=(data, params))
p.start()
processes.append(p)
for p in processes:
p.join()為什麼不用 Thread?
- NumPy/scikit-learn 的部分操作釋放 GIL
- 但 Python 層面的計算仍受 GIL 限制
- Process 能保證完全並行
實測數據:
4 個模型訓練:
- 單執行:400 秒
- Thread:380 秒(僅快 5%)
- Process:110 秒(快 3.6 倍)案例 5:即時聊天伺服器
需求:
- 處理 10000+ 個連線
- 每個連線需要保持 WebSocket
- 轉發訊息給其他用戶
分析:
# 🏆 最佳:asyncio
import asyncio
import websockets
async def handle_client(websocket, path):
async for message in websocket:
# 轉發給其他用戶(I/O 操作)
await broadcast(message)
# 單 Thread,處理 10000+ 連線
asyncio.run(websockets.serve(handle_client, 'localhost', 8765))為什麼不用 Thread?
# ❌ Thread 方案(不可行)
# 10000 個連線 = 10000 個 Thread
# 記憶體:10000 × 8MB = 80GB(系統崩潰)為什麼不用 Process?
# ❌ Process 方案(更不可行)
# 10000 個連線 = 10000 個 Process
# 完全無法實現為什麼用 asyncio?
- 單 Thread 可處理 10000+ 連線
- 記憶體:僅幾 MB
- I/O 多路復用(epoll/kqueue)
🎯 詳細決策表
按任務類型選擇
| 任務類型 | 特徵 | Thread | Process | asyncio |
|---|---|---|---|---|
| HTTP 請求 | I/O 密集 | ✅ 推薦 | ❌ 成本高 | 🏆 最佳 |
| 檔案讀寫 | I/O 密集 | ✅ 推薦 | ❌ 成本高 | ✅ 可以 |
| 資料庫查詢 | I/O 密集 | ✅ 推薦 | ❌ 成本高 | ✅ 可以 |
| 數學計算 | CPU 密集 | ❌ 無效 | 🏆 最佳 | ❌ 無效 |
| 影像處理 | CPU 密集 | ❌ 無效 | 🏆 最佳 | ❌ 無效 |
| 影片轉檔 | CPU 密集 | ❌ 無效 | 🏆 最佳 | ❌ 無效 |
| 大量 WebSocket | I/O 密集 | ❌ 太多 | ❌ 不可行 | 🏆 最佳 |
按數量規模選擇
| 併發數量 | Thread | Process | asyncio |
|---|---|---|---|
| 1-10 | ✅ 可以 | ✅ 可以 | ✅ 可以 |
| 10-100 | ✅ 推薦 | ✅ 推薦 | ✅ 推薦 |
| 100-1000 | ✅ 推薦 | ⚠️ 謹慎 | ✅ 推薦 |
| 1000-10000 | ⚠️ 謹慎 | ❌ 不可行 | 🏆 最佳 |
| 10000+ | ❌ 不可行 | ❌ 不可行 | 🏆 唯一選擇 |
⚠️ 特殊情況處理
情況 1:需要共享大量資料
場景:
- 多個任務需要存取同一個大型資料集(1GB+)
- 資料只讀或很少修改
選擇:
# ✅ Thread(共享記憶體,無需複製)
from threading import Thread
large_dataset = load_1gb_data() # 載入一次
def worker(task_id):
# 所有 Thread 直接存取 large_dataset
result = process(large_dataset[task_id])
return result
threads = [Thread(target=worker, args=(i,)) for i in range(100)]為什麼不用 Process?
# ❌ Process 需要複製資料
# 100 個 Process × 1GB = 100GB 記憶體(爆炸)
# ⚠️ 除非使用 shared_memory(複雜)
from multiprocessing import shared_memory
# 需要額外處理,複雜度高情況 2:需要絕對穩定性
場景:
- 金融交易系統
- 醫療設備控制
- 飛機飛行控制
選擇:
# ✅ Process(完全隔離)
from multiprocessing import Process
def critical_task(task_data):
try:
result = execute_critical_operation(task_data)
return result
except Exception as e:
# 錯誤被隔離在這個 Process
log_error(e)
# 每個任務獨立 Process
processes = [Process(target=critical_task, args=(data,)) for data in tasks]為什麼不用 Thread?
# ❌ Thread:一個崩潰可能影響全部
# 金融系統不能接受這種風險情況 3:需要動態擴展
場景:
- 任務數量不確定
- 需要根據負載自動調整
- 需要快速創建/銷毀
選擇:
# ✅ Thread Pool(動態管理)
from concurrent.futures import ThreadPoolExecutor
def dynamic_worker(task):
# 處理任務
return result
# 自動管理 Thread 生命週期
with ThreadPoolExecutor(max_workers=50) as executor:
# 動態提交任務
futures = []
for task in incoming_tasks():
future = executor.submit(dynamic_worker, task)
futures.append(future)
# 等待結果
for future in futures:
result = future.result()📋 完整選擇清單
✅ 選擇 Thread 的理由
- 任務是 I/O 密集型
- 需要共享大量資料
- 需要快速創建/銷毀
- 併發數量 < 1000
- 需要簡單的通訊
- 記憶體有限
✅ 選擇 Process 的理由
- 任務是 CPU 密集型
- 需要利用多核 CPU
- 需要完全隔離
- 需要繞過 GIL
- 穩定性要求高
- 數量適中(< 100)
✅ 選擇 asyncio 的理由
- 需要大量併發(> 1000)
- 任務是 I/O 密集型
- 需要最低記憶體佔用
- 需要精確控制事件循環
- 使用支援 async/await 的庫
🎯 實用決策代碼
import os
from multiprocessing import cpu_count
def choose_concurrency_model(task_type, num_tasks, data_size_mb):
"""
自動選擇併發模型
Args:
task_type: 'io' or 'cpu'
num_tasks: 任務數量
data_size_mb: 共享資料大小(MB)
Returns:
'thread', 'process', or 'asyncio'
"""
if task_type == 'cpu':
# CPU 密集型一律用 Process
return 'process'
elif task_type == 'io':
if num_tasks > 1000:
# 大量 I/O 用 asyncio
return 'asyncio'
elif data_size_mb > 100:
# 大型共享資料用 Thread
return 'thread'
else:
# 預設用 Thread
return 'thread'
else:
raise ValueError("task_type must be 'io' or 'cpu'")
# 使用範例
model = choose_concurrency_model('io', 500, 50)
print(f"推薦使用: {model}")
# 輸出:推薦使用: thread
model = choose_concurrency_model('cpu', 8, 10)
print(f"推薦使用: {model}")
# 輸出:推薦使用: process
model = choose_concurrency_model('io', 5000, 1)
print(f"推薦使用: {model}")
# 輸出:推薦使用: asyncio✅ 重點回顧
快速決策:
- I/O 密集型 → Thread(或 asyncio)
- CPU 密集型 → Process
- 大量併發 → asyncio
- 需要共享資料 → Thread
- 需要隔離 → Process
常見組合:
- Web 伺服器:Process + Thread(Gunicorn)
- 爬蟲:Thread Pool 或 asyncio
- 資料分析:Process Pool
- 即時通訊:asyncio
- 批量處理:Process Pool
記住:
- ✅ 先分析任務類型(I/O vs CPU)
- ✅ 再考慮數量規模
- ✅ 最後考慮特殊需求
- ✅ 不確定就先用 Thread 試試
上一篇: 02-6. Multi-threading vs Multi-processing 完整對比 下一篇: 03-1. IPC 概述
最後更新:2025-01-06