02-5. Thread Pools and Practical Applications

⏱️ Reading time: 18 minutes 🎯 Difficulty: ⭐⭐⭐ (Intermediate)


🤔 In One Sentence

A thread pool is a set of threads created in advance and reused to run many tasks, which avoids the overhead of creating and destroying a thread for every task.
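To make the idea concrete, here is a minimal sketch of a pool built by hand from `queue.Queue` and a fixed number of worker threads. It only illustrates the concept; the class `TinyPool` is hypothetical and is not how `ThreadPoolExecutor` is implemented.

```python
import queue
import threading


class TinyPool:
    """Hypothetical minimal pool: a fixed set of worker threads share one job queue."""

    def __init__(self, num_workers):
        self._jobs = queue.Queue()
        self._workers = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(num_workers)
        ]
        for w in self._workers:
            w.start()

    def _worker(self):
        # Each worker is created once and then reused for many jobs.
        while True:
            job = self._jobs.get()
            try:
                if job is None:  # sentinel: stop this worker
                    return
                fn, arg, results, index = job
                try:
                    results[index] = fn(arg)
                except Exception as exc:  # store the error so the worker stays alive
                    results[index] = exc
            finally:
                self._jobs.task_done()

    def map(self, fn, items):
        items = list(items)
        results = [None] * len(items)
        for index, item in enumerate(items):
            self._jobs.put((fn, item, results, index))
        self._jobs.join()  # block until every queued job has been processed
        return results

    def shutdown(self):
        for _ in self._workers:
            self._jobs.put(None)  # one sentinel per worker
        for w in self._workers:
            w.join()


pool = TinyPool(num_workers=3)
squares = pool.map(lambda n: n * n, range(6))
print(squares)  # [0, 1, 4, 9, 16, 25]
pool.shutdown()
```

Three threads handle all six jobs; the queue is what lets a small, fixed set of threads serve an arbitrary number of tasks.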


❌ Problems Without a Thread Pool

Problem 1: Creating and destroying threads over and over

import time
from threading import Thread

def task(task_id):
    time.sleep(0.1)
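    return f"Task {task_id} done"

# Handle 100 tasks by creating one Thread per task
start = time.time()

threads = []
for i in range(100):
    t = Thread(target=task, args=(i,))  # note: Thread discards task's return value
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(f"Time: {time.time() - start:.2f} s")
# Example output (varies by machine): Time: 0.25 s

# ⚠️ Problems:
# 1. Creating 100 threads has a cost
# 2. 100 threads run at the same time and compete for resources
# 3. There is no way to cap how many run concurrently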

Problem 2: Resource exhaustion

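# ❌ Dangerous: creating far too many threads
threads = [Thread(target=task, args=(i,)) for i in range(10000)]

for t in threads:
    t.start()

# 💥 This can lead to:
# - Running out of memory (each thread reserves its own stack)
# - CPU overload from constant context switching
# - An unstable system (or "RuntimeError: can't start new thread")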

✅ Advantages of a Thread Pool

Traditional approach:
  Create Thread → Run task → Destroy Thread
  Create Thread → Run task → Destroy Thread
  Create Thread → Run task → Destroy Thread
  ⏱️ High overhead!

Thread Pool:
  [Thread 1]  ↘
  [Thread 2]   → Task queue → Reused
  [Thread 3]  ↗
  ⏱️ Low overhead!

Advantages:

  • ✅ No repeated creation/destruction
  • ✅ Bounded concurrency
  • ✅ Threads are reused
  • ✅ Better resource management
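One way to check the "reuse" and "bounded concurrency" claims is to record which thread runs each task and how many tasks are running at once. The sketch below submits 20 short tasks to a pool capped at 3 workers; `tracked_task` and the counters are illustrative names, and `time.sleep` stands in for I/O.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0          # tasks currently executing
peak = 0             # highest value `running` ever reached
thread_names = set() # names of the threads that ran a task

def tracked_task(n):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
        thread_names.add(threading.current_thread().name)
    time.sleep(0.01)  # simulate a short I/O wait
    with lock:
        running -= 1
    return n

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(tracked_task, range(20)))

print(f"distinct threads used: {len(thread_names)}")  # at most 3
print(f"peak concurrent tasks: {peak}")               # at most 3
```

Twenty tasks, but never more than three threads and never more than three tasks in flight.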

🏊 ThreadPoolExecutor Basics

1. Creating a Thread Pool

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Running task {n}")
    time.sleep(1)
    return n * 2

# Create a thread pool with at most 3 threads
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 3)

    # Get the results (result() blocks until that task finishes)
    print(f"Result 1: {future1.result()}")
    print(f"Result 2: {future2.result()}")
    print(f"Result 3: {future3.result()}")

# Leaving the with block automatically waits for all tasks to finish

Output:

Running task 1
Running task 2
Running task 3
(waits about 1 second...)
Result 1: 2
Result 2: 4
Result 3: 6

2. The map() Method: Batch Processing

from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.5)
    return n * n

# Process 10 numbers as a batch
numbers = range(10)

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(square, numbers)

    # Results are returned in input order
    for num, result in zip(numbers, results):
        print(f"{num}² = {result}")

Output:

0² = 0
1² = 1
2² = 4
3² = 9
...
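`map()` also accepts several iterables (like the built-in `map`), and an exception raised by a task only surfaces when iteration reaches that task's result. A small sketch with a hypothetical `divide` function:

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=2) as executor:
    # Two iterables: calls divide(10, 2), divide(9, 3), divide(8, 4)
    quotients = list(executor.map(divide, [10, 9, 8], [2, 3, 4]))
    print(quotients)  # [5.0, 3.0, 2.0]

    collected = []
    try:
        for q in executor.map(divide, [1, 2, 3], [1, 0, 1]):
            collected.append(q)
    except ZeroDivisionError:
        # Re-raised when iteration reaches divide(2, 0);
        # this loop yields nothing after it.
        print(f"stopped after {collected}")  # stopped after [1.0]
```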

3. submit() vs map()

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    # Approach 1: submit() returns a Future object
    future = executor.submit(task, 5)
    result = future.result()  # 10

    # Approach 2: map() for batch processing
    results = executor.map(task, [1, 2, 3, 4, 5])
    print(list(results))  # [2, 4, 6, 8, 10]

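Comparison:

Method   | Characteristics                                | When to use
submit() | Returns a Future; each task handled separately | Different tasks, or per-task control is needed
map()    | Batch processing; results in input order       | Same function applied to different inputs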

📥 Future Objects in Detail

1. Checking Status

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task():
    time.sleep(3)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    print(f"running: {future.running()}")   # usually True (False if no worker has picked it up yet)
    print(f"done: {future.done()}")         # False

    time.sleep(1)
    print(f"running: {future.running()}")   # True
    print(f"done: {future.done()}")         # False

    result = future.result()  # blocks until the task finishes
    print(f"done: {future.done()}")         # True
    print(f"Result: {result}")
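Besides `running()` and `done()`, a `Future` has `cancel()` and `cancelled()`. Only a task still waiting in the queue can be cancelled; one that is already running cannot. The sketch below uses a single worker and a `threading.Event` (an illustrative setup) so the timing is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def blocking_task():
    started.set()   # tell the main thread this task is now running
    release.wait()  # keep the only worker busy until released
    return "first done"

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocking_task)
    second = executor.submit(pow, 2, 10)  # queued behind `first`
    started.wait()

    first_cancelled = first.cancel()    # False: already running
    second_cancelled = second.cancel()  # True: still waiting in the queue
    print(first_cancelled, second_cancelled, second.cancelled())  # False True True

    release.set()  # let the first task finish
    first_result = first.result()
    print(first_result)  # first done
```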

2. Setting a Timeout

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task():
    time.sleep(5)
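    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        result = future.result(timeout=2)  # wait at most 2 seconds
    except TimeoutError:
        print("⏱️ Task timed out")
    # The timeout does not stop slow_task: the with block still waits for it on exit

Output:

⏱️ Task timed out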
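The sketch below checks what state the task is in after `result(timeout=...)` gives up. A `threading.Event` stands in for slow work (an illustrative choice), so the outcome does not depend on timing.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

release = threading.Event()

def slow_task():
    release.wait()  # stands in for a long-running operation
    return "done"

timed_out = False
still_running = False
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)  # give up *waiting* after 0.1 s
    except TimeoutError:
        timed_out = True
        still_running = not future.done()
        print(f"timed out; task still running: {still_running}")  # True
    release.set()            # the work then finishes on its own
    final = future.result()
    print(final)             # done
```

So a timeout is a limit on how long the caller waits, not on how long the task runs.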

3. Exception Handling

from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 0:
        raise ValueError("n must not be 0")
    return 10 / n

with ThreadPoolExecutor() as executor:
    future1 = executor.submit(risky_task, 5)
    future2 = executor.submit(risky_task, 0)  # will fail

    # Successful task
    print(f"Result 1: {future1.result()}")  # 2.0

    # Failing task: result() re-raises the exception from the worker
    try:
        print(f"Result 2: {future2.result()}")
    except ValueError as e:
        print(f"❌ Error: {e}")

Output:

Result 1: 2.0
❌ Error: n must not be 0
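If you prefer not to use try/except, `Future.exception()` waits for the task and returns the exception it raised (or `None` on success) instead of re-raising it. A sketch reusing the same `risky_task`:

```python
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 0:
        raise ValueError("n must not be 0")
    return 10 / n

with ThreadPoolExecutor() as executor:
    futures = {n: executor.submit(risky_task, n) for n in (5, 0)}

for n, future in futures.items():
    exc = future.exception()  # blocks until done; does not raise
    if exc is None:
        print(f"{n}: {future.result()}")   # 5: 2.0
    else:
        print(f"{n}: failed with {exc!r}") # 0: failed with ValueError('n must not be 0')
```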

4. as_completed(): Handle Tasks as They Finish

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def task(name):
    duration = random.randint(1, 3)
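    print(f"{name} started (expected {duration} s)")
    time.sleep(duration)
    return f"{name} finished (took {duration} s)"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the tasks
    futures = [executor.submit(task, f'Task-{i}') for i in range(5)]

    # Process them in the order they complete
    for future in as_completed(futures):
        result = future.result()
        print(f"✅ {result}")

Sample output (durations are random, so yours will differ):

Task-0 started (expected 2 s)
Task-1 started (expected 1 s)
Task-2 started (expected 3 s)
✅ Task-1 finished (took 1 s)  ← finishes first
Task-3 started (expected 2 s)
✅ Task-0 finished (took 2 s)
Task-4 started (expected 1 s)
✅ Task-2 finished (took 3 s)
✅ Task-3 finished (took 2 s)
✅ Task-4 finished (took 1 s)

(With these durations, Task-2, Task-3 and Task-4 all finish at about the 3-second mark, so their relative order can vary.)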
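Because `as_completed()` yields futures in completion order, it is easy to lose track of which input produced which result. A common idiom is a dict that maps each future back to its input. In this sketch, fixed per-name delays (hypothetical stand-ins for the random durations above) replace `random`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

delays = {"Task-A": 0.3, "Task-B": 0.1, "Task-C": 0.2}

def task(name):
    time.sleep(delays[name])
    return delays[name]

finished = []
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_name = {executor.submit(task, name): name for name in delays}
    for future in as_completed(future_to_name):
        name = future_to_name[future]  # recover the input for this result
        finished.append(name)
        print(f"{name} finished after {future.result()} s")

print(finished)  # completion order, not submission order
```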

🌐 Case Study 1: Downloading Files Concurrently

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def download_file(url):
    """Download a single file"""
    filename = url.split('/')[-1]
    print(f"Downloading {filename}...")

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        with open(filename, 'wb') as f:
            f.write(response.content)

        return f"✅ {filename} downloaded ({len(response.content)} bytes)"
    except Exception as e:
        return f"❌ {filename} failed: {e}"

# Files to download
urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf',
    'https://example.com/file4.pdf',
    'https://example.com/file5.pdf',
]

# Download concurrently with a thread pool
start = time.time()

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit all download tasks
    futures = [executor.submit(download_file, url) for url in urls]

    # Handle results in the order they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)

print(f"\nTotal time: {time.time() - start:.2f} s")

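Sample output (illustrative; the example.com URLs are placeholders):

Downloading file1.pdf...
Downloading file2.pdf...
Downloading file3.pdf...
Downloading file4.pdf...
Downloading file5.pdf...
✅ file2.pdf downloaded (1024000 bytes)
✅ file1.pdf downloaded (2048000 bytes)
✅ file4.pdf downloaded (512000 bytes)
✅ file3.pdf downloaded (3072000 bytes)
✅ file5.pdf downloaded (768000 bytes)

Total time: 2.34 s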

🔍 Case Study 2: Batch API Requests

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def fetch_user(user_id):
    """Fetch a single user's data"""
    url = f"https://api.example.com/users/{user_id}"

    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        return {
            'user_id': user_id,
            'name': data.get('name'),
            'status': 'success'
        }
    except Exception as e:
        return {
            'user_id': user_id,
            'error': str(e),
            'status': 'failed'
        }

# Process 100 users as a batch
user_ids = range(1, 101)

start = time.time()

with ThreadPoolExecutor(max_workers=10) as executor:
    # Use map for batch processing
    results = executor.map(fetch_user, user_ids)

    # Tally the results
    success_count = 0
    failed_count = 0

    for result in results:
        if result['status'] == 'success':
            success_count += 1
            print(f"✅ User {result['user_id']}: {result['name']}")
        else:
            failed_count += 1
            print(f"❌ User {result['user_id']}: {result['error']}")

print(f"\nSucceeded: {success_count}, Failed: {failed_count}")
print(f"Total time: {time.time() - start:.2f} s")
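`executor.map()` submits every item up front (the whole input is consumed immediately; Python 3.14 adds a `buffersize` argument to limit this). For very large inputs you may prefer to feed the pool in chunks. `map_in_chunks` below is a hypothetical helper, and `fake_fetch_user` is a stand-in that does not touch the network.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def fake_fetch_user(user_id):
    """Hypothetical stand-in for fetch_user(): no network access."""
    return {"user_id": user_id, "status": "success"}

def map_in_chunks(executor, fn, items, chunk_size):
    """Yield fn(item) results in input order, submitting at most chunk_size items at a time."""
    it = iter(items)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        # The next chunk is only submitted once this one has been consumed.
        yield from executor.map(fn, chunk)

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(map_in_chunks(executor, fake_fetch_user, range(1, 101), chunk_size=25))

print(len(results), results[0]["user_id"], results[-1]["user_id"])  # 100 1 100
```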

🗃️ Case Study 3: Batch Database Queries

from concurrent.futures import ThreadPoolExecutor
import sqlite3
import time

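def query_database(query_id, keyword):
    """Run one database query"""
    # Each thread needs its own connection: by default a sqlite3 connection can only be used by the thread that created it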
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()

    try:
        cursor.execute(
            "SELECT * FROM products WHERE name LIKE ?",
            (f'%{keyword}%',)
        )
        results = cursor.fetchall()

        return {
            'query_id': query_id,
            'keyword': keyword,
            'count': len(results),
            'results': results[:5]  # return only the first 5 rows
        }
    finally:
        conn.close()

# Batch queries
keywords = ['apple', 'banana', 'cherry', 'date', 'elderberry']

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(query_database, i, keyword)
        for i, keyword in enumerate(keywords)
    ]

    for future in futures:
        result = future.result()
        print(f"Query {result['query_id']} ({result['keyword']}): "
              f"found {result['count']} rows")

🖼️ Case Study 4: Image Processing

from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import os

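def resize_image(input_path, output_dir, size=(800, 600)):
    """Shrink an image to fit within `size` (aspect ratio preserved)"""
    filename = os.path.basename(input_path)  # set before try so the except branch can use it
    try:
        output_path = os.path.join(output_dir, f"resized_{filename}")

        # Open the image (the with block closes the file afterwards)
        with Image.open(input_path) as img:
            # Resize in place; thumbnail() never enlarges the image
            img.thumbnail(size)

            # Save
            img.save(output_path)

        return f"✅ {filename} processed"
    except Exception as e:
        return f"❌ {filename} failed: {e}"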

# Collect all images
image_dir = "./images"
output_dir = "./resized_images"
os.makedirs(output_dir, exist_ok=True)

image_files = [
    os.path.join(image_dir, f)
    for f in os.listdir(image_dir)
    if f.lower().endswith(('.jpg', '.png', '.jpeg'))  # case-insensitive match
]

print(f"Found {len(image_files)} images")

# Process concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(resize_image, img_path, output_dir)
        for img_path in image_files
    ]

    for future in futures:
        print(future.result())

📊 Case Study 5: Website Health Check

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def check_website(url):
    """Check a website's status"""
    start = time.time()

    try:
        response = requests.get(url, timeout=5)
        elapsed = time.time() - start

        return {
            'url': url,
            'status': response.status_code,
            'response_time': elapsed,
            'ok': response.status_code == 200
        }
    except Exception as e:
        elapsed = time.time() - start
        return {
            'url': url,
            'status': None,
            'response_time': elapsed,
            'ok': False,
            'error': str(e)
        }

# Websites to check
websites = [
    'https://google.com',
    'https://github.com',
    'https://stackoverflow.com',
    'https://reddit.com',
    'https://twitter.com',
]

print("Starting health check...\n")

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(check_website, url) for url in websites]

    for future in as_completed(futures):
        result = future.result()

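        status = "✅" if result['ok'] else "❌"
        print(f"{status} {result['url']}")
        # 'status' is always present but is None on failure, so .get() would print "None"
        status_code = result['status'] if result['status'] is not None else 'N/A'
        print(f"   Status code: {status_code}")
        print(f"   Response time: {result['response_time']:.2f} s")

        if 'error' in result:
            print(f"   Error: {result['error']}")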

        print()
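For a health check you often want an overall deadline rather than waiting on the slowest site. `as_completed()` accepts a `timeout` and raises `TimeoutError` when it expires, so the unfinished futures can be reported separately. In this sketch the checks are simulated: the `.example` hosts and `fake_check` are hypothetical, and an `Event` plays the part of a site that hangs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

hang = threading.Event()

def fake_check(site):
    if site == "slow.example":
        hang.wait()       # simulates a site that does not answer in time
        return site, None
    time.sleep(0.05)      # simulates a quick response
    return site, 200

sites = ["a.example", "b.example", "slow.example"]
finished, pending = [], []

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(fake_check, s): s for s in sites}
    try:
        for future in as_completed(futures, timeout=1.0):
            finished.append(future.result()[0])
    except TimeoutError:
        pending = [s for f, s in futures.items() if not f.done()]
    finally:
        hang.set()        # release the hung worker so the pool can shut down

print("finished:", sorted(finished))          # ['a.example', 'b.example']
print("no answer within deadline:", pending)  # ['slow.example']
```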

🎯 Thread Pool Best Practices

1. Choose a Suitable max_workers

import os
from concurrent.futures import ThreadPoolExecutor

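# ❌ Bad: far too many threads
with ThreadPoolExecutor(max_workers=1000) as executor:
    pass

# ✅ Good: decide based on the kind of work
# I/O-bound: a common rule of thumb is 2-4x the number of CPU cores
max_workers = (os.cpu_count() or 1) * 2  # cpu_count() may return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    pass

# CPU-bound: use ProcessPoolExecutor instead (threads are limited by the GIL)
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    pass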
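When `max_workers` is omitted, `ThreadPoolExecutor` picks a default; from Python 3.8 the documented formula is `min(32, os.cpu_count() + 4)` (Python 3.13 switched to `os.process_cpu_count()`). The helpers below are a hypothetical sketch that mirrors the 3.8-era formula and scales the core count for I/O-heavy work; the multiplier and cap are arbitrary assumptions, not library values.

```python
import os
from concurrent.futures import ThreadPoolExecutor

def default_thread_workers():
    # Mirrors the documented default for Python 3.8+: min(32, cpu_count + 4).
    # os.cpu_count() can return None, so fall back to 1.
    return min(32, (os.cpu_count() or 1) + 4)

def io_bound_workers(multiplier=2, cap=64):
    # Hypothetical rule of thumb for I/O-bound work: a multiple of the core count, capped.
    return min(cap, (os.cpu_count() or 1) * multiplier)

print("default:", default_thread_workers())
print("I/O-bound suggestion:", io_bound_workers())

with ThreadPoolExecutor(max_workers=io_bound_workers()) as executor:
    abs_values = list(executor.map(abs, [-1, -2, 3]))
print(abs_values)  # [1, 2, 3]
```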

2. Use a Context Manager

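# ❌ Not ideal: manual management (shutdown() is skipped if an exception occurs before it)
executor = ThreadPoolExecutor(max_workers=5)
executor.submit(task)
executor.shutdown(wait=True)

# ✅ Better: automatic management
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(task)
# shutdown(wait=True) is called automatically, even if an exception occurs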
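`shutdown(wait=True)` lets every queued task run. Since Python 3.9, `shutdown(cancel_futures=True)` cancels tasks that have not started yet. In this sketch a single worker is held busy by an `Event` (an illustrative setup) so that the remaining tasks are guaranteed to still be queued.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def blocker():
    started.set()   # the only worker is now busy
    release.wait()
    return "blocker done"

executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(blocker)
rest = [executor.submit(pow, 2, i) for i in range(5)]  # queued behind `first`
started.wait()

# Cancel everything still in the queue; the running task is not interrupted.
executor.shutdown(wait=False, cancel_futures=True)
release.set()

first_result = first.result()
print(first_result)                    # blocker done
print([f.cancelled() for f in rest])   # [True, True, True, True, True]
```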

3. Handle Exceptions Properly

from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n < 0:
        raise ValueError(f"Invalid number: {n}")
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, n) for n in range(-2, 3)]

    for future in futures:
        try:
            result = future.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")
            # Log the error, retry, or handle it in some other way
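For the retry option, one simple approach is to wrap the task so that retries happen inside the worker thread. `flaky_task` and `with_retry` are hypothetical names; the task is rigged to fail twice per input so the retries are visible, and the exponential backoff values are arbitrary.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

attempts = {}
attempts_lock = threading.Lock()

def flaky_task(n):
    """Hypothetical task that fails on its first two attempts for each n."""
    with attempts_lock:
        attempts[n] = attempts.get(n, 0) + 1
        attempt = attempts[n]
    if attempt < 3:
        raise ConnectionError(f"transient failure for {n} (attempt {attempt})")
    return n * 2

def with_retry(fn, *args, retries=3, base_delay=0.01):
    """Call fn(*args), retrying on ConnectionError with exponential backoff."""
    for attempt in range(1, retries + 1):
        try:
            return fn(*args)
        except ConnectionError:
            if attempt == retries:
                raise  # out of retries: the Future will carry the error
            time.sleep(base_delay * 2 ** (attempt - 1))

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(with_retry, flaky_task, n) for n in range(5)]
    results = [f.result() for f in futures]

print(results)  # [0, 2, 4, 6, 8]
```

Only transient errors (here `ConnectionError`) are retried; anything else propagates to `result()` immediately.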

4. Monitor Progress

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(task_id):
    time.sleep(1)
    return f"Task {task_id} done"

tasks = range(20)
total = len(tasks)

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in tasks]

    completed = 0
    for future in as_completed(futures):
        completed += 1
        progress = (completed / total) * 100
        print(f"Progress: {progress:.1f}% ({completed}/{total})")
        result = future.result()
        print(f"  {result}")
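Another way to report progress is `Future.add_done_callback()`. The callback normally runs in the worker thread that finished the task (or right away in the calling thread if the future is already done), so a shared counter needs a lock. A sketch, with `on_done` as an illustrative name:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

total = 20
completed = 0
progress_lock = threading.Lock()

def task(task_id):
    time.sleep(0.01)
    return task_id

def on_done(future):
    global completed
    with progress_lock:
        completed += 1
        print(f"Progress: {completed / total:.0%} ({completed}/{total})")

with ThreadPoolExecutor(max_workers=5) as executor:
    for i in range(total):
        executor.submit(task, i).add_done_callback(on_done)

print("final count:", completed)  # 20
```

Unlike the `as_completed()` loop, the main thread does not have to iterate over the futures; the callbacks fire as tasks finish.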

⚡ Performance Comparison

import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(0.01)
    return n * 2

tasks = range(100)

# Approach 1: run the tasks one by one
start = time.time()
results = [task(n) for n in tasks]
print(f"Sequential: {time.time() - start:.2f} s")

# Approach 2: create the threads manually
start = time.time()
threads = [Thread(target=task, args=(n,)) for n in tasks]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Manual threads: {time.time() - start:.2f} s")

# Approach 3: thread pool
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(task, tasks))
print(f"Thread Pool: {time.time() - start:.2f} s")

Example output (timings vary by machine):

Sequential: 1.05 s
Manual threads: 0.15 s
Thread Pool: 0.12 s
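A back-of-the-envelope check shows why a pool is not automatically the fastest option here: with `max_workers=10`, 100 tasks of 0.01 s each need at least ⌈100 / 10⌉ = 10 rounds, roughly 0.1 s, while 100 threads started at once could in principle finish in about one sleep period plus startup overhead. The sketch below times the pool for a few worker counts; exact numbers will vary.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(0.01)
    return n * 2

def timed_pool(workers, n_tasks=100):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(task, range(n_tasks)))
    return time.perf_counter() - start

for workers in (5, 10, 50):
    lower_bound = math.ceil(100 / workers) * 0.01  # rounds x duration per task
    elapsed = timed_pool(workers)
    print(f"workers={workers:>2}: {elapsed:.3f} s (lower bound {lower_bound:.2f} s)")
```

The pool's benefit is bounded concurrency and thread reuse; raw speed for tiny tasks depends mostly on the worker count.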

🆚 ThreadPoolExecutor vs ProcessPoolExecutor

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i
    return total

def io_task(n):
    """I/O-bound task"""
    time.sleep(1)
    return n

# The __main__ guard is required for ProcessPoolExecutor on platforms that
# start worker processes with "spawn" (Windows, macOS)
if __name__ == "__main__":
    # CPU-bound: Process Pool is faster
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, range(4)))
    print(f"Process Pool (CPU): {time.time() - start:.2f} s")

    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, range(4)))
    print(f"Thread Pool (CPU): {time.time() - start:.2f} s")

    # I/O-bound: Thread Pool works well (the GIL is released while waiting)
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(io_task, range(4)))
    print(f"Thread Pool (I/O): {time.time() - start:.2f} s")

Example output (varies with machine and core count):

Process Pool (CPU): 0.65 s  ← fast
Thread Pool (CPU): 2.50 s   ← slow (GIL)
Thread Pool (I/O): 1.02 s   ← fast

✅ Key Takeaways

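Advantages of a Thread Pool:

  • ✅ Avoids repeatedly creating/destroying threads
  • ✅ Bounds concurrency
  • ✅ Reuses threads
  • ✅ Simpler code

Using ThreadPoolExecutor:

  • submit() - submits a single task and returns a Future
  • map() - batch processing; results in input order
  • as_completed() - yields futures in the order they finish

Future objects:

  • result() - gets the result (blocks; re-raises the task's exception)
  • done() - checks whether the task has finished
  • running() - checks whether the task is currently running
  • exception() - returns the task's exception, or None (blocks until it finishes)

When to use:

  • ✅ I/O-bound tasks (network requests, file operations)
  • ✅ Large numbers of short-lived tasks
  • ❌ CPU-bound tasks (use ProcessPoolExecutor instead)

Best practices:

  • Use the with statement
  • Set max_workers sensibly
  • Handle exceptions properly
  • Monitor task progress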



Last updated: 2025-01-06
