目錄
05-2. threading 模組完整指南
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐ (簡單)
🎯 本篇重點
完整掌握 Python threading 模組,包含 Thread 創建、同步機制、Thread Pool、最佳實踐等核心功能。
📚 threading 模組概覽
threading 是 Python 標準庫中用於創建和管理多執行緒的模組,適合 I/O 密集型任務。
from threading import Thread, Lock, Semaphore, Event, Condition
from threading import current_thread, enumerate, active_count
1️⃣ 創建 Thread
方法 1:函式作為 target
from threading import Thread
import time
def worker(name, sleep_time):
"""Worker 函式"""
print(f"[{name}] 開始工作...")
time.sleep(sleep_time)
print(f"[{name}] 完成工作")
# 創建 Thread
t1 = Thread(target=worker, args=('Worker-1', 2))
t2 = Thread(target=worker, args=('Worker-2', 3))
# 啟動
t1.start()
t2.start()
# 等待完成
t1.join()
t2.join()
print("所有 Thread 完成")
方法 2:繼承 Thread 類別
from threading import Thread
import time
class WorkerThread(Thread):
    """Thread subclass that carries its own task name and task id.

    The work is done in ``run``, which the threading machinery invokes in the
    new thread after ``start()`` is called.
    """

    def __init__(self, name, task_id):
        """Store the task label and numeric id; the thread is not started."""
        super().__init__()
        self.task_id = task_id
        self.task_name = name

    def process_task(self):
        """Return the sum of the integers in ``range(task_id * 100000)``."""
        upper_bound = self.task_id * 100000
        return sum(range(upper_bound))

    def run(self):
        """Announce the task, wait two seconds, then compute and print the result."""
        print(f"[{self.task_name}] 任務 {self.task_id} 開始")
        time.sleep(2)
        outcome = self.process_task()
        print(f"[{self.task_name}] 結果: {outcome}")
