04-3. Deadlock(死鎖)

⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🎯 本篇重點

深入理解 Deadlock 的成因、必要條件、檢測與預防方法,學習如何避免這個可能導致程式完全卡死的嚴重問題。


🤔 什麼是 Deadlock?

Deadlock(死鎖) = 多個 Thread 互相等待對方釋放資源,導致所有 Thread 永遠阻塞

一句話解釋: 當兩個或多個 Thread 互相等待對方持有的資源,而誰都無法繼續執行,就發生了 Deadlock。


🚗 用十字路口來比喻

經典的十字路口 Deadlock

        ↑ 車 C
        │
車 B ←──┼──→ 車 D
        │
        ↓ 車 A

狀況:
- 車 A 等車 B 離開
- 車 B 等車 C 離開
- 車 C 等車 D 離開
- 車 D 等車 A 離開

結果:所有車都動不了!這就是 Deadlock

💻 經典 Deadlock 案例

案例 1:哲學家就餐問題

from threading import Thread, Lock
import time

# 5 支筷子(Lock)
chopsticks = [Lock() for _ in range(5)]

def philosopher(philosopher_id):
    """哲學家進餐"""
    left = philosopher_id
    right = (philosopher_id + 1) % 5

    print(f"哲學家 {philosopher_id} 開始思考")
    time.sleep(1)

    print(f"哲學家 {philosopher_id} 嘗試拿起左筷子 {left}")
    chopsticks[left].acquire()
    print(f"哲學家 {philosopher_id} 拿起了左筷子 {left}")

    time.sleep(0.1)  # 製造 Deadlock 的機會

    print(f"哲學家 {philosopher_id} 嘗試拿起右筷子 {right}")
    chopsticks[right].acquire()  # ← 可能永遠等待
    print(f"哲學家 {philosopher_id} 拿起了右筷子 {right}")

    print(f"哲學家 {philosopher_id} 開始進餐")
    time.sleep(2)

    chopsticks[left].release()
    chopsticks[right].release()
    print(f"哲學家 {philosopher_id} 進餐完畢")

# 創建 5 個哲學家
philosophers = [Thread(target=philosopher, args=(i,)) for i in range(5)]

for p in philosophers:
    p.start()

for p in philosophers:
    p.join()  # ← 可能永遠等待(Deadlock)

可能的輸出(Deadlock):

哲學家 0 開始思考
哲學家 1 開始思考
...
哲學家 0 拿起了左筷子 0
哲學家 1 拿起了左筷子 1
哲學家 2 拿起了左筷子 2
哲學家 3 拿起了左筷子 3
哲學家 4 拿起了左筷子 4
哲學家 0 嘗試拿起右筷子 1  ← 被哲學家 1 持有
哲學家 1 嘗試拿起右筷子 2  ← 被哲學家 2 持有
...(所有人互相等待,程式卡死)

案例 2:銀行轉帳 Deadlock

from threading import Thread, Lock
import time

class Account:
    def __init__(self, account_id, balance):
        self.account_id = account_id
        self.balance = balance
        self.lock = Lock()

def transfer_unsafe(from_account, to_account, amount):
    """不安全的轉帳(可能 Deadlock)"""
    print(f"轉帳: {from_account.account_id}{to_account.account_id}, {amount}")

    # 獲取源帳戶鎖
    from_account.lock.acquire()
    print(f"已鎖定帳戶 {from_account.account_id}")
    time.sleep(0.1)  # 模擬處理時間

    # 獲取目標帳戶鎖
    to_account.lock.acquire()
    print(f"已鎖定帳戶 {to_account.account_id}")

    # 執行轉帳
    from_account.balance -= amount
    to_account.balance += amount

    # 釋放鎖
    to_account.lock.release()
    from_account.lock.release()

# 創建兩個帳戶
account_a = Account('A', 1000)
account_b = Account('B', 1000)

# Thread 1: A → B
# Thread 2: B → A(同時進行)
t1 = Thread(target=transfer_unsafe, args=(account_a, account_b, 100))
t2 = Thread(target=transfer_unsafe, args=(account_b, account_a, 200))

t1.start()
t2.start()

t1.join()  # ← 可能永遠等待
t2.join()

Deadlock 發生過程:

時間 T0:
- Thread 1: 鎖定帳戶 A
- Thread 2: 鎖定帳戶 B

時間 T1:
- Thread 1: 等待帳戶 B 的鎖(被 Thread 2 持有)
- Thread 2: 等待帳戶 A 的鎖(被 Thread 1 持有)

結果:Deadlock!

📋 Deadlock 的四個必要條件

Deadlock 發生必須同時滿足以下四個條件(Coffman 條件):

1. Mutual Exclusion(互斥)

定義: 資源一次只能被一個 Thread 使用

lock = Lock()

# 同一時間只有一個 Thread 可以獲得鎖
with lock:
    # 臨界區
    pass

2. Hold and Wait(持有並等待)

定義: Thread 持有至少一個資源,同時等待其他資源

lock_a.acquire()  # 持有 Lock A
# ...
lock_b.acquire()  # 等待 Lock B

