目錄
05-4. concurrent.futures 使用指南
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐ (簡單)
🎯 本篇重點
掌握 Python concurrent.futures 模組,這是一個高層級的併發介面,統一了 Thread 和 Process 的使用方式。
🤔 為什麼需要 concurrent.futures?
傳統方式的問題
# threading:手動管理 Thread
from threading import Thread
threads = []
for task in tasks:
t = Thread(target=work, args=(task,))
t.start()
threads.append(t)
for t in threads:
t.join()
# multiprocessing:手動管理 Process
from multiprocessing import Process
processes = []
for task in tasks:
p = Process(target=work, args=(task,))
p.start()
processes.append(p)
for p in processes:
p.join()
concurrent.futures 的優勢
# 統一的介面!只需換一個 Executor
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Thread 版本
with ThreadPoolExecutor() as executor:
results = executor.map(work, tasks)
# Process 版本(只改一行)
with ProcessPoolExecutor() as executor:
results = executor.map(work, tasks)
優點:
- ✅ 統一的 API
- ✅ 自動管理 Pool
- ✅ 更容易獲取結果
- ✅ 更好的錯誤處理
1️⃣ ThreadPoolExecutor 基礎
基本用法
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
"""任務函式"""
print(f"任務 {n} 開始")
time.sleep(1)
return n * n
# 使用 Thread Pool
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任務並獲取 Future 物件
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 3)
# 獲取結果
print(f"結果 1: {future1.result()}") # 阻塞直到完成
print(f"結果 2: {future2.result()}")
print(f"結果 3: {future3.result()}")
# with 自動關閉 executor,等待所有任務完成
submit() 方法
from concurrent.futures import ThreadPoolExecutor
import time
def download(url):
"""下載檔案"""
print(f"下載: {url}")
time.sleep(2)
return f"{url} 完成"
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交多個任務
futures = []
urls = [f'https://example.com/file{i}' for i in range(10)]
for url in urls:
future = executor.submit(download, url)
futures.append(future)
# 獲取結果
for future in futures:
result = future.result(timeout=5) # 最多等 5 秒
print(result)
map() 方法
from concurrent.futures import ThreadPoolExecutor
import time
def square(x):
time.sleep(0.5)
return x * x
with ThreadPoolExecutor(max_workers=4) as executor:
# map 會保持順序
numbers = range(10)
results = executor.map(square, numbers)
# 結果按原始順序返回
for num, result in zip(numbers, results):
print(f"{num}^2 = {result}")
# 輸出:
# 0^2 = 0
# 1^2 = 1
# 2^2 = 4
# ...(順序不變)
map() vs submit()
from concurrent.futures import ThreadPoolExecutor
def task(x):
return x * 2
with ThreadPoolExecutor() as executor:
# map:批量提交,保持順序
results1 = list(executor.map(task, [1, 2, 3]))
print(results1) # [2, 4, 6](有序)
# submit:逐個提交,可自訂順序
futures = [executor.submit(task, x) for x in [1, 2, 3]]
results2 = [f.result() for f in futures]
print(results2) # [2, 4, 6]
選擇:
- map():任務相同,批量處理,需要順序
- submit():任務不同,需要靈活控制
2️⃣ ProcessPoolExecutor 基礎
基本用法
from concurrent.futures import ProcessPoolExecutor
import os
def cpu_task(n):
"""CPU 密集任務"""
print(f"任務 {n}, PID: {os.getpid()}")
total = 0
for i in range(n * 1000000):
total += i * i
return total
if __name__ == '__main__':
# 使用 Process Pool
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [1, 2, 3, 4, 5]
results = executor.map(cpu_task, tasks)
for task, result in zip(tasks, results):
print(f"任務 {task} 結果: {result}")
輸出:
任務 1, PID: 1001
任務 2, PID: 1002
任務 3, PID: 1003
任務 4, PID: 1004
任務 5, PID: 1001 ← Process Pool 重複使用
...
Thread vs Process 性能對比
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def cpu_bound_task(n):
"""CPU 密集任務"""
return sum(i * i for i in range(n))
def io_bound_task(n):
"""I/O 密集任務"""
time.sleep(1)
return n
if __name__ == '__main__':
tasks = [10000000] * 4
# Thread Pool(CPU 密集)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_bound_task, tasks))
print(f"ThreadPool (CPU): {time.time() - start:.2f}s")
# 輸出:10.0s(無加速)
# Process Pool(CPU 密集)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_bound_task, tasks))
print(f"ProcessPool (CPU): {time.time() - start:.2f}s")
# 輸出:2.5s(4 倍加速)
# Thread Pool(I/O 密集)
tasks_io = [1] * 10
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
list(executor.map(io_bound_task, tasks_io))
print(f"ThreadPool (I/O): {time.time() - start:.2f}s")
# 輸出:1.0s(有加速)
3️⃣ Future 物件
Future 的方法
from concurrent.futures import ThreadPoolExecutor
import time
def slow_task(n):
time.sleep(n)
return n * 10
with ThreadPoolExecutor() as executor:
future = executor.submit(slow_task, 3)
# 檢查狀態
print(f"Running: {future.running()}") # True
print(f"Done: {future.done()}") # False
# 非阻塞等待
time.sleep(1)
print(f"Running: {future.running()}") # True
print(f"Done: {future.done()}") # False
# 阻塞獲取結果
result = future.result(timeout=5) # 等待最多 5 秒
print(f"結果: {result}") # 30
print(f"Done: {future.done()}") # True
取消 Future
from concurrent.futures import ThreadPoolExecutor
import time
def long_task(n):
time.sleep(5)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交任務
future1 = executor.submit(long_task, 1)
future2 = executor.submit(long_task, 2)
future3 = executor.submit(long_task, 3) # 在佇列中等待
time.sleep(0.1)
# 嘗試取消
print(f"取消 future1: {future1.cancel()}") # False(已在執行)
print(f"取消 future2: {future2.cancel()}") # False(已在執行)
print(f"取消 future3: {future3.cancel()}") # True(還在佇列)
# 檢查狀態
print(f"future1 已取消: {future1.cancelled()}") # False
print(f"future3 已取消: {future3.cancelled()}") # True
添加回調函式
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(1)
return n * 2
def on_complete(future):
"""任務完成時的回調"""
result = future.result()
print(f"✓ 任務完成,結果: {result}")
with ThreadPoolExecutor() as executor:
# 提交任務並添加回調
for i in range(5):
future = executor.submit(task, i)
future.add_done_callback(on_complete)
# 輸出:
# ✓ 任務完成,結果: 0
# ✓ 任務完成,結果: 2
# ...(順序可能不同)
4️⃣ as_completed() 和 wait()
as_completed():按完成順序
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def task(n):
"""隨機耗時任務"""
sleep_time = random.uniform(1, 3)
time.sleep(sleep_time)
return n, sleep_time
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任務
futures = [executor.submit(task, i) for i in range(10)]
# 按完成順序處理
for future in as_completed(futures):
n, sleep_time = future.result()
print(f"任務 {n} 完成(耗時 {sleep_time:.2f}s)")
# 輸出:按完成順序,不是提交順序
# 任務 3 完成(耗時 1.2s)
# 任務 7 完成(耗時 1.5s)
# 任務 1 完成(耗時 2.1s)
# ...
wait():等待多個 Future
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
def task(n):
time.sleep(n)
return n
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in [3, 1, 2]]
# 等待全部完成
done, not_done = wait(futures, return_when=ALL_COMPLETED)
print(f"完成: {len(done)}, 未完成: {len(not_done)}")
# 等待第一個完成
futures = [executor.submit(task, i) for i in [3, 1, 2]]
done, not_done = wait(futures, return_when=FIRST_COMPLETED, timeout=5)
print(f"完成: {len(done)}, 未完成: {len(not_done)}")
5️⃣ 實戰案例
案例 1:批量下載檔案
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def download_file(url):
    """Download one URL into the local ``downloads/`` directory.

    Args:
        url: Address to fetch with ``requests.get`` (10-second timeout).

    Returns:
        A status string: ``"✓ <filename> (<n> bytes)"`` on success, or
        ``"✗ <url>: <error>"`` when the request or the file write fails.
        Failures are reported through the return value, not raised.
    """
    import os  # local import: this snippet's import list does not include os

    try:
        response = requests.get(url, timeout=10)
        # Treat HTTP 4xx/5xx as failures instead of saving the error page.
        response.raise_for_status()
        # A URL ending in "/" has an empty last segment; use a default name.
        filename = url.split('/')[-1] or 'index.html'
        # open() does not create missing directories, so make sure it exists.
        os.makedirs('downloads', exist_ok=True)
        with open(f'downloads/{filename}', 'wb') as f:
            f.write(response.content)
        return f"✓ {filename} ({len(response.content)} bytes)"
    except (requests.RequestException, OSError) as e:
        # Network/HTTP errors and file-system errors become a status string.
        return f"✗ {url}: {e}"