# 創建並啟動
workers = [WorkerThread(f'Worker-{i}', i) for i in range(3)]
for w in workers:
w.start()
for w in workers:
w.join()
Thread 基本屬性與方法
from threading import Thread, current_thread
import time
def worker():
# 獲取當前 Thread
thread = current_thread()
print(f"Thread 名稱: {thread.name}")
print(f"Thread ID: {thread.ident}")
print(f"Is daemon: {thread.daemon}")
time.sleep(1)
# 創建 Thread
t = Thread(target=worker, name='MyWorker')
# Thread 屬性
print(f"名稱: {t.name}") # MyWorker
print(f"存活: {t.is_alive()}") # False
print(f"Daemon: {t.daemon}") # False
# 啟動
t.start()
print(f"存活: {t.is_alive()}") # True
# 等待
t.join(timeout=2) # 最多等 2 秒
print(f"存活: {t.is_alive()}") # False
2️⃣ Daemon Thread(守護線程)
基本概念
from threading import Thread
import time
def background_task():
"""背景任務"""
count = 0
while True:
print(f"背景任務運行中... {count}")
count += 1
time.sleep(1)
# 普通 Thread(會阻止程式結束)
# t = Thread(target=background_task)
# Daemon Thread(不會阻止程式結束)
t = Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
print("主 Thread 結束")
# Daemon Thread 會自動終止
輸出:
背景任務運行中... 0
背景任務運行中... 1
背景任務運行中... 2
主 Thread 結束
Daemon Thread 使用場景
from threading import Thread
import time
def log_monitor():
"""日誌監控(適合 Daemon)"""
while True:
print("監控日誌...")
time.sleep(5)
def heartbeat():
"""心跳檢測(適合 Daemon)"""
while True:
print("♥ 心跳")
time.sleep(10)
def important_task():
"""重要任務(不適合 Daemon)"""
print("處理重要資料...")
time.sleep(3)
print("資料處理完成,儲存中...")
# Daemon Thread:背景服務
Thread(target=log_monitor, daemon=True).start()
Thread(target=heartbeat, daemon=True).start()
# 普通 Thread:重要任務
t = Thread(target=important_task)
t.start()
t.join() # 確保完成
3️⃣ 同步機制
Lock(鎖)
from threading import Thread, Lock
# 沒有 Lock 的問題
counter = 0
def increment_unsafe():
global counter
for _ in range(100000):
counter += 1 # Race Condition
t1 = Thread(target=increment_unsafe)
t2 = Thread(target=increment_unsafe)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"沒有 Lock: {counter}") # 可能 150000(錯誤)
# 使用 Lock 解決
counter = 0
lock = Lock()
def increment_safe():
global counter
for _ in range(100000):
lock.acquire()
try:
counter += 1
finally:
lock.release()
# 或使用 with(推薦)
def increment_safe_with():
global counter
for _ in range(100000):
with lock:
counter += 1
t1 = Thread(target=increment_safe_with)
t2 = Thread(target=increment_safe_with)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"有 Lock: {counter}") # 200000(正確)
RLock(可重入鎖)
from threading import Thread, Lock, RLock
# Lock 不能重複 acquire
lock = Lock()
lock.acquire()
# lock.acquire() # 會死鎖!
# RLock 可以重複 acquire
rlock = RLock()
rlock.acquire()
rlock.acquire() # OK
rlock.release()
rlock.release()
# 使用場景:遞迴函式
class Account:
    """Bank account whose balance is protected by a re-entrant lock.

    An ``RLock`` is used because ``transfer`` already holds the account's lock
    when it calls ``withdraw`` / ``deposit``, which acquire the same lock again
    on the same thread. With a plain ``Lock`` that nested acquire would block
    forever.
    """

    def __init__(self):
        self.balance = 0
        self.lock = RLock()

    def deposit(self, amount):
        """Add ``amount`` to the balance and print the new balance."""
        with self.lock:
            self.balance += amount
            print(f"存入 {amount},餘額 {self.balance}")

    def withdraw(self, amount):
        """Take ``amount`` out of the account if the balance covers it.

        Returns:
            True if the withdrawal happened; False if the balance was too low,
            in which case the balance is unchanged and nothing is printed.
        """
        with self.lock:
            if self.balance < amount:
                return False
            self.balance -= amount
            print(f"提款 {amount},餘額 {self.balance}")
            return True

    def transfer(self, other, amount):
        """Move ``amount`` from this account to ``other`` as one atomic step.

        Both account locks are held for the whole transfer. They are always
        acquired in the same global order (by ``id``), so two threads running
        ``a.transfer(b, ...)`` and ``b.transfer(a, ...)`` at the same time
        cannot deadlock on each other.

        Returns:
            True if the money was moved; False if this account's balance was
            insufficient, in which case neither account is modified.
        """
        first, second = sorted((self, other), key=id)
        with first.lock, second.lock:
            # Only credit the receiver when the debit actually succeeded;
            # otherwise a failed withdrawal would create money out of nothing.
            if not self.withdraw(amount):  # re-acquires self.lock (RLock allows it)
                return False
            other.deposit(amount)
            return True