3. No Preemption(不可搶占)

定義: 資源不能被強制從 Thread 中奪走,只能主動釋放

# 無法強制其他 Thread 釋放 Lock
lock.acquire()  # 只能等待

4. Circular Wait(循環等待)

定義: 存在一個 Thread 鏈,每個 Thread 都在等待下一個 Thread 持有的資源

Thread A → 等待 Lock 1 → 被 Thread B 持有
Thread B → 等待 Lock 2 → 被 Thread C 持有
Thread C → 等待 Lock 3 → 被 Thread A 持有

形成循環!

🛡️ 預防 Deadlock

策略 1:Lock 順序(破壞循環等待)

from threading import Thread, Lock
import time

class Account:
    def __init__(self, account_id, balance):
        self.account_id = account_id
        self.balance = balance
        self.lock = Lock()

def transfer_safe(from_account, to_account, amount):
    """安全的轉帳(按 ID 順序鎖定)"""
    # 總是按帳戶 ID 順序鎖定
    first_lock = from_account if from_account.account_id < to_account.account_id else to_account
    second_lock = to_account if from_account.account_id < to_account.account_id else from_account

    with first_lock.lock:
        time.sleep(0.1)
        with second_lock.lock:
            # 執行轉帳
            from_account.balance -= amount
            to_account.balance += amount
            print(f"轉帳完成: {from_account.account_id}{to_account.account_id}, {amount}")

# 測試
account_a = Account('A', 1000)
account_b = Account('B', 1000)

t1 = Thread(target=transfer_safe, args=(account_a, account_b, 100))
t2 = Thread(target=transfer_safe, args=(account_b, account_a, 200))

t1.start(); t2.start()
t1.join(); t2.join()

print(f"帳戶 A: {account_a.balance}")
print(f"帳戶 B: {account_b.balance}")

為什麼安全?

Thread 1: A → B
- 先鎖 A(ID 較小)
- 再鎖 B

Thread 2: B → A
- 先鎖 A(ID 較小)← 等待 Thread 1
- 再鎖 B

沒有循環等待,不會 Deadlock!

策略 2:哲學家就餐的解決方案

方案 A:限制並發數量

from threading import Thread, Lock, Semaphore
import time

chopsticks = [Lock() for _ in range(5)]
# 最多 4 個哲學家同時進餐
max_diners = Semaphore(4)

def philosopher_safe_v1(philosopher_id):
    left = philosopher_id
    right = (philosopher_id + 1) % 5

    with max_diners:  # 限制並發
        chopsticks[left].acquire()
        chopsticks[right].acquire()

        print(f"哲學家 {philosopher_id} 正在進餐")
        time.sleep(2)

        chopsticks[right].release()
        chopsticks[left].release()

方案 B:奇偶策略

def philosopher_safe_v2(philosopher_id):
    left = philosopher_id
    right = (philosopher_id + 1) % 5

    # 奇數哲學家先拿右筷子
    if philosopher_id % 2 == 1:
        left, right = right, left

    chopsticks[left].acquire()
    chopsticks[right].acquire()

    print(f"哲學家 {philosopher_id} 正在進餐")
    time.sleep(2)

    chopsticks[right].release()
    chopsticks[left].release()

策略 3:超時機制(破壞持有並等待)

from threading import Lock
import time

def try_acquire_locks(lock_a, lock_b, timeout=1):
    """嘗試獲取鎖,超時則釋放已持有的鎖"""
    start_time = time.time()

    # 獲取第一個鎖
    if not lock_a.acquire(timeout=timeout):
        return False

    # 嘗試獲取第二個鎖
    remaining_time = timeout - (time.time() - start_time)
    if not lock_b.acquire(timeout=max(0, remaining_time)):
        # 獲取失敗,釋放第一個鎖
        lock_a.release()
        return False

    return True

def transfer_with_timeout(from_account, to_account, amount):
    """帶超時的轉帳"""
    max_retries = 3

    for attempt in range(max_retries):
        if try_acquire_locks(from_account.lock, to_account.lock, timeout=1):
            try:
                # 執行轉帳
                from_account.balance -= amount
                to_account.balance += amount
                print(f"轉帳成功(第 {attempt + 1} 次嘗試)")
                return True
            finally:
                to_account.lock.release()
                from_account.lock.release()
        else:
            print(f"獲取鎖失敗,重試... ({attempt + 1}/{max_retries})")
            time.sleep(0.1)  # 短暫等待後重試

    print("轉帳失敗:超過最大重試次數")
    return False

策略 4:一次性獲取所有資源

from threading import Lock

class GlobalLock:
    """全域鎖,用於協調多個資源的獲取"""
    def __init__(self):
        self.lock = Lock()

    def acquire_all(self, *locks):
        """一次性獲取所有鎖"""
        with self.lock:
            for lock in locks:
                lock.acquire()

    def release_all(self, *locks):
        """釋放所有鎖"""
        for lock in locks:
            lock.release()

global_lock = GlobalLock()