def batch_download(urls, max_workers=5):
    """Download several URLs concurrently and report each as it finishes.

    Args:
        urls: Iterable of URLs; it is materialised once so generators work.
        max_workers: Size of the thread pool.

    Returns:
        A list of ``(url, status_string)`` tuples in completion order, one
        for every task that returned without raising.
    """
    urls = list(urls)  # allow any iterable; len() is needed for the summary
    start = time.time()
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so errors can name the source.
        future_to_url = {executor.submit(download_file, url): url for url in urls}
        # Handle tasks in the order they finish, not the order submitted.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"✗ 錯誤: {url}, {e}")
            else:
                results.append((url, result))
                print(result)
    elapsed = time.time() - start
    # download_file reports failures as "✗ ..." strings rather than raising,
    # so count only the "✓ ..." results as completed downloads.
    succeeded = sum(1 for _, status in results if str(status).startswith("✓"))
    print(f"\n完成 {succeeded}/{len(urls)} 個下載,耗時 {elapsed:.2f}s")
    return results
# 使用
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
]
batch_download(urls, max_workers=3)
案例 2:並行處理資料
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np
def process_chunk(data_chunk):
    """Compute ``x ** 2 + sin(x)`` element-wise for one chunk of data.

    Args:
        data_chunk: A pandas Series (or DataFrame) of numeric values.

    Returns:
        An object of the same shape and index holding the transformed values.
    """
    # Vectorized form of the per-element lambda: the same arithmetic, but the
    # loop runs inside NumPy instead of calling a Python function per value.
    return data_chunk ** 2 + np.sin(data_chunk)