account1 = Account()
account2 = Account()
account1.balance = 1000
account1.transfer(account2, 500)
Semaphore(信號量)
from threading import Thread, Semaphore
import time
# 限制同時執行的 Thread 數量
semaphore = Semaphore(3) # 最多 3 個同時執行
def worker(worker_id):
print(f"Worker {worker_id} 等待...")
semaphore.acquire()
try:
print(f"Worker {worker_id} 開始工作")
time.sleep(2)
print(f"Worker {worker_id} 完成")
finally:
semaphore.release()
# 創建 10 個 Thread,但最多 3 個同時執行
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
輸出:
Worker 0 等待...
Worker 0 開始工作
Worker 1 等待...
Worker 1 開始工作
Worker 2 等待...
Worker 2 開始工作
Worker 3 等待... ← 等待中
...(每次最多 3 個同時工作)
Event(事件)
from threading import Thread, Event
import time
# Event 用於 Thread 間的信號傳遞
event = Event()
def waiter():
"""等待事件"""
print("等待事件...")
event.wait() # 阻塞直到事件被設置
print("事件已觸發,繼續執行")
def setter():
"""設置事件"""
print("準備觸發事件...")
time.sleep(3)
print("觸發事件!")
event.set() # 設置事件
t1 = Thread(target=waiter)
t2 = Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
應用場景:等待初始化完成
from threading import Thread, Event
import time
ready_event = Event()
def init_service():
"""初始化服務"""
print("正在初始化服務...")
time.sleep(5) # 模擬初始化
print("服務初始化完成")
ready_event.set() # 通知已就緒
def use_service():
"""使用服務"""
print("等待服務就緒...")
ready_event.wait() # 等待初始化
print("開始使用服務")
Thread(target=init_service).start()
Thread(target=use_service).start()
Thread(target=use_service).start()
Condition(條件變數)
from threading import Thread, Condition
import time
condition = Condition()
items = []
def producer():
"""生產者"""
for i in range(5):
time.sleep(1)
with condition:
items.append(i)
print(f"生產: {i}")
condition.notify() # 通知消費者
def consumer():
"""消費者"""
for _ in range(5):
with condition:
while len(items) == 0:
print("等待商品...")
condition.wait() # 等待通知
item = items.pop(0)
print(f"消費: {item}")
t1 = Thread(target=producer)
t2 = Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
4️⃣ Thread Local Storage
問題:全域變數在 Thread 間共享
from threading import Thread
import time
user_id = None # 全域變數
def process_request(uid):
global user_id
user_id = uid
print(f"處理用戶 {user_id}")
time.sleep(1)
print(f"完成用戶 {user_id}") # 可能被其他 Thread 修改
# 兩個 Thread 會互相干擾
Thread(target=process_request, args=(1,)).start()
Thread(target=process_request, args=(2,)).start()
解決:Thread Local
from threading import Thread, local
import time
# 使用 local() 創建 Thread Local Storage
thread_local = local()
def process_request(uid):
# 每個 Thread 有自己的 user_id
thread_local.user_id = uid
print(f"處理用戶 {thread_local.user_id}")
time.sleep(1)
print(f"完成用戶 {thread_local.user_id}") # 不會被干擾
# 每個 Thread 的 thread_local.user_id 是獨立的
Thread(target=process_request, args=(1,)).start()
Thread(target=process_request, args=(2,)).start()
輸出:
處理用戶 1
處理用戶 2
完成用戶 1
完成用戶 2
5️⃣ Thread Pool(執行緒池)
使用 concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
"""任務函式"""
print(f"任務 {n} 開始")
time.sleep(2)
return n * n
# 創建 Thread Pool
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任務
futures = [executor.submit(task, i) for i in range(10)]
# 獲取結果
for future in futures:
result = future.result()
print(f"結果: {result}")
map() 批量執行
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url, timeout=10):
    """Download ``url`` and return the size of the response body in bytes.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely, which would tie
            up a pool worker forever.

    Returns:
        Length of ``response.content``.

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            the timeout expires.
    """
    response = requests.get(url, timeout=timeout)
    return len(response.content)
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
]
# 使用 map 批量執行
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch_url, urls)
for url, size in zip(urls, results):
print(f"{url}: {size} bytes")
6️⃣ Thread 管理函式
獲取 Thread 資訊
from threading import Thread, current_thread, enumerate, active_count
import time
def worker(name):
thread = current_thread()
print(f"[{thread.name}] 工作中...")
time.sleep(2)
# 創建多個 Thread
threads = [Thread(target=worker, args=(f'Worker-{i}',), name=f'T{i}') for i in range(3)]
for t in threads:
t.start()
# 獲取當前 Thread
print(f"主 Thread: {current_thread().name}")
# 獲取所有活動的 Thread
print(f"\n活動的 Thread 數量: {active_count()}")
for t in enumerate():
print(f" - {t.name} (alive: {t.is_alive()})")
# 等待完成
for t in threads:
t.join()
print(f"\n完成後活動 Thread: {active_count()}")
7️⃣ 實戰案例
案例 1:多執行緒下載器
from threading import Thread, Lock
import requests
import time
class Downloader:
    """Download a list of URLs with a bounded number of concurrent threads.

    ``completed`` and ``failed`` are updated from worker threads and are
    therefore only modified while ``lock`` is held.
    """

    def __init__(self, max_workers=5):
        """Create a downloader that runs at most ``max_workers`` threads at once."""
        self.max_workers = max_workers
        self.lock = Lock()
        self.completed = 0
        self.failed = 0

    def download(self, url):
        """Fetch ``url`` and save the body under ``downloads/``.

        The file name is the last path segment of the URL. Any failure
        (network error, HTTP error status, file-system error) is caught,
        counted in ``failed`` and reported; nothing is raised to the caller.
        """
        import os  # local import so the snippet does not depend on a file-level import

        try:
            response = requests.get(url, timeout=10)
            # Treat 4xx/5xx as failures instead of saving the error page
            # and counting it as a successful download.
            response.raise_for_status()
            filename = url.split('/')[-1]
            # The target directory may not exist on first use.
            os.makedirs('downloads', exist_ok=True)
            with open(f'downloads/{filename}', 'wb') as f:
                f.write(response.content)
            with self.lock:
                self.completed += 1
                print(f"✓ 下載完成: {filename} ({self.completed} 個)")
        except Exception as e:
            with self.lock:
                self.failed += 1
                print(f"✗ 下載失敗: {url}, {e}")

    def download_all(self, urls):
        """Download every URL in ``urls`` and print a completed/failed summary.

        One thread is started per URL. Whenever ``max_workers`` threads are
        pending, the oldest one is joined before another is started, which
        caps the number of threads alive at the same time.
        """
        threads = []
        for url in urls:
            t = Thread(target=self.download, args=(url,))
            t.start()
            threads.append(t)
            # Throttle: wait for the oldest thread before starting more.
            if len(threads) >= self.max_workers:
                threads.pop(0).join()
        # Wait for whatever is still running.
        for t in threads:
            t.join()
        print(f"\n完成: {self.completed}, 失敗: {self.failed}")
