05-2. The threading Module: A Complete Guide

⏱️ Reading time: 15 minutes 🎯 Difficulty: ⭐⭐ (easy)


🎯 What This Article Covers

A complete tour of the Python threading module: creating threads, synchronization primitives, thread pools, and best practices.


📚 threading Module Overview

threading is the standard-library module for creating and managing threads in Python. Because of the GIL, it shines on I/O-bound tasks rather than CPU-bound ones.

from threading import Thread, Lock, Semaphore, Event, Condition
from threading import current_thread, enumerate, active_count  # note: this enumerate shadows the built-in enumerate()
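
As a quick illustration of why threads pay off for I/O-bound work, the sketch below compares three simulated I/O waits run sequentially versus in threads. time.sleep stands in for a network or disk wait, and the names and timings are just assumptions for this illustration:

from threading import Thread
import time

def fake_io(name):
    """Simulate an I/O wait (e.g. a network call)."""
    time.sleep(1)
    print(f"{name} done")

# Sequential: roughly 3 seconds in total
start = time.perf_counter()
for i in range(3):
    fake_io(f"seq-{i}")
print(f"Sequential: {time.perf_counter() - start:.1f}s")

# Threaded: roughly 1 second, because the waits overlap
start = time.perf_counter()
threads = [Thread(target=fake_io, args=(f"thr-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded: {time.perf_counter() - start:.1f}s")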

1️⃣ Creating Threads

Method 1: pass a function as target

from threading import Thread
import time

def worker(name, sleep_time):
    """Worker function"""
    print(f"[{name}] starting work...")
    time.sleep(sleep_time)
    print(f"[{name}] work finished")

# Create the threads
t1 = Thread(target=worker, args=('Worker-1', 2))
t2 = Thread(target=worker, args=('Worker-2', 3))

# Start them
t1.start()
t2.start()

# Wait for both to finish
t1.join()
t2.join()

print("All threads finished")

Method 2: subclass Thread

from threading import Thread
import time

class WorkerThread(Thread):
    def __init__(self, name, task_id):
        super().__init__()
        self.task_name = name
        self.task_id = task_id

    def run(self):
        """Main logic executed by the thread"""
        print(f"[{self.task_name}] task {self.task_id} started")
        time.sleep(2)
        result = self.process_task()
        print(f"[{self.task_name}] result: {result}")

    def process_task(self):
        """Task processing logic"""
        return sum(range(self.task_id * 100000))

# Create and start
workers = [WorkerThread(f'Worker-{i}', i) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()
Basic Thread attributes and methods

from threading import Thread, current_thread
import time

def worker():
    # Get the current thread
    thread = current_thread()
    print(f"Thread name: {thread.name}")
    print(f"Thread ID: {thread.ident}")
    print(f"Is daemon: {thread.daemon}")
    time.sleep(1)

# Create the thread
t = Thread(target=worker, name='MyWorker')

# Thread attributes
print(f"Name: {t.name}")            # MyWorker
print(f"Alive: {t.is_alive()}")     # False
print(f"Daemon: {t.daemon}")        # False

# Start it
t.start()
print(f"Alive: {t.is_alive()}")     # True

# Wait for it
t.join(timeout=2)  # wait at most 2 seconds
print(f"Alive: {t.is_alive()}")     # False

2️⃣ Daemon Threads

Basic concept

from threading import Thread
import time

def background_task():
    """Background task"""
    count = 0
    while True:
        print(f"Background task running... {count}")
        count += 1
        time.sleep(1)

# Regular thread (would keep the program from exiting)
# t = Thread(target=background_task)

# Daemon thread (does not keep the program alive)
t = Thread(target=background_task, daemon=True)
t.start()

time.sleep(3)
print("Main thread exiting")
# The daemon thread is terminated automatically

Output:

Background task running... 0
Background task running... 1
Background task running... 2
Main thread exiting

When to use daemon threads

from threading import Thread
import time

def log_monitor():
    """Log monitoring (a good fit for a daemon)"""
    while True:
        print("Monitoring logs...")
        time.sleep(5)

def heartbeat():
    """Heartbeat check (a good fit for a daemon)"""
    while True:
        print("♥ heartbeat")
        time.sleep(10)

def important_task():
    """Important work (should NOT be a daemon)"""
    print("Processing important data...")
    time.sleep(3)
    print("Data processed, saving...")

# Daemon threads: background services
Thread(target=log_monitor, daemon=True).start()
Thread(target=heartbeat, daemon=True).start()

# Regular thread: important work
t = Thread(target=important_task)
t.start()
t.join()  # make sure it finishes
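
Daemon threads are killed abruptly when the interpreter exits, so they should never hold resources that need cleanup. A common alternative for background work that must shut down cleanly is a regular thread that polls a threading.Event; the sketch below uses an assumed stop_event flag and a 1-second poll interval purely for illustration:

from threading import Thread, Event
import time

stop_event = Event()

def polite_background_task():
    """Runs until asked to stop, so cleanup code gets a chance to run."""
    while not stop_event.is_set():
        print("working...")
        time.sleep(1)
    print("cleaning up and exiting")

t = Thread(target=polite_background_task)
t.start()

time.sleep(3)
stop_event.set()   # ask the thread to stop
t.join()           # wait for it to finish cleanly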

3️⃣ Synchronization Primitives

Lock

from threading import Thread, Lock

# The problem without a lock
counter = 0

def increment_unsafe():
    global counter
    for _ in range(100000):
        counter += 1  # race condition: read-modify-write is not atomic

t1 = Thread(target=increment_unsafe)
t2 = Thread(target=increment_unsafe)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Without Lock: {counter}")  # may be less than 200000 (lost updates)

# Fixing it with a Lock
counter = 0
lock = Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()

# Or use a with statement (recommended)
def increment_safe_with():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

t1 = Thread(target=increment_safe_with)
t2 = Thread(target=increment_safe_with)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"With Lock: {counter}")  # 200000 (correct)

RLock (reentrant lock)

from threading import Thread, Lock, RLock

# A plain Lock cannot be acquired twice by the same thread
lock = Lock()
lock.acquire()
# lock.acquire()  # would block forever (deadlock)!

# An RLock can be re-acquired by the thread that already holds it
rlock = RLock()
rlock.acquire()
rlock.acquire()  # OK
rlock.release()
rlock.release()

# Typical use case: methods that call each other while holding the lock
class Account:
    def __init__(self):
        self.balance = 0
        self.lock = RLock()

    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f"Deposited {amount}, balance {self.balance}")

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                print(f"Withdrew {amount}, balance {self.balance}")

    def transfer(self, other, amount):
        """A transfer needs to lock both accounts"""
        with self.lock:  # lock our own account
            self.withdraw(amount)  # acquires self.lock again (fine, it is an RLock)
            other.deposit(amount)

account1 = Account()
account2 = Account()
account1.balance = 1000

account1.transfer(account2, 500)
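
Note that transfer ends up holding two different locks (self.lock and other.lock). If one thread ran account1.transfer(account2, ...) while another ran account2.transfer(account1, ...), each could hold one lock while waiting for the other, which is the classic deadlock. A common fix is to always take the locks in a fixed order; the helper below is a sketch that uses id() as an arbitrary but consistent ordering key:

def transfer_safe(a, b, amount):
    """Acquire both account locks in a consistent order to avoid deadlock."""
    first, second = (a, b) if id(a) < id(b) else (b, a)
    with first.lock:
        with second.lock:
            a.withdraw(amount)   # re-acquiring a lock we already hold is fine with RLock
            b.deposit(amount)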

Semaphore

from threading import Thread, Semaphore
import time

# Limit how many threads run the work section at the same time
semaphore = Semaphore(3)  # at most 3 at a time

def worker(worker_id):
    print(f"Worker {worker_id} waiting...")
    semaphore.acquire()
    try:
        print(f"Worker {worker_id} started")
        time.sleep(2)
        print(f"Worker {worker_id} finished")
    finally:
        semaphore.release()

# Create 10 threads, but only 3 do the work at any given moment
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Output:

Worker 0 waiting...
Worker 0 started
Worker 1 waiting...
Worker 1 started
Worker 2 waiting...
Worker 2 started
Worker 3 waiting...  ← blocked
...(the exact interleaving varies, but at most 3 workers are working at any time)
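
A Semaphore also works as a context manager, and BoundedSemaphore additionally raises ValueError if it is released more times than it was acquired, which catches bookkeeping mistakes. A minimal sketch (the function name is just for illustration):

from threading import BoundedSemaphore

sem = BoundedSemaphore(3)

def worker_with(worker_id):
    with sem:  # acquire on entry, release on exit, even if an exception is raised
        print(f"Worker {worker_id} working")

# sem.release()  # an extra release like this would raise ValueError with BoundedSemaphore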

Event

from threading import Thread, Event
import time

# An Event is a simple way to pass a signal between threads
event = Event()

def waiter():
    """Waits for the event"""
    print("Waiting for the event...")
    event.wait()  # blocks until the event is set
    print("Event triggered, continuing")

def setter():
    """Sets the event"""
    print("Getting ready to trigger the event...")
    time.sleep(3)
    print("Triggering the event!")
    event.set()  # set the event

t1 = Thread(target=waiter)
t2 = Thread(target=setter)

t1.start()
t2.start()

t1.join()
t2.join()

Use case: waiting for initialization to finish

from threading import Thread, Event
import time

ready_event = Event()

def init_service():
    """Initialize the service"""
    print("Initializing the service...")
    time.sleep(5)  # simulate initialization work
    print("Service initialized")
    ready_event.set()  # signal that the service is ready

def use_service():
    """Use the service"""
    print("Waiting for the service to be ready...")
    ready_event.wait()  # wait for initialization
    print("Using the service")

Thread(target=init_service).start()
Thread(target=use_service).start()
Thread(target=use_service).start()
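
Event.wait() also takes a timeout and returns True if the event was set or False if it timed out, and Event.clear() resets the flag. A small sketch continuing the ready_event example above (the 10-second timeout is an arbitrary choice):

if ready_event.wait(timeout=10):
    print("Service is ready")
else:
    print("Timed out waiting for the service")

ready_event.clear()  # reset the event so it can be waited on again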

Condition

from threading import Thread, Condition
import time

condition = Condition()
items = []

def producer():
    """Producer"""
    for i in range(5):
        time.sleep(1)
        with condition:
            items.append(i)
            print(f"Produced: {i}")
            condition.notify()  # wake up a waiting consumer

def consumer():
    """Consumer"""
    for _ in range(5):
        with condition:
            while len(items) == 0:
                print("Waiting for items...")
                condition.wait()  # releases the lock and waits for notify()
            item = items.pop(0)
            print(f"Consumed: {item}")

t1 = Thread(target=producer)
t2 = Thread(target=consumer)

t1.start()
t2.start()

t1.join()
t2.join()
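
The while loop around condition.wait() is deliberate: it re-checks the condition after every wake-up, guarding against spurious wake-ups and against another consumer grabbing the item first. Condition.wait_for() wraps that loop for you; a minimal equivalent of the consumer's waiting step from the example above:

with condition:
    condition.wait_for(lambda: len(items) > 0)  # waits until the predicate becomes true
    item = items.pop(0)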

4️⃣ Thread Local Storage

The problem: global variables are shared between threads

from threading import Thread
import time

user_id = None  # global variable

def process_request(uid):
    global user_id
    user_id = uid
    print(f"Handling user {user_id}")
    time.sleep(1)
    print(f"Finished user {user_id}")  # may have been overwritten by another thread

# These two threads interfere with each other
Thread(target=process_request, args=(1,)).start()
Thread(target=process_request, args=(2,)).start()

The fix: thread-local storage

from threading import Thread, local
import time

# local() creates thread-local storage
thread_local = local()

def process_request(uid):
    # each thread gets its own user_id
    thread_local.user_id = uid
    print(f"Handling user {thread_local.user_id}")
    time.sleep(1)
    print(f"Finished user {thread_local.user_id}")  # not affected by other threads

# thread_local.user_id is independent in each thread
Thread(target=process_request, args=(1,)).start()
Thread(target=process_request, args=(2,)).start()

Output:

Handling user 1
Handling user 2
Finished user 1
Finished user 2
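
One caveat: attributes on a local() object exist only in the thread that set them, so reading one that the current thread never assigned raises AttributeError. A small sketch using getattr with a default:

from threading import local

thread_local = local()

# In a thread that has not set user_id yet:
user_id = getattr(thread_local, 'user_id', None)  # avoids AttributeError
if user_id is None:
    print("No user bound to this thread")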

5️⃣ Thread Pools

Using concurrent.futures

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """Task function"""
    print(f"Task {n} started")
    time.sleep(2)
    return n * n

# Create the thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the tasks
    futures = [executor.submit(task, i) for i in range(10)]

    # Collect the results (in submission order)
    for future in futures:
        result = future.result()
        print(f"Result: {result}")

Batch execution with map()

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    """Download a page"""
    response = requests.get(url)
    return len(response.content)

urls = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
]

# Run the downloads in batch with map (results come back in input order)
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch_url, urls)
    for url, size in zip(urls, results):
        print(f"{url}: {size} bytes")

6️⃣ Thread Management Functions

Inspecting threads

from threading import Thread, current_thread, enumerate, active_count  # enumerate here shadows the built-in
import time

def worker(name):
    thread = current_thread()
    print(f"[{thread.name}] working...")
    time.sleep(2)

# Create several threads
threads = [Thread(target=worker, args=(f'Worker-{i}',), name=f'T{i}') for i in range(3)]
for t in threads:
    t.start()

# The current thread
print(f"Main thread: {current_thread().name}")

# All live threads
print(f"\nNumber of live threads: {active_count()}")
for t in enumerate():
    print(f"  - {t.name} (alive: {t.is_alive()})")

# Wait for completion
for t in threads:
    t.join()

print(f"\nLive threads after completion: {active_count()}")

7️⃣ Practical Examples

Example 1: multi-threaded downloader

from threading import Thread, Lock
import os
import requests

class Downloader:
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.lock = Lock()
        self.completed = 0
        self.failed = 0
        os.makedirs('downloads', exist_ok=True)  # make sure the target directory exists

    def download(self, url):
        """Download a single file"""
        try:
            response = requests.get(url, timeout=10)
            filename = url.split('/')[-1]
            with open(f'downloads/{filename}', 'wb') as f:
                f.write(response.content)

            with self.lock:
                self.completed += 1
                print(f"✓ Downloaded: {filename} ({self.completed} so far)")

        except Exception as e:
            with self.lock:
                self.failed += 1
                print(f"✗ Failed: {url}, {e}")

    def download_all(self, urls):
        """Download in batch"""
        threads = []
        for url in urls:
            t = Thread(target=self.download, args=(url,))
            t.start()
            threads.append(t)

            # Throttle concurrency by waiting for the oldest thread
            # (simple, though a Semaphore or ThreadPoolExecutor is a cleaner throttle)
            if len(threads) >= self.max_workers:
                threads[0].join()
                threads.pop(0)

        # Wait for the remaining threads
        for t in threads:
            t.join()

        print(f"\nCompleted: {self.completed}, failed: {self.failed}")

# Usage (all ten URLs point at the same image, so they end up writing the same file)
downloader = Downloader(max_workers=5)
urls = ['https://httpbin.org/image/png' for _ in range(10)]
downloader.download_all(urls)

Example 2: producer-consumer pattern

from threading import Thread
from queue import Queue
import time
import random

def producer(queue, producer_id):
    """Producer"""
    for i in range(5):
        item = f"P{producer_id}-Item{i}"
        queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(queue, consumer_id):
    """Consumer"""
    while True:
        item = queue.get()
        if item is None:  # sentinel: no more work
            break

        print(f"Consumer {consumer_id} consumed: {item}")
        time.sleep(random.uniform(0.2, 0.8))

# Create the queue
queue = Queue()

# Create producers and consumers
producers = [Thread(target=producer, args=(queue, i)) for i in range(3)]
consumers = [Thread(target=consumer, args=(queue, i)) for i in range(2)]

# Start everything
for p in producers:
    p.start()
for c in consumers:
    c.start()

# Wait for all producers to finish, then send one sentinel per consumer.
# (If a producer sent the sentinel itself, consumers could stop while the
# other producers were still producing.)
for p in producers:
    p.join()
for _ in consumers:
    queue.put(None)
for c in consumers:
    c.join()

print("All tasks finished")

Example 3: multi-threaded web crawler

from threading import Thread, Lock
from queue import Queue
import requests
from bs4 import BeautifulSoup

class WebCrawler:
    def __init__(self, num_workers=5, max_pages=20):
        self.url_queue = Queue()
        self.visited = set()
        self.visited_lock = Lock()
        self.num_workers = num_workers
        self.max_pages = max_pages  # stop queueing new URLs after this many, or the crawl never ends

    def crawl(self, url):
        """Crawl a single page"""
        try:
            response = requests.get(url, timeout=5)
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract the title
            title = soup.find('title')
            print(f"Crawled: {url}")
            print(f"Title: {title.text if title else 'N/A'}")

            # Extract links
            for link in soup.find_all('a', href=True):
                href = link['href']
                if href.startswith('http'):
                    with self.visited_lock:
                        if href not in self.visited and len(self.visited) < self.max_pages:
                            self.visited.add(href)
                            self.url_queue.put(href)

        except Exception as e:
            print(f"Error: {url}, {e}")

    def worker(self):
        """Worker thread"""
        while True:
            url = self.url_queue.get()
            if url is None:
                break
            self.crawl(url)
            self.url_queue.task_done()

    def start(self, seed_urls):
        """Start crawling"""
        # Add the seed URLs
        for url in seed_urls:
            self.url_queue.put(url)
            self.visited.add(url)

        # Create the workers
        workers = []
        for _ in range(self.num_workers):
            t = Thread(target=self.worker)
            t.start()
            workers.append(t)

        # Wait for the queue to drain
        self.url_queue.join()

        # Stop the workers
        for _ in range(self.num_workers):
            self.url_queue.put(None)
        for w in workers:
            w.join()

        print(f"Crawl finished, {len(self.visited)} pages visited")

# Usage
crawler = WebCrawler(num_workers=5)
crawler.start(['https://www.python.org'])

8️⃣ Best Practices

1. Avoid shared mutable state

# ❌ Risky: shared mutable state
results = []

def worker(data):
    results.append(process(data))  # append alone is atomic in CPython, but shared
                                   # state invites races as the logic grows
                                   # (process() is a placeholder for your own work)

# ✅ Better: hand results over through a Queue
from queue import Queue

result_queue = Queue()

def worker(data):
    result = process(data)
    result_queue.put(result)

2. Use with to manage locks

# ❌ Risky: an exception between acquire and release leaves the lock held
lock.acquire()
data.append(item)
lock.release()

# ✅ Better: released automatically, even on exceptions
with lock:
    data.append(item)

3. Use timeouts when joining threads

# ✅ Avoid waiting forever
t = Thread(target=long_task)
t.start()
t.join(timeout=10)  # wait at most 10 seconds

if t.is_alive():
    print("Thread timed out and is still running")

4. Handle exceptions inside the thread

from threading import Thread

def safe_worker(data):
    try:
        result = risky_operation(data)  # risky_operation() is a placeholder
        return result                   # note: a thread target's return value is discarded
    except Exception as e:
        print(f"Error: {e}")
        return None

t = Thread(target=safe_worker, args=(data,))
t.start()
t.join()
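
Exceptions raised inside a Thread target never propagate to the main thread, which is why the try/except has to live inside the worker. When the caller needs the exception (or a return value), ThreadPoolExecutor is usually the simpler tool, because future.result() re-raises whatever the task raised; a minimal sketch:

from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 3:
        raise ValueError("bad input")
    return n * n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky_task, n) for n in range(5)]
    for f in futures:
        try:
            print(f.result())  # re-raises the worker's exception here
        except ValueError as e:
            print(f"Task failed: {e}")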

✅ Key Takeaways

Thread creation:

  • Thread(target=func) - pass a function as target
  • Subclass Thread - object-oriented style
  • daemon=True - daemon threads

Synchronization primitives:

  • Lock - basic mutual exclusion
  • RLock - reentrant lock
  • Semaphore - limit concurrency
  • Event - signaling between threads
  • Condition - condition variable

Thread pools:

  • ThreadPoolExecutor - the standard thread pool
  • submit() - submit a single task
  • map() - batch execution

Key points:

  • ✅ Best suited to I/O-bound tasks
  • ✅ The GIL limits CPU parallelism
  • ✅ Protect shared resources with locks
  • ✅ Avoid deadlocks (acquire locks in a consistent order)

Previous: 05-1. A Deep Dive into the Python GIL  Next: 05-3. The multiprocessing Module: A Complete Guide


Last updated: 2025-01-06