def parallel_process(data, num_workers=4):
    """Split ``data`` into chunks and run ``process_chunk`` on them in parallel.

    Args:
        data: A sliceable pandas object (Series or DataFrame).
        num_workers: Number of worker processes; also the maximum number of
            chunks produced.

    Returns:
        The processed chunks concatenated in their original order. Empty
        input is returned as an empty copy without starting any process.

    Raises:
        ValueError: If ``num_workers`` is less than 1.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    total = len(data)
    if total == 0:
        # Nothing to split; pd.concat([]) would raise on an empty list.
        return data.copy()

    # Ceiling division: the chunk size is never 0 (a zero step makes range()
    # raise when there are fewer rows than workers) and there are at most
    # num_workers chunks, with no small extra remainder chunk.
    chunk_size = -(-total // num_workers)
    chunks = [data[i:i + chunk_size] for i in range(0, total, chunk_size)]

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # map() yields results in submission order, so concat restores order.
        results = list(executor.map(process_chunk, chunks))

    return pd.concat(results)
if __name__ == '__main__':
# 創建大型資料集
data = pd.Series(np.random.rand(10000000))
# 並行處理
import time
start = time.time()
result = parallel_process(data, num_workers=4)
print(f"耗時: {time.time() - start:.2f}s")
案例 3:混合 Thread 和 Process
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import time
def fetch_url(url):
    """Fetch a web page and return its body as text (I/O-bound work).

    Args:
        url: Address to request.

    Returns:
        The decoded response body. The HTTP status code is not checked, so
        an error page's text is returned as-is.

    Raises:
        requests.RequestException: On connection problems, or when the
            server takes longer than the 10-second timeout.
    """
    # Without a timeout, requests waits indefinitely and one stalled server
    # would hold a pool thread forever.
    response = requests.get(url, timeout=10)
    return response.text
def analyze_text(text):
"""CPU 密集:分析文字"""
# 模擬複雜分析
word_count = len(text.split())
char_count = len(text)
return {'words': word_count, 'chars': char_count}
def process_urls(urls):
"""混合處理"""
# 步驟 1:用 Thread 下載(I/O 密集)
with ThreadPoolExecutor(max_workers=10) as executor:
texts = list(executor.map(fetch_url, urls))
print(f"下載完成 {len(texts)} 個網頁")
# 步驟 2:用 Process 分析(CPU 密集)
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(analyze_text, texts))
print(f"分析完成 {len(results)} 個網頁")
return results
if __name__ == '__main__':
urls = [f'https://httpbin.org/html' for _ in range(20)]
results = process_urls(urls)
print(results)
案例 4:進度追蹤
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(n):
time.sleep(1)
return n * 2
def process_with_progress(items, max_workers=5):
"""帶進度追蹤的處理"""
total = len(items)
completed = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任務
futures = {executor.submit(task, item): item for item in items}
# 追蹤進度
for future in as_completed(futures):
item = futures[future]
try:
result = future.result()
completed += 1
progress = completed / total * 100
print(f"進度: {progress:.1f}% ({completed}/{total})", end='\r')
except Exception as e:
print(f"\n錯誤: {item}, {e}")
print(f"\n完成!")
# 使用
items = list(range(100))
process_with_progress(items, max_workers=10)
6️⃣ 最佳實踐
1. 選擇合適的 Worker 數量
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Thread Pool:I/O 密集型
# 可以設置較多 worker(幾十到幾百)
with ThreadPoolExecutor(max_workers=50) as executor:
pass
# Process Pool:CPU 密集型
# worker 數量 = CPU 核心數
num_workers = os.cpu_count() # 通常 4, 8, 16
with ProcessPoolExecutor(max_workers=num_workers) as executor:
pass
2. 妥善處理異常
from concurrent.futures import ThreadPoolExecutor
def safe_task(item):
try:
return risky_operation(item)
except Exception as e:
return f"Error: {e}"
with ThreadPoolExecutor() as executor:
results = executor.map(safe_task, items)
for result in results:
if isinstance(result, str) and result.startswith("Error"):
print(f"處理錯誤: {result}")
3. 使用 timeout 避免阻塞
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def long_task(duration=100):
    """Simulate a slow job by sleeping, then return ``"Done"``.

    Args:
        duration: Seconds to sleep. Defaults to 100, the value the example
            uses to force a timeout.

    Returns:
        The string ``"Done"``.
    """
    import time  # this snippet's import line does not bring in ``time``

    time.sleep(duration)
    return "Done"
with ThreadPoolExecutor() as executor:
future = executor.submit(long_task)
try:
result = future.result(timeout=5) # 最多等 5 秒
except TimeoutError:
print("任務超時")
future.cancel() # 嘗試取消(執行中的任務無法取消,離開 with 區塊時仍會等待它完成)
4. 使用 context manager
# ✅ 正確:自動清理
with ThreadPoolExecutor() as executor:
results = executor.map(task, items)
# ❌ 錯誤:可能忘記關閉
executor = ThreadPoolExecutor()
results = executor.map(task, items)
executor.shutdown(wait=True) # 手動關閉
✅ 重點回顧
concurrent.futures 優勢:
- ✅ 統一的 API(Thread 和 Process)
- ✅ 自動管理 Pool
- ✅ 更容易獲取結果
- ✅ 更好的錯誤處理
兩種 Executor:
- ThreadPoolExecutor - I/O 密集型
- ProcessPoolExecutor - CPU 密集型
兩種提交方式:
- submit() - 逐個提交,靈活
- map() - 批量提交,保持順序
Future 物件:
- result() - 獲取結果(阻塞)
- cancel() - 取消任務
- add_done_callback() - 添加回調
工具函式:
- as_completed() - 按完成順序
- wait() - 等待多個 Future
關鍵:
- ✅ 比 threading/multiprocessing 更易用
- ✅ 適合大部分併發場景
- ✅ 是 Python 併發的首選方案
上一篇: 05-3. multiprocessing 模組完整指南 下一篇: 05-5. asyncio 基礎概念
最後更新:2025-01-06