# 使用
downloader = Downloader(max_workers=5)
urls = [f'https://httpbin.org/image/png' for _ in range(10)]
downloader.download_all(urls)
案例 2:生產者-消費者模式
from threading import Thread, Lock
from queue import Queue
import time
import random
def producer(queue, producer_id):
    """Put five labelled items on ``queue``, pausing briefly after each one.

    The producer does not enqueue a shutdown sentinel. With several producers
    sharing one queue, a sentinel from the first producer to finish would stop
    consumers while other producers are still adding items. The coordinating
    code sends the sentinels once every producer has finished.
    """
    for i in range(5):
        item = f"P{producer_id}-Item{i}"
        queue.put(item)
        print(f"生產者 {producer_id} 生產: {item}")
        time.sleep(random.uniform(0.1, 0.5))


def consumer(queue, consumer_id):
    """Consume items from ``queue`` until a ``None`` sentinel is received.

    Exactly one sentinel is expected per consumer, so it is not put back.
    ``task_done()`` is called for every ``get()``, including the sentinel,
    so that ``queue.join()`` can be used by callers.
    """
    while True:
        item = queue.get()
        try:
            if item is None:
                break
            print(f"消費者 {consumer_id} 消費: {item}")
            time.sleep(random.uniform(0.2, 0.8))
        finally:
            queue.task_done()


if __name__ == "__main__":
    # Shared work queue.
    queue = Queue()
    # Three producers feed two consumers.
    producers = [Thread(target=producer, args=(queue, i)) for i in range(3)]
    consumers = [Thread(target=consumer, args=(queue, i)) for i in range(2)]
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
    # Wait until every item has been queued ...
    for p in producers:
        p.join()
    # ... and only then tell each consumer to stop. The sentinels sit behind
    # all real items in the FIFO queue, so nothing is left unconsumed.
    for _ in consumers:
        queue.put(None)
    for c in consumers:
        c.join()
