02-2. The Pros and Cons of Threads
⏱️ Reading time: 10 minutes 🎯 Difficulty: ⭐⭐ (Easy)
🤔 One-Sentence Summary
Threads are lightweight, fast to create, and share memory; their downsides are race conditions, harder debugging, and (in Python) the GIL.
✅ Advantages of Threads
1. Low creation cost
Comparing the cost of creating a Process vs a Thread:

```python
import time
from multiprocessing import Process
from threading import Thread

def noop():
    """Empty task (a module-level function, so multiprocessing can pickle it)."""
    pass

# Measure Process creation
# (on Windows/macOS this part must run under an `if __name__ == "__main__":` guard)
start = time.time()
processes = [Process(target=noop) for _ in range(1000)]
for p in processes:
    p.start()
for p in processes:
    p.join()
print(f"Process: {time.time() - start:.2f} s")

# Measure Thread creation
start = time.time()
threads = [Thread(target=noop) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Thread: {time.time() - start:.2f} s")
```

Output (example run; exact numbers vary by machine):

```
Process: 12.50 s   ← very slow
Thread:  0.15 s    ← very fast!
```

Conclusion:
- Creating a Thread is 80+ times faster than creating a Process
- Threads use far less memory
2. Low context-switch cost
Process context switch:
1. Save the full CPU state
2. Switch the memory mapping (page table)
3. Flush the TLB (Translation Lookaside Buffer)
4. Load the new Process's state
⏱️ Cost: tens of µs (microseconds)

Thread context switch:
1. Save the CPU registers
2. Switch the stack pointer
⏱️ Cost: hundreds of ns (nanoseconds)
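You cannot see the raw kernel numbers directly from Python, but you can get a rough feel for thread hand-off cost by bouncing a signal between two threads. A minimal sketch (the `responder` helper and ping/pong `Event` pair are just illustrative), with the caveat that it measures `Event` overhead plus the GIL hand-off, not only the kernel switch:

```python
import time
from threading import Thread, Event

N = 10000
ping, pong = Event(), Event()

def responder():
    # Wait for a ping, then immediately answer with a pong.
    for _ in range(N):
        ping.wait()
        ping.clear()
        pong.set()

t = Thread(target=responder)
t.start()

start = time.time()
for _ in range(N):
    ping.set()    # hand control to the responder thread
    pong.wait()   # ...and wait to get it back
    pong.clear()
elapsed = time.time() - start
t.join()

# Each round trip involves two thread hand-offs.
print(f"~{elapsed / (2 * N) * 1e6:.1f} µs per hand-off (upper bound)")
```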
💡 Thread switching is 10–100× faster!
3. Shared memory makes communication simple
Process communication (requires IPC):

```python
from multiprocessing import Process, Queue

def worker(q):
    result = "Hello from Process"
    q.put(result)  # must go through a Queue

q = Queue()
p = Process(target=worker, args=(q,))
p.start()
p.join()
print(q.get())  # fetch the result
```

Thread communication (direct sharing):

```python
from threading import Thread

result = []  # shared variable

def worker():
    result.append("Hello from Thread")  # accessed directly

t = Thread(target=worker)
t.start()
t.join()
print(result[0])  # read directly
```

Conclusion:
- ✅ Thread communication is simpler
- ✅ No data serialization needed
- ✅ No extra IPC mechanism needed
4. Efficient resource sharing

```python
from threading import Thread

# One large data structure shared by all threads
database = {f"key{i}": f"value{i}" for i in range(1000000)}

def query_worker(start, end):
    """Query one slice of the database."""
    count = 0
    for i in range(start, end):
        if f"key{i}" in database:
            count += 1
    print(f"Range {start}-{end}: found {count} entries")

# Query concurrently with 4 threads
threads = []
for i in range(4):
    start = i * 250000
    end = (i + 1) * 250000
    t = Thread(target=query_worker, args=(start, end))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

# ✅ All threads share the same database
# ✅ No data is copied
# ✅ Saves memory
```

5. Well suited to I/O-bound tasks
```python
import time
import requests
from threading import Thread

urls = [
    "https://example.com/api/1",
    "https://example.com/api/2",
    "https://example.com/api/3",
    "https://example.com/api/4",
]

# Sequential execution
start = time.time()
for url in urls:
    requests.get(url)
print(f"Sequential: {time.time() - start:.2f} s")
# Output: Sequential: 4.00 s

# Concurrent execution with threads
start = time.time()
threads = []
for url in urls:
    t = Thread(target=requests.get, args=(url,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(f"Threads: {time.time() - start:.2f} s")
# Output: Threads: 1.00 s  ← 4× faster!
```
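For this kind of I/O-bound fan-out, the standard library's `concurrent.futures` offers a tidier pattern than managing threads by hand; a minimal sketch, reusing the same hypothetical example.com URLs as above:

```python
import requests
from concurrent.futures import ThreadPoolExecutor

urls = [f"https://example.com/api/{i}" for i in range(1, 5)]

# A pool of worker threads; map() returns results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    responses = list(pool.map(requests.get, urls))

print([r.status_code for r in responses])
```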
❌ Disadvantages of Threads
1. Race conditions

```python
from threading import Thread

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # ⚠️ not an atomic operation!

t1 = Thread(target=increment)
t2 = Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Counter: {counter}")
# Expected: 200000
# Actual:   150000 (varies from run to run) ← updates were lost!
```

Problem analysis:
counter += 1 is really three separate steps:

```
Thread 1: READ  counter (0)
Thread 2: READ  counter (0)   ← reads at the same time!
Thread 1: ADD   1 (0 + 1 = 1)
Thread 2: ADD   1 (0 + 1 = 1)
Thread 1: WRITE counter = 1
Thread 2: WRITE counter = 1   ← overwrites Thread 1's result!
```

💥 The counter should be 2, but ends up as 1
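You can see those separate steps in CPython's bytecode yourself; a quick check using the standard `dis` module (the `increment_once` helper is just illustrative, and the exact instructions differ across Python versions, so the output is not reproduced here):

```python
import dis

counter = 0

def increment_once():
    global counter
    counter += 1

# The load, the add, and the store are separate bytecode instructions,
# and the interpreter may switch threads between any two of them.
dis.dis(increment_once)
```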
Solution: use a Lock

```python
from threading import Thread, Lock

counter = 0
lock = Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # ✅ take the lock before updating
            counter += 1

t1 = Thread(target=increment)
t2 = Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Counter: {counter}")
# Output: 200000 ← correct!
```

2. Deadlock
```python
from threading import Thread, Lock
import time

lock1 = Lock()
lock2 = Lock()

def thread1():
    lock1.acquire()
    print("Thread 1: acquired lock1")
    time.sleep(0.1)
    print("Thread 1: waiting for lock2...")
    lock2.acquire()  # ⏱️ waits forever
    print("Thread 1: acquired lock2")
    lock2.release()
    lock1.release()

def thread2():
    lock2.acquire()
    print("Thread 2: acquired lock2")
    time.sleep(0.1)
    print("Thread 2: waiting for lock1...")
    lock1.acquire()  # ⏱️ waits forever
    print("Thread 2: acquired lock1")
    lock1.release()
    lock2.release()

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()
# 💀 The program hangs!
```

Deadlock illustrated:

```
Thread 1: [holds lock1] → waits for lock2
                                 ↓
Thread 2: [holds lock2] → waits for lock1
       ↑_________________________|
Circular wait → deadlock!
```

Avoiding deadlock: acquire locks in a fixed order
```python
def thread1():
    lock1.acquire()  # take lock1 first
    lock2.acquire()  # then lock2
    # ... do work ...
    lock2.release()
    lock1.release()

def thread2():
    lock1.acquire()  # take lock1 first (same order!)
    lock2.acquire()  # then lock2
    # ... do work ...
    lock2.release()
    lock1.release()

# ✅ No deadlock
```
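Another common safeguard (also listed in the summary table below) is to acquire with a timeout and back off instead of waiting forever. A minimal sketch of that idea; the `careful_worker` name, the 100 ms timeout, and the retry policy are only illustrative:

```python
from threading import Lock
import time

lock1 = Lock()
lock2 = Lock()

def careful_worker():
    while True:
        lock1.acquire()
        # Give up on lock2 after 100 ms instead of blocking forever.
        if lock2.acquire(timeout=0.1):
            try:
                pass  # ... do work while holding both locks ...
            finally:
                lock2.release()
                lock1.release()
            return
        lock1.release()   # couldn't get lock2: release lock1 and retry
        time.sleep(0.01)  # brief back-off so the other thread can finish
```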
3. Hard to debug

```python
from threading import Thread
import random
import time

shared_list = []

def worker(worker_id):
    for i in range(10):
        shared_list.append(f"Worker {worker_id}: {i}")
        if random.random() < 0.3:
            # Sleep randomly to add nondeterminism
            time.sleep(0.01)

threads = [Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(shared_list)
# The order differs on every run — hard to debug!
```

Problems:
- ❌ Execution order is nondeterministic
- ❌ Bugs are hard to reproduce
- ❌ Test results are flaky (one common mitigation is sketched below)
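The summary table below suggests logging as a mitigation; a small sketch using the standard `logging` module, which is thread-safe and can stamp each record with the thread name (the `worker` function and `worker-{i}` names are just illustrative):

```python
import logging
from threading import Thread

# Tag every log record with the thread that produced it,
# so interleavings become visible after the fact.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(threadName)s] %(message)s",
)

def worker(worker_id):
    for i in range(3):
        logging.debug("worker %s, step %s", worker_id, i)

threads = [Thread(target=worker, args=(i,), name=f"worker-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```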
4. Python's GIL (Global Interpreter Lock)

```python
import time
from threading import Thread

def cpu_task():
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i
    return total

# Single thread
start = time.time()
cpu_task()
cpu_task()
print(f"Single thread: {time.time() - start:.2f} s")

# Two threads
start = time.time()
t1 = Thread(target=cpu_task)
t2 = Thread(target=cpu_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Two threads: {time.time() - start:.2f} s")
```

Output:

```
Single thread: 1.20 s
Two threads:   1.25 s  ← no speed-up!
```

Why: the Python GIL
In CPython, only one thread can execute Python bytecode at a time!

```
Thread 1: [run ] [wait] [run ] [wait]
Thread 2: [wait] [run ] [wait] [run ]
```

💡 CPU-bound threads cannot use multiple cores!
Solution: use multiprocessing
```python
from multiprocessing import Process

# (On Windows/macOS this must run inside an `if __name__ == "__main__":` guard.)
start = time.time()
p1 = Process(target=cpu_task)
p2 = Process(target=cpu_task)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Two processes: {time.time() - start:.2f} s")
# Output: Two processes: 0.65 s ← roughly twice as fast!
```

5. No crash isolation
```python
from threading import Thread
import time

def good_worker():
    for i in range(5):
        print(f"Good worker: {i}")
        time.sleep(1)

def bad_worker():
    time.sleep(2)
    raise Exception("💥 crash!")  # this thread dies

t1 = Thread(target=good_worker)
t2 = Thread(target=bad_worker)
t1.start()
t2.start()
# Output:
# Good worker: 0
# Good worker: 1
# Exception in thread Thread-2:
# ...
# Note: in Python an uncaught exception kills only that thread (good_worker
# keeps running), but nothing notifies the rest of the program and shared
# state may be left half-updated. A hard crash (e.g. a segfault in a C
# extension) does take down every thread, because they all share one process.
```

Process error isolation:

```python
from multiprocessing import Process

p1 = Process(target=good_worker)
p2 = Process(target=bad_worker)
p1.start()
p2.start()
# Output:
# Good worker: 0
# Good worker: 1
# Good worker: 2
# ... ✅ p1 keeps running, unaffected by p2's crash!
```
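The table below lists exception handling as the mitigation. On Python 3.8+ one option is `threading.excepthook`, which at least lets you observe and log failures in worker threads; a minimal sketch (the `log_thread_failure` handler and `bad-worker` name are only illustrative):

```python
import threading

def log_thread_failure(args):
    # args carries .exc_type, .exc_value, .exc_traceback and .thread (Python 3.8+)
    print(f"Thread {args.thread.name} failed: {args.exc_value!r}")

threading.excepthook = log_thread_failure

def bad_worker():
    raise RuntimeError("💥 crash!")

t = threading.Thread(target=bad_worker, name="bad-worker")
t.start()
t.join()
print("Main thread is still running")
```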
📊 Summary of Pros and Cons
Pros
| Advantage | Description | Typical use case |
|---|---|---|
| ✅ Lightweight | Fast to create, low memory footprint | Large numbers of concurrent units |
| ✅ Fast switching | Low context-switch cost | Tasks that switch frequently |
| ✅ Shared memory | Simple communication, no serialization | Tasks that share data |
| ✅ Resource sharing | Saves memory | Many tasks accessing the same resource |
| ✅ I/O concurrency | Good at overlapping I/O waits | Network requests, file operations |
Cons
| Disadvantage | Description | Mitigation |
|---|---|---|
| ❌ Race conditions | Threads compete for shared data | Use Lock/Semaphore |
| ❌ Deadlock | Circular waiting on locks | Fix the lock order, use timeouts |
| ❌ Hard to debug | Nondeterministic execution order | Logging, unit tests |
| ❌ GIL limit | Python threads cannot use multiple cores | Use multiprocessing |
| ❌ Error propagation | Failures are not isolated between threads | Exception handling, use Processes |
🎯 When Should You Use Threads?
✅ Good fits for Threads

```python
# 1. I/O-bound tasks
def download_files(urls):
    threads = []
    for url in urls:
        t = Thread(target=download, args=(url,))  # `download` is a placeholder
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

# 2. Sharing a large amount of data
shared_data = load_huge_database()  # placeholder for a big in-memory structure

def query_worker(query):
    # access shared_data directly
    result = shared_data.query(query)
    return result

# 3. GUI applications
def handle_button_click():
    # run the long task in a background thread
    t = Thread(target=long_running_task)  # placeholder task
    t.start()
    # the GUI stays responsive
```

❌ Poor fits for Threads

```python
# 1. CPU-bound tasks (in Python)
# ❌ Threads (limited by the GIL)
threads = [Thread(target=cpu_task) for _ in range(4)]
# ✅ Use Processes
processes = [Process(target=cpu_task) for _ in range(4)]

# 2. Tasks that need error isolation
# ❌ Threads (a crash can affect all of them)
threads = [Thread(target=risky_task) for _ in range(10)]
# ✅ Use Processes
processes = [Process(target=risky_task) for _ in range(10)]

# 3. Tasks that need independent state
# ❌ Threads (share global variables)
# ✅ Use Processes (separate memory)
```

✅ Key Takeaways
Advantages of Threads:
- ✅ Low creation cost (80+ times faster than Processes)
- ✅ Fast context switches (10–100× faster)
- ✅ Shared memory, simple communication
- ✅ Efficient resource sharing
- ✅ Well suited to I/O-bound tasks
Disadvantages of Threads:
- ❌ Race conditions (locking required)
- ❌ Risk of deadlock
- ❌ Hard to debug (nondeterminism)
- ❌ Python's GIL (no multi-core speed-up for CPU-bound code)
- ❌ Weak error isolation between threads
How to choose:
- I/O-bound → Thread
- CPU-bound → Process
- Need error isolation → Process
- Need to share data → Thread
Previous: 02-1. What Is a Thread? Next: 02-3. The Thread Lifecycle
Last updated: 2025-01-06