def transfer_atomic(from_account, to_account, amount):
    """原子性轉帳"""
    global_lock.acquire_all(from_account.lock, to_account.lock)
    try:
        from_account.balance -= amount
        to_account.balance += amount
        print("轉帳完成")
    finally:
        global_lock.release_all(from_account.lock, to_account.lock)

🔍 檢測 Deadlock

方法 1:Thread Dump

import threading
import sys

def print_thread_dump():
    """打印所有 Thread 的狀態"""
    print("\n=== Thread Dump ===")
    for thread in threading.enumerate():
        print(f"Thread: {thread.name}")
        print(f"  Alive: {thread.is_alive()}")
        print(f"  Daemon: {thread.daemon}")
        print(f"  Ident: {thread.ident}")
    print("==================\n")

# 定期檢查
import time
while True:
    time.sleep(10)
    print_thread_dump()

方法 2:超時檢測

from threading import Thread
import time

def worker_with_deadlock():
    # 可能會 Deadlock 的程式碼
    pass

# 使用 timeout 檢測
t = Thread(target=worker_with_deadlock)
t.start()

t.join(timeout=5)  # 最多等 5 秒

if t.is_alive():
    print("⚠️ 可能發生 Deadlock!Thread 仍在執行")
    # 無法強制終止 Python Thread,需要重新設計
else:
    print("✅ Thread 正常結束")

方法 3:資源分配圖

class DeadlockDetector:
    """簡單的 Deadlock 檢測器"""
    def __init__(self):
        self.wait_graph = {}  # Thread → 等待的資源
        self.hold_graph = {}  # Thread → 持有的資源

    def add_wait(self, thread_id, resource_id):
        """Thread 等待資源"""
        if thread_id not in self.wait_graph:
            self.wait_graph[thread_id] = []
        self.wait_graph[thread_id].append(resource_id)
        self.check_cycle()

    def add_hold(self, thread_id, resource_id):
        """Thread 持有資源"""
        if thread_id not in self.hold_graph:
            self.hold_graph[thread_id] = []
        self.hold_graph[thread_id].append(resource_id)

    def check_cycle(self):
        """檢測循環等待"""
        # 簡化版:檢測是否有循環
        visited = set()

        def dfs(thread_id, path):
            if thread_id in path:
                print(f"⚠️ 檢測到 Deadlock: {path + [thread_id]}")
                return True

            if thread_id in visited:
                return False

            visited.add(thread_id)
            path.append(thread_id)

            # 找到這個 Thread 等待的資源
            if thread_id in self.wait_graph:
                for resource in self.wait_graph[thread_id]:
                    # 找到持有這個資源的 Thread
                    for holder in self.hold_graph:
                        if resource in self.hold_graph[holder]:
                            if dfs(holder, path):
                                return True

            path.pop()
            return False

        for thread in self.wait_graph:
            if dfs(thread, []):
                return True

        return False

🎯 Deadlock 處理策略總結

預防(Prevention)

破壞四個必要條件之一:

條件破壞方法
互斥使用無鎖資料結構(不實際)
持有並等待一次性獲取所有資源
不可搶占使用超時,失敗則釋放
循環等待資源排序,按順序獲取

避免(Avoidance)

# 銀行家算法(Banker's Algorithm)
# 在分配資源前檢查是否會導致 Deadlock
def is_safe_state(available, allocation, need):
    """檢查是否為安全狀態"""
    # 實作銀行家算法
    pass

檢測與恢復(Detection & Recovery)

# 1. 定期檢測 Deadlock
# 2. 發現後終止部分 Thread 或回滾

def detect_and_recover():
    if deadlock_detected():
        # 選擇犧牲者(Victim Selection)
        victim_thread = select_victim()
        # 終止或回滾
        terminate_thread(victim_thread)

✅ 重點回顧

Deadlock 定義:

  • 多個 Thread 互相等待資源
  • 導致所有 Thread 永遠阻塞
  • 程式完全卡死

四個必要條件(Coffman):

  1. 互斥 - 資源獨佔
  2. 持有並等待 - 持有資源同時等待
  3. 不可搶占 - 無法強制奪取
  4. 循環等待 - 形成等待循環

預防策略:

  • Lock 順序 - 總是按相同順序獲取鎖
  • 限制並發 - 減少競爭
  • 超時機制 - 失敗則釋放並重試
  • 一次性獲取 - 獲取所有資源或都不獲取

檢測方法:

  • ✅ Thread Dump
  • ✅ 超時檢測
  • ✅ 資源分配圖分析

經典問題:

  • 🍴 哲學家就餐問題
  • 💰 銀行轉帳問題
  • 🚗 十字路口問題

關鍵理解:

  • ⚠️ Deadlock 很難除錯(程式卡死)
  • ⚠️ 必須在設計階段預防
  • ⚠️ 一旦發生,通常只能重啟
  • ⚠️ Lock 順序是最簡單有效的預防方法

上一篇: 04-2. Race Condition(競態條件) 下一篇: 04-4. Lock / Mutex / Semaphore


最後更新:2025-01-06

0%