print("所有任務完成")
案例 3:多執行緒 Web 爬蟲
from threading import Thread, Lock
from queue import Queue
import requests
from bs4 import BeautifulSoup
class WebCrawler:
    """Multi-threaded crawler driven by a shared URL queue.

    Worker threads take URLs from ``url_queue``, fetch them, and enqueue any
    absolute links that have not been seen yet. ``visited`` holds every URL
    that has ever been queued and is guarded by ``visited_lock``.
    """

    def __init__(self, num_workers=5, max_pages=None):
        """Configure the crawler.

        Args:
            num_workers: Number of worker threads started by ``start``.
            max_pages: Upper bound on the number of distinct URLs that may be
                queued (seeds included). ``None`` means no limit, in which
                case the crawl only ends when no new links are found.
        """
        self.url_queue = Queue()
        self.visited = set()
        self.visited_lock = Lock()
        self.num_workers = num_workers
        self.max_pages = max_pages

    def _enqueue(self, url):
        """Queue ``url`` unless it was already seen or the page limit is reached.

        Returns True if the URL was queued, False otherwise.
        """
        with self.visited_lock:
            if url in self.visited:
                return False
            if self.max_pages is not None and len(self.visited) >= self.max_pages:
                return False
            self.visited.add(url)
            self.url_queue.put(url)
            return True

    def crawl(self, url):
        """Fetch one page, print its title, and queue its absolute links.

        Any exception is caught and reported so that one bad page does not
        stop the worker thread.
        """
        try:
            response = requests.get(url, timeout=5)
            soup = BeautifulSoup(response.content, 'html.parser')
            # Page title
            title = soup.find('title')
            print(f"爬取: {url}")
            print(f"標題: {title.text if title else 'N/A'}")
            # Outgoing links; only absolute http(s) URLs are followed.
            for link in soup.find_all('a', href=True):
                href = link['href']
                if href.startswith('http'):
                    self._enqueue(href)
        except Exception as e:
            print(f"錯誤: {url}, {e}")

    def worker(self):
        """Worker loop: process URLs until a ``None`` sentinel is received."""
        while True:
            url = self.url_queue.get()
            try:
                if url is None:
                    break
                self.crawl(url)
            finally:
                # Always balance the get(); a missed task_done() would make
                # url_queue.join() in start() wait forever.
                self.url_queue.task_done()

    def start(self, seed_urls):
        """Crawl from ``seed_urls`` and block until the queue is drained.

        Duplicate seeds are queued once, and seeds beyond ``max_pages`` are
        ignored. When all queued URLs have been processed, one sentinel per
        worker is sent and the workers are joined.
        """
        # Seed the queue.
        for url in seed_urls:
            self._enqueue(url)
        # Start the workers.
        workers = []
        for _ in range(self.num_workers):
            t = Thread(target=self.worker)
            t.start()
            workers.append(t)
        # Wait until every queued URL has been processed.
        self.url_queue.join()
        # Shut the workers down.
        for _ in range(self.num_workers):
            self.url_queue.put(None)
        for w in workers:
            w.join()
        print(f"爬取完成,共 {len(self.visited)} 個頁面")
# 使用
crawler = WebCrawler(num_workers=5)
crawler.start(['https://www.python.org'])
8️⃣ 最佳實踐
1. 避免共享狀態
# ❌ 錯誤:共享可變狀態
results = []
def worker(data):
results.append(process(data)) # Race Condition
# ✅ 正確:使用 Queue 傳遞結果
from queue import Queue
result_queue = Queue()
def worker(data):
result = process(data)
result_queue.put(result)
2. 使用 with 管理 Lock
# ❌ 錯誤:可能忘記釋放
lock.acquire()
data.append(item)
lock.release()
# ✅ 正確:自動釋放
with lock:
data.append(item)
3. 設置 Thread 超時
# ✅ 避免無限等待
t = Thread(target=long_task)
t.start()
t.join(timeout=10) # 最多等 10 秒
if t.is_alive():
print("Thread 超時仍在執行")
4. 妥善處理異常
from threading import Thread
def safe_worker(data):
try:
result = risky_operation(data)
return result
except Exception as e:
print(f"錯誤: {e}")
return None
t = Thread(target=safe_worker, args=(data,))
t.start()
t.join()
✅ 重點回顧
Thread 創建:
- Thread(target=func) - 函式作為 target
- 繼承 Thread 類別 - 面向物件
- daemon=True - 守護線程
同步機制:
- Lock - 基本鎖
- RLock - 可重入鎖
- Semaphore - 信號量
- Event - 事件信號
- Condition - 條件變數
Thread Pool:
- ThreadPoolExecutor - 標準 Thread Pool
- submit() - 提交單一任務
- map() - 批量執行
關鍵觀念:
- ✅ 適合 I/O 密集型任務
- ✅ GIL 限制 CPU 並行
- ✅ 使用 Lock 保護共享資源
- ✅ 避免死鎖(Lock 順序一致)
上一篇: 05-1. Python GIL 深度解析 下一篇: 05-3. multiprocessing 模組完整指南
最後更新:2025-01-06