05-3. Complete Guide to the multiprocessing Module

⏱️ Reading time: 20 minutes 🎯 Difficulty: ⭐⭐⭐ (Intermediate)


🎯 What This Article Covers

A complete tour of Python's multiprocessing module: creating Process objects, managing Pools, sharing data, and inter-process communication (IPC).


📚 multiprocessing at a Glance

multiprocessing is the standard-library module for creating and managing multiple processes. Each process runs its own interpreter with its own GIL, so it sidesteps the GIL limitation and achieves true parallel computation.

from multiprocessing import Process, Pool, Queue, Pipe, Manager
from multiprocessing import Value, Array, Lock
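
To see the GIL-bypass claim concretely, here is a minimal timing sketch for CPU-bound work (cpu_task and the input sizes are illustrative, not from a real benchmark):

from multiprocessing import Pool
import time

def cpu_task(n):
    # Pure-CPU work: sum of squares (illustrative workload)
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    inputs = [2_000_000] * 8

    start = time.perf_counter()
    serial = [cpu_task(n) for n in inputs]
    print(f"Serial:   {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    with Pool(4) as pool:  # 4 worker processes run truly in parallel
        parallel = pool.map(cpu_task, inputs)
    print(f"Parallel: {time.perf_counter() - start:.2f}s")  # typically several times faster on a multi-core machine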

1️⃣ Basics: Creating a Process

Method 1: A Function as target

from multiprocessing import Process
import os
import time

def worker(name, sleep_time):
    """Worker function"""
    print(f"[{name}] PID: {os.getpid()}, parent PID: {os.getppid()}")
    print(f"[{name}] Starting work...")
    time.sleep(sleep_time)
    print(f"[{name}] Work done")

if __name__ == '__main__':
    # Create the processes
    p1 = Process(target=worker, args=('Worker-1', 2))
    p2 = Process(target=worker, args=('Worker-2', 3))

    # Start them
    p1.start()
    p2.start()

    # Wait for them to finish
    p1.join()
    p2.join()

    print("All processes finished")

Output (the exact interleaving may vary between runs):

[Worker-1] PID: 1001, parent PID: 1000
[Worker-1] Starting work...
[Worker-2] PID: 1002, parent PID: 1000
[Worker-2] Starting work...
[Worker-1] Work done
[Worker-2] Work done
All processes finished

Method 2: Subclassing Process

from multiprocessing import Process
import os

class WorkerProcess(Process):
    def __init__(self, name, task_id):
        super().__init__()
        # Stored as task_name because Process already defines a name attribute
        self.task_name = name
        self.task_id = task_id

    def run(self):
        """Main logic executed in the child process"""
        print(f"[{self.task_name}] PID: {os.getpid()}")
        print(f"[{self.task_name}] Processing task {self.task_id}")
        # Do the actual work
        result = self.process_task()
        print(f"[{self.task_name}] Result: {result}")

    def process_task(self):
        """Task-processing logic"""
        return sum(range(self.task_id * 1000000))

if __name__ == '__main__':
    # Create the custom processes
    workers = [WorkerProcess(f'Worker-{i}', i) for i in range(3)]

    # Start
    for w in workers:
        w.start()

    # Wait
    for w in workers:
        w.join()

Process Attributes and Methods

from multiprocessing import Process
import time

def worker():
    time.sleep(2)

if __name__ == '__main__':
    p = Process(target=worker, name='MyWorker')

    # Process attributes
    print(f"Name: {p.name}")          # MyWorker
    print(f"Daemon: {p.daemon}")      # False
    print(f"PID: {p.pid}")            # None (not started yet)
    print(f"Alive: {p.is_alive()}")   # False

    # Start the process
    p.start()

    print(f"PID: {p.pid}")            # e.g. 1001 (now started)
    print(f"Alive: {p.is_alive()}")   # True

    # Wait for it to finish
    p.join(timeout=3)  # wait at most 3 seconds

    print(f"Exit code: {p.exitcode}")  # 0 (clean exit)
    print(f"Alive: {p.is_alive()}")    # False

2️⃣ Going Further: Process Pools

Pool Basics

from multiprocessing import Pool
import os
import time

def square(x):
    """Compute a square"""
    print(f"Computing {x}^2, PID: {os.getpid()}")
    time.sleep(0.5)
    return x * x

if __name__ == '__main__':
    # Create a Pool with 4 worker processes
    with Pool(processes=4) as pool:
        # Method 1: map (blocking)
        results = pool.map(square, range(10))
        print(f"Results: {results}")

    # Output: Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Pool's Four Execution Methods

1. map(): blocking, ordered

from multiprocessing import Pool

def process_data(x):
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(process_data, [1, 2, 3, 4, 5])
        print(results)
        # Output: [2, 4, 6, 8, 10] (same order as the input)

2. map_async(): non-blocking, ordered

from multiprocessing import Pool
import time

def process_data(x):
    time.sleep(1)
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit without blocking
        result_async = pool.map_async(process_data, [1, 2, 3, 4, 5])

        print("Tasks submitted; carrying on with other work...")
        time.sleep(0.5)

        # Wait for the results
        results = result_async.get(timeout=5)
        print(results)
        # Output: [2, 4, 6, 8, 10]

3. apply(): blocking, single task

from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == '__main__':
    with Pool(4) as pool:
        # A single task (blocking)
        result = pool.apply(add, (3, 5))
        print(result)  # Output: 8

4. apply_async(): non-blocking, single task

from multiprocessing import Pool
import time

def slow_add(x, y):
    time.sleep(2)
    return x + y

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit several tasks without blocking
        results = []
        for i in range(5):
            result = pool.apply_async(slow_add, (i, i))
            results.append(result)

        print("All tasks submitted")

        # Collect the results
        for i, r in enumerate(results):
            print(f"Task {i} result: {r.get()}")

Advanced Pool Features

Progress Tracking

from multiprocessing import Pool
import time

def process_item(x):
    time.sleep(0.1)
    return x * x

if __name__ == '__main__':
    items = range(100)
    total = len(items)

    with Pool(4) as pool:
        # imap yields results as they arrive, so progress can be reported live
        results = []
        for i, result in enumerate(pool.imap(process_item, items), 1):
            results.append(result)
            print(f"Progress: {i}/{total} ({i/total*100:.1f}%)", end='\r')

        print(f"\nDone! {len(results)} results in total")

Error Handling

from multiprocessing import Pool

def risky_task(x):
    if x == 5:
        raise ValueError(f"Error: x = {x}")
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        results = []
        for i in range(10):
            result = pool.apply_async(risky_task, (i,))
            results.append((i, result))

        # Fetch the results and handle errors: get() re-raises the worker's exception
        for i, r in results:
            try:
                value = r.get(timeout=1)
                print(f"Task {i}: {value}")
            except ValueError as e:
                print(f"Task {i} raised an error: {e}")

3️⃣ Data Sharing: One Program, Many Processes

The Problem: Processes Don't Share Variables

from multiprocessing import Process

counter = 0  # global variable

def increment():
    global counter
    for _ in range(100000):
        counter += 1
    print(f"Process counter: {counter}")

if __name__ == '__main__':
    p1 = Process(target=increment)
    p2 = Process(target=increment)

    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"Main counter: {counter}")

# Output:
# Process counter: 100000  ← Process 1's own copy
# Process counter: 100000  ← Process 2's own copy
# Main counter: 0          ← the main process is unchanged

Solution 1: Value and Array

from multiprocessing import Process, Value, Array, Lock
import time

def increment_shared(counter, lock):
    """Increment through shared memory"""
    for _ in range(100000):
        with lock:  # a Lock is needed: += is a read-modify-write
            counter.value += 1

def update_array(shared_array, index, value, lock):
    """Write into the shared array"""
    with lock:
        shared_array[index] = value

if __name__ == '__main__':
    # Shared integer
    counter = Value('i', 0)  # 'i' = int
    lock = Lock()

    # Create the processes
    p1 = Process(target=increment_shared, args=(counter, lock))
    p2 = Process(target=increment_shared, args=(counter, lock))

    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"Counter: {counter.value}")  # Output: 200000

    # Shared array
    arr = Array('d', [0.0, 0.0, 0.0])  # 'd' = double
    processes = []

    for i in range(3):
        p = Process(target=update_array, args=(arr, i, i * 10.5, lock))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Array: {list(arr)}")  # Output: [0.0, 10.5, 21.0]

Value and Array type codes:

'c' : ctypes.c_char      # character
'i' : ctypes.c_int       # integer
'f' : ctypes.c_float     # float
'd' : ctypes.c_double    # double-precision float
'b' : ctypes.c_byte      # byte
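
A Value or Array created with the default lock=True already carries its own lock, reachable via get_lock(), so passing a separate Lock object is optional. A minimal sketch:

from multiprocessing import Process, Value

def increment_shared(counter):
    for _ in range(100000):
        with counter.get_lock():  # the lock bundled with the synchronized Value
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    ps = [Process(target=increment_shared, args=(counter,)) for _ in range(2)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(counter.value)  # 200000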

Solution 2: Manager

from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    """Share rich data structures through a Manager"""
    # Update the shared dict
    shared_dict[worker_id] = f"Worker-{worker_id} result"

    # Update the shared list
    shared_list.append(worker_id * 10)

if __name__ == '__main__':
    # Create a Manager
    with Manager() as manager:
        # Shared dict and list
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Create several processes
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_dict, shared_list, i))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print(f"Dict: {dict(shared_dict)}")
        print(f"List: {list(shared_list)}")

# Output:
# Dict: {0: 'Worker-0 result', 1: 'Worker-1 result', ...}
# List: [0, 10, 20, 30, 40]  (order depends on which process appends first)

Types supported by Manager:

manager.list()       # list
manager.dict()       # dict
manager.Queue()      # queue
manager.Lock()       # lock
manager.Value()      # value
manager.Array()      # array
manager.Namespace()  # namespace
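
One practical note: a plain multiprocessing.Queue cannot be passed as an argument to Pool tasks (pickling it raises a RuntimeError), but a manager.Queue() proxy can. A minimal sketch:

from multiprocessing import Pool, Manager

def worker(queue, x):
    queue.put(x * x)  # the manager.Queue proxy pickles cleanly

if __name__ == '__main__':
    with Manager() as manager, Pool(4) as pool:
        queue = manager.Queue()
        pool.starmap(worker, [(queue, i) for i in range(5)])

        while not queue.empty():
            print(queue.get())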

Solution 3: Namespace (the Most Flexible)

from multiprocessing import Process, Manager

def worker(namespace, worker_id):
    """Share attributes through a Namespace"""
    namespace.counter += 1  # note: this read-modify-write is NOT atomic
    namespace.results.append(worker_id)
    namespace.status = f"Worker {worker_id} done"

if __name__ == '__main__':
    with Manager() as manager:
        # Create the Namespace
        ns = manager.Namespace()
        ns.counter = 0
        ns.results = manager.list()
        ns.status = "initialized"

        # Create the processes
        processes = [Process(target=worker, args=(ns, i)) for i in range(5)]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Counter: {ns.counter}")
        print(f"Results: {list(ns.results)}")
        print(f"Status: {ns.status}")

4️⃣ Inter-Process Communication (IPC)

Method 1: Queue

from multiprocessing import Process, Queue
import time

def producer(queue, n):
    """Producer"""
    for i in range(n):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    queue.put(None)  # sentinel: tells the consumer to stop

def consumer(queue):
    """Consumer"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(1)

if __name__ == '__main__':
    # Create the Queue
    q = Queue()

    # Create the producer and the consumer
    p1 = Process(target=producer, args=(q, 5))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()
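
With several consumers, multiprocessing.JoinableQueue adds task_done()/join() semantics so the producer side can block until every item has been processed. A sketch with one sentinel per consumer:

from multiprocessing import Process, JoinableQueue

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:        # per-consumer sentinel
            queue.task_done()
            break
        print(f"Consumed: {item}")
        queue.task_done()       # mark this item as processed

if __name__ == '__main__':
    q = JoinableQueue()
    consumers = [Process(target=consumer, args=(q,)) for _ in range(2)]
    for c in consumers:
        c.start()

    for i in range(5):
        q.put(f"Item-{i}")
    for _ in consumers:         # one sentinel per consumer
        q.put(None)

    q.join()                    # blocks until every put() was matched by task_done()
    for c in consumers:
        c.join()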

Method 2: Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    """Sending end"""
    conn.send("Hello")
    conn.send({"key": "value"})
    conn.send([1, 2, 3])
    conn.close()

def receiver(conn):
    """Receiving end"""
    msg1 = conn.recv()
    msg2 = conn.recv()
    msg3 = conn.recv()
    print(f"Received: {msg1}, {msg2}, {msg3}")
    conn.close()

if __name__ == '__main__':
    # Create the Pipe
    parent_conn, child_conn = Pipe()

    # Create the processes
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Queue vs Pipe:

# Queue: many-to-many, process- and thread-safe
# Pipe:  point-to-point, faster, but unsafe with multiple writers on one end
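
recv() blocks until data arrives; Connection.poll(timeout) lets the receiver check for pending data without blocking indefinitely. A minimal sketch:

from multiprocessing import Process, Pipe
import time

def slow_sender(conn):
    time.sleep(1)
    conn.send("done")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=slow_sender, args=(child_conn,))
    p.start()

    while not parent_conn.poll(timeout=0.2):  # False if nothing arrived within 0.2s
        print("Still waiting...")
    print(parent_conn.recv())
    p.join()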

5️⃣ Synchronization Primitives

Lock

from multiprocessing import Process, Lock, Value
import time

def increment_with_lock(counter, lock):
    """Protect the shared resource with a Lock"""
    for _ in range(100000):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()

def increment_no_lock(counter):
    """No Lock (anti-pattern)"""
    for _ in range(100000):
        counter.value += 1  # race condition

if __name__ == '__main__':
    # With a Lock
    counter = Value('i', 0)
    lock = Lock()
    p1 = Process(target=increment_with_lock, args=(counter, lock))
    p2 = Process(target=increment_with_lock, args=(counter, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"With Lock: {counter.value}")  # 200000

    # Without a Lock
    counter = Value('i', 0)
    p1 = Process(target=increment_no_lock, args=(counter,))
    p2 = Process(target=increment_no_lock, args=(counter,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"Without Lock: {counter.value}")  # possibly e.g. 150000 (wrong)

Semaphore

from multiprocessing import Process, Semaphore
import time

def worker(semaphore, worker_id):
    """Cap how many processes run at once"""
    print(f"Worker {worker_id} waiting...")
    semaphore.acquire()
    try:
        print(f"Worker {worker_id} started")
        time.sleep(2)
        print(f"Worker {worker_id} done")
    finally:
        semaphore.release()

if __name__ == '__main__':
    # At most 2 processes run at the same time
    semaphore = Semaphore(2)

    # Create 5 processes
    processes = [Process(target=worker, args=(semaphore, i)) for i in range(5)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

# Output:
# Worker 0 waiting...
# Worker 0 started
# Worker 1 waiting...
# Worker 1 started
# Worker 2 waiting...    ← blocked until a slot frees up
# ...

6️⃣ Worked Examples

Example 1: Batch Image Processing

from multiprocessing import Pool
from PIL import Image
import os

def process_image(image_path):
    """Process a single image"""
    try:
        # Open the image
        img = Image.open(image_path)

        # Resize
        img = img.resize((800, 600))

        # Apply a filter (optional; needs from PIL import ImageFilter)
        # img = img.filter(ImageFilter.BLUR)

        # Save
        output_path = f"output/{os.path.basename(image_path)}"
        img.save(output_path, quality=85)

        return f"Done: {image_path}"
    except Exception as e:
        return f"Error: {image_path}, {e}"

if __name__ == '__main__':
    # Collect the image paths
    image_paths = [f"images/img_{i}.jpg" for i in range(100)]

    # Make sure the output directory exists before workers write to it
    os.makedirs("output", exist_ok=True)

    # Process the batch with a Pool
    with Pool(processes=8) as pool:
        results = pool.map(process_image, image_paths)

    # Tally the results
    success = sum(1 for r in results if r.startswith("Done"))
    print(f"Succeeded: {success}/{len(results)}")

Example 2: Web Crawler (Mixing Threads and Processes)

from multiprocessing import Process, Queue
from queue import Empty  # multiprocessing.Queue raises queue.Empty on timeout
from threading import Thread
import requests

def crawler_process(url_queue, result_queue, process_id):
    """Each process crawls with several threads (threads suit the I/O-bound part)"""
    def crawl(url):
        try:
            response = requests.get(url, timeout=5)
            return (url, response.status_code, len(response.content))
        except Exception as e:
            return (url, 0, str(e))

    threads = []
    while True:
        try:
            url = url_queue.get(timeout=1)
        except Empty:  # no URL arrived within the timeout: queue is drained
            break
        t = Thread(target=lambda u: result_queue.put(crawl(u)), args=(url,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    print(f"Process {process_id} done")

if __name__ == '__main__':
    # Prepare the URLs
    urls = ["https://httpbin.org/delay/1"] * 100
    url_queue = Queue()
    result_queue = Queue()

    for url in urls:
        url_queue.put(url)

    # Create 4 processes
    processes = []
    for i in range(4):
        p = Process(target=crawler_process, args=(url_queue, result_queue, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    print(f"總共爬取: {len(results)} 個 URL")

7️⃣ Best Practices

1. Always Use if __name__ == '__main__'

# ✅ Correct
if __name__ == '__main__':
    p = Process(target=worker)
    p.start()

# ❌ Wrong: with the spawn start method (the default on Windows and macOS),
# the module is re-imported in every child, which would create processes
# recursively; current Pythons raise a RuntimeError to stop this
p = Process(target=worker)
p.start()
2. Manage Processes with a Pool

# ❌ Wrong: creating far too many processes by hand
processes = [Process(target=work) for _ in range(1000)]

# ✅ Correct: let a Pool recycle a fixed number of workers
with Pool(8) as pool:
    pool.map(work, range(1000))

3. Pick the Right Sharing Mechanism

# Simple scalars → Value/Array (fast: real shared memory)
counter = Value('i', 0)

# Rich structures → Manager (slower but flexible: every access is a proxy round trip)
shared_dict = Manager().dict()

# Large data → consider not sharing at all; pass results through a Queue instead

4. Handle Exceptions Properly

from multiprocessing import Pool

def safe_worker(x):
    try:
        return risky_operation(x)  # risky_operation stands in for your real work
    except Exception as e:
        return f"Error: {e}"

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(safe_worker, data)

✅ Recap

Creating processes:

  • Process(target=func) - basic usage
  • Subclassing Process - object-oriented style
  • Pool - managed pool of workers

Data sharing:

  • Value, Array - simple scalar types
  • Manager - rich data structures
  • Namespace - flexible attribute sharing

Inter-process communication:

  • Queue - many-to-many, process-safe
  • Pipe - point-to-point, faster

Synchronization:

  • Lock - mutual exclusion
  • Semaphore - counting semaphore

Key points:

  • ✅ Use it for CPU-bound work
  • ✅ Bypasses the GIL for true parallelism
  • ✅ Always guard startup with if __name__ == '__main__'
  • ✅ Prefer Pool over managing processes by hand
Previous: 05-2. Complete Guide to the threading Module Next: 05-4. Guide to concurrent.futures


Last updated: 2025-01-06
