02-4. Thread 同步機制

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🤔 一句話解釋

Thread 同步機制用於協調多個 Thread 的執行順序,避免 Race Condition 和資料不一致問題。


🚦 為什麼需要同步機制?

問題:Race Condition

from threading import Thread

balance = 1000  # 銀行帳戶餘額

def withdraw(amount):
    global balance
    # 危險操作!分三步:
    temp = balance          # 1. 讀取
    temp = temp - amount    # 2. 計算
    balance = temp          # 3. 寫回

# 兩個 Thread 同時提款
t1 = Thread(target=withdraw, args=(500,))
t2 = Thread(target=withdraw, args=(300,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"餘額: {balance}")
# 預期:200 (1000 - 500 - 300)
# 實際:可能是 700(資料丟失!)

問題分析:

初始餘額:1000

Thread 1: READ  balance = 1000
Thread 2: READ  balance = 1000  ← 同時讀取
Thread 1: CALC  1000 - 500 = 500
Thread 2: CALC  1000 - 300 = 700
Thread 1: WRITE balance = 500
Thread 2: WRITE balance = 700   ← 覆蓋了 Thread 1 的結果

💥 應該是 200,結果是 700!

🔒 同步機制 1:Lock(互斥鎖)

定義:確保同一時間只有一個 Thread 能執行臨界區程式碼。

基本用法

from threading import Thread, Lock

balance = 1000
lock = Lock()  # 創建鎖

def withdraw(amount):
    global balance

    lock.acquire()  # 🔒 加鎖
    try:
        temp = balance
        temp = temp - amount
        balance = temp
        print(f"提款 {amount},餘額: {balance}")
    finally:
        lock.release()  # 🔓 解鎖

t1 = Thread(target=withdraw, args=(500,))
t2 = Thread(target=withdraw, args=(300,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"最終餘額: {balance}")
# 輸出:最終餘額: 200 ← 正確!

使用 with 語句(推薦)

from threading import Thread, Lock

balance = 1000
lock = Lock()

def withdraw(amount):
    global balance

    with lock:  # ✅ 自動 acquire() 和 release()
        temp = balance
        temp = temp - amount
        balance = temp
        print(f"提款 {amount},餘額: {balance}")

# 測試
threads = [Thread(target=withdraw, args=(100,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最終餘額: {balance}")
# 輸出:最終餘額: 500 ← 正確!

Lock 的視覺化

Thread 1: [等待 Lock] → [獲得 Lock] → [執行臨界區] → [釋放 Lock]
                                           ↓
Thread 2: [等待 Lock] ←────────────────────┘ → [獲得 Lock] → ...
                                                    ↓
Thread 3: [等待 Lock] ←─────────────────────────────┘

關鍵:

  • 一次只能有一個 Thread 持有 Lock
  • 其他 Thread 必須等待
  • 避免了 Race Condition

🔁 同步機制 2:RLock(可重入鎖)

定義:同一個 Thread 可以多次獲取同一個 RLock,不會死鎖。

Lock 的問題

from threading import Lock

lock = Lock()

def outer():
    with lock:
        print("Outer 獲得鎖")
        inner()  # 💀 死鎖!

def inner():
    with lock:  # ❌ 同一個 Thread 再次嘗試獲取鎖
        print("Inner 獲得鎖")

outer()
# 程式卡死!

RLock 的解決方案

from threading import RLock

rlock = RLock()  # 可重入鎖

def outer():
    with rlock:
        print("Outer 獲得鎖")
        inner()  # ✅ 可以再次獲取

def inner():
    with rlock:  # ✅ 同一個 Thread 可以重入
        print("Inner 獲得鎖")

outer()

輸出:

Outer 獲得鎖
Inner 獲得鎖

RLock 的計數機制

from threading import RLock

rlock = RLock()

# 同一個 Thread 多次獲取
rlock.acquire()  # 計數:1
print("第一次獲取")

rlock.acquire()  # 計數:2
print("第二次獲取")

rlock.acquire()  # 計數:3
print("第三次獲取")

# 必須釋放相同次數
rlock.release()  # 計數:2
rlock.release()  # 計數:1
rlock.release()  # 計數:0,真正釋放

🚦 同步機制 3:Semaphore(號誌/信號量)

定義:允許固定數量的 Thread 同時訪問資源。

限制並發數量

from threading import Thread, Semaphore
import time

# 最多 3 個 Thread 同時訪問
semaphore = Semaphore(3)

def access_resource(name):
    print(f"{name} 等待進入")
    with semaphore:  # 🚦 獲取信號量
        print(f"{name} 進入資源區")
        time.sleep(2)  # 模擬使用資源
        print(f"{name} 離開資源區")

# 創建 10 個 Thread
threads = [Thread(target=access_resource, args=(f'Thread-{i}',)) for i in range(10)]

for t in threads:
    t.start()

for t in threads:
    t.join()

輸出:

Thread-0 等待進入
Thread-1 等待進入
Thread-2 等待進入
Thread-0 進入資源區    ← 前 3 個同時進入
Thread-1 進入資源區
Thread-2 進入資源區
Thread-3 等待進入      ← 後面的等待
Thread-4 等待進入
...
(2 秒後)
Thread-0 離開資源區
Thread-3 進入資源區    ← 有位置後進入

實際案例:資料庫連線池

from threading import Thread, Semaphore
import time

class ConnectionPool:
    def __init__(self, max_connections):
        self.semaphore = Semaphore(max_connections)

    def query(self, query_id):
        print(f"查詢 {query_id} 等待連線")
        with self.semaphore:
            print(f"查詢 {query_id} 獲得連線")
            time.sleep(1)  # 模擬查詢
            print(f"查詢 {query_id} 完成")

# 資料庫連線池:最多 5 個連線
pool = ConnectionPool(max_connections=5)

# 10 個查詢請求
threads = [Thread(target=pool.query, args=(i,)) for i in range(10)]

for t in threads:
    t.start()

for t in threads:
    t.join()

🔔 同步機制 4:Event(事件)

定義:一個 Thread 發送信號,其他 Thread 等待信號。

基本用法

from threading import Thread, Event
import time

# 創建事件
event = Event()

def waiter(name):
    print(f"{name} 等待信號")
    event.wait()  # ⏸️ 阻塞,直到收到信號
    print(f"{name} 收到信號,開始工作")

def signaler():
    print("Signaler: 準備中...")
    time.sleep(3)
    print("Signaler: 發送信號!")
    event.set()  # 🔔 發送信號

# 創建等待者
w1 = Thread(target=waiter, args=('Waiter-1',))
w2 = Thread(target=waiter, args=('Waiter-2',))
w3 = Thread(target=waiter, args=('Waiter-3',))

# 創建信號發送者
s = Thread(target=signaler)

# 啟動
w1.start()
w2.start()
w3.start()
s.start()

w1.join()
w2.join()
w3.join()
s.join()

輸出:

Waiter-1 等待信號
Waiter-2 等待信號
Waiter-3 等待信號
Signaler: 準備中...
(等待 3 秒...)
Signaler: 發送信號!
Waiter-1 收到信號,開始工作
Waiter-2 收到信號,開始工作
Waiter-3 收到信號,開始工作

實際案例:伺服器啟動通知

from threading import Thread, Event
import time

server_started = Event()

def start_server():
    print("伺服器啟動中...")
    time.sleep(3)  # 模擬啟動過程
    print("伺服器啟動完成!")
    server_started.set()  # 🔔 通知已啟動

def client_request(client_id):
    print(f"客戶端 {client_id} 等待伺服器啟動")
    server_started.wait()  # ⏸️ 等待伺服器啟動
    print(f"客戶端 {client_id} 發送請求")

# 啟動伺服器
server = Thread(target=start_server)
server.start()

# 創建客戶端
clients = [Thread(target=client_request, args=(i,)) for i in range(5)]
for c in clients:
    c.start()

server.join()
for c in clients:
    c.join()

Event 的方法

from threading import Event

event = Event()

# 檢查狀態
print(event.is_set())  # False

# 設置信號
event.set()
print(event.is_set())  # True

# 清除信號
event.clear()
print(event.is_set())  # False

# 等待信號(可設超時)
event.wait(timeout=5)  # 最多等 5 秒

🚧 同步機制 5:Condition(條件變數)

定義:複雜的等待/通知機制,支援多個條件。

生產者-消費者模式

from threading import Thread, Condition
import time

# 共享資源
queue = []
condition = Condition()

def producer():
    for i in range(5):
        with condition:
            queue.append(i)
            print(f"生產: {i},佇列: {queue}")
            condition.notify()  # 🔔 通知消費者
        time.sleep(1)

def consumer():
    for _ in range(5):
        with condition:
            while len(queue) == 0:
                print("消費者: 佇列為空,等待...")
                condition.wait()  # ⏸️ 等待生產者
            item = queue.pop(0)
            print(f"消費: {item},佇列: {queue}")

p = Thread(target=producer)
c = Thread(target=consumer)

c.start()
time.sleep(0.1)  # 確保消費者先啟動
p.start()

p.join()
c.join()

輸出:

消費者: 佇列為空,等待...
生產: 0,佇列: [0]
消費: 0,佇列: []
消費者: 佇列為空,等待...
生產: 1,佇列: [1]
消費: 1,佇列: []
...

Condition 的方法

from threading import Condition

condition = Condition()

# 等待
condition.wait()          # 等待通知
condition.wait(timeout=5) # 最多等 5 秒

# 通知
condition.notify()        # 通知一個等待者
condition.notify_all()    # 通知所有等待者

🏁 同步機制 6:Barrier(屏障)

定義:等待所有 Thread 到達某個點後,才一起繼續執行。

基本用法

from threading import Thread, Barrier
import time
import random

# 創建 Barrier:等待 3 個 Thread
barrier = Barrier(3)

def runner(name):
    print(f"{name} 開始準備")
    time.sleep(random.randint(1, 3))  # 準備時間不同
    print(f"{name} 準備完成,等待其他人")

    barrier.wait()  # 🏁 等待所有人到達

    print(f"{name} 開始跑!")

# 創建 3 個跑者
threads = [Thread(target=runner, args=(f'跑者-{i}',)) for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()

輸出:

跑者-0 開始準備
跑者-1 開始準備
跑者-2 開始準備
跑者-1 準備完成,等待其他人
跑者-0 準備完成,等待其他人
跑者-2 準備完成,等待其他人
跑者-2 開始跑!         ← 所有人到齊後一起開始
跑者-0 開始跑!
跑者-1 開始跑!

實際案例:並行測試

from threading import Thread, Barrier
import time

# 3 個測試 Thread 同時開始
barrier = Barrier(3)

def performance_test(test_id):
    print(f"測試 {test_id} 初始化中")
    time.sleep(1)

    print(f"測試 {test_id} 準備就緒")
    barrier.wait()  # 🏁 等待所有測試準備好

    # 所有測試同時開始
    start = time.time()
    time.sleep(2)  # 模擬測試
    elapsed = time.time() - start

    print(f"測試 {test_id} 完成,耗時: {elapsed:.2f} 秒")

threads = [Thread(target=performance_test, args=(i,)) for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()

📊 同步機制對比

機制用途典型場景
Lock互斥鎖,一次一個 Thread保護共享資源
RLock可重入鎖遞迴函數、嵌套調用
Semaphore限制並發數量連線池、資源限制
Event信號通知啟動通知、狀態廣播
Condition複雜等待/通知生產者-消費者
Barrier同步點並行測試、分階段任務

💡 最佳實踐

1. 始終使用 with 語句

# ❌ 不好:手動管理
lock.acquire()
try:
    # 臨界區
    pass
finally:
    lock.release()

# ✅ 好:自動管理
with lock:
    # 臨界區
    pass

2. 避免死鎖:固定鎖順序

lock1 = Lock()
lock2 = Lock()

# ✅ 所有 Thread 按相同順序獲取鎖
def thread_func():
    with lock1:  # 先鎖 lock1
        with lock2:  # 再鎖 lock2
            # 臨界區
            pass

3. 使用 timeout 避免永久阻塞

from threading import Lock

lock = Lock()

if lock.acquire(timeout=5):  # 最多等 5 秒
    try:
        # 臨界區
        pass
    finally:
        lock.release()
else:
    print("無法獲取鎖,超時")

4. 減小臨界區範圍

# ❌ 不好:臨界區過大
with lock:
    data = fetch_data()  # 慢
    result = process(data)  # 慢
    save(result)

# ✅ 好:只保護必要部分
data = fetch_data()
result = process(data)
with lock:
    save(result)  # 只鎖寫入

⚠️ 常見錯誤

1. 忘記釋放鎖

# ❌ 錯誤
lock.acquire()
if error:
    return  # 忘記 release()!
lock.release()

# ✅ 正確
with lock:
    if error:
        return  # with 會自動 release

2. 鎖的粒度過粗

# ❌ 不好:整個函數都鎖住
with lock:
    # 100 行程式碼...
    pass

# ✅ 好:只鎖必要部分
# 前處理
with lock:
    # 只鎖關鍵部分
    pass
# 後處理

3. 在 Lock 內等待另一個 Lock

# ❌ 可能死鎖
with lock1:
    with lock2:  # 危險!
        pass

# ✅ 使用固定順序或避免嵌套

✅ 重點回顧

六大同步機制:

  1. Lock - 基本互斥鎖
  2. RLock - 可重入鎖
  3. Semaphore - 限制並發數
  4. Event - 事件通知
  5. Condition - 條件變數
  6. Barrier - 同步屏障

使用原則:

  • ✅ 使用 with 語句
  • ✅ 固定鎖順序
  • ✅ 減小臨界區
  • ✅ 設置 timeout
  • ✅ 避免嵌套鎖

避免問題:

  • ❌ Race Condition
  • ❌ Deadlock
  • ❌ 忘記釋放鎖
  • ❌ 鎖粒度過粗

上一篇: 02-3. Thread 的生命週期 下一篇: 02-5. Thread Pool 與實戰應用


最後更新:2025-01-06

0%