03-2. Pipe(管道)詳解

⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐ (簡單)


🤔 一句話解釋

Pipe(管道)是一種單向或雙向的通訊通道,主要用於父子 Process 或相關 Process 之間的資料傳輸。


📞 用電話專線來比喻

Process A                    Process B
   ├─ 寫入端 ──────────→ 讀取端
   │
   └─ 就像電話專線:
      - 一端說話(寫入)
      - 另一端聽(讀取)
      - 點對點連接

🔧 Pipe 的類型

1. 單向 Pipe (One-way Pipe)

from multiprocessing import Process, Pipe

def sender(conn):
    """發送端"""
    messages = ["Hello", "World", "from", "Process A"]
    for msg in messages:
        conn.send(msg)
        print(f"發送: {msg}")
    conn.close()

def receiver(conn):
    """接收端"""
    while True:
        try:
            msg = conn.recv()
            print(f"收到: {msg}")
        except EOFError:
            break
    conn.close()

# 創建 Pipe(默認是雙向的)
parent_conn, child_conn = Pipe()

# 創建 Process
p1 = Process(target=sender, args=(parent_conn,))
p2 = Process(target=receiver, args=(child_conn,))

p1.start()
p2.start()
p1.join()
p2.join()

輸出:

發送: Hello
發送: World
發送: from
發送: Process A
收到: Hello
收到: World
收到: from
收到: Process A

2. 雙向 Pipe (Duplex Pipe)

from multiprocessing import Process, Pipe
import time

def worker1(conn):
    """Worker 1:發送並接收"""
    # 發送訊息
    conn.send("來自 Worker 1 的問候")
    print("[Worker 1] 已發送訊息")

    # 接收回應
    time.sleep(0.5)
    response = conn.recv()
    print(f"[Worker 1] 收到回應: {response}")
    conn.close()

def worker2(conn):
    """Worker 2:接收並回應"""
    # 接收訊息
    msg = conn.recv()
    print(f"[Worker 2] 收到訊息: {msg}")

    # 發送回應
    conn.send("Worker 2 已收到!")
    print("[Worker 2] 已發送回應")
    conn.close()

# 創建雙向 Pipe
conn1, conn2 = Pipe(duplex=True)  # duplex=True 是默認值

p1 = Process(target=worker1, args=(conn1,))
p2 = Process(target=worker2, args=(conn2,))

p1.start()
p2.start()
p1.join()
p2.join()

輸出:

[Worker 1] 已發送訊息
[Worker 2] 收到訊息: 來自 Worker 1 的問候
[Worker 2] 已發送回應
[Worker 1] 收到回應: Worker 2 已收到!

🎯 Pipe 的特性

特性 1:點對點通訊

from multiprocessing import Process, Pipe

def endpoint_a(conn):
    conn.send("A → B")
    print(f"[A] 發送完成")
    conn.close()

def endpoint_b(conn):
    msg = conn.recv()
    print(f"[B] 收到: {msg}")
    conn.close()

conn_a, conn_b = Pipe()

# 只能是 A 和 B 兩個端點通訊
Process(target=endpoint_a, args=(conn_a,)).start()
Process(target=endpoint_b, args=(conn_b,)).start()

# ⚠️ 不能有第三個 Process 加入這個 Pipe

特性 2:阻塞式 I/O

from multiprocessing import Process, Pipe
import time

def slow_sender(conn):
    time.sleep(3)  # 延遲 3 秒
    conn.send("終於來了!")
    conn.close()

def receiver(conn):
    print("等待接收資料...")
    msg = conn.recv()  # ⏸️ 阻塞,直到有資料
    print(f"收到: {msg}")
    conn.close()

conn1, conn2 = Pipe()

Process(target=slow_sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()

輸出:

等待接收資料...
(等待 3 秒...)
收到: 終於來了!

特性 3:FIFO(先進先出)

from multiprocessing import Process, Pipe

def sender(conn):
    messages = ["第一條", "第二條", "第三條"]
    for msg in messages:
        conn.send(msg)
    conn.close()

def receiver(conn):
    for _ in range(3):
        msg = conn.recv()
        print(f"收到: {msg}")
    conn.close()

conn1, conn2 = Pipe()

Process(target=sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()

輸出:

收到: 第一條  ← 先發送的先收到
收到: 第二條
收到: 第三條

🔄 實際案例 1:父子 Process 通訊

from multiprocessing import Process, Pipe
import os

def child_process(conn):
    """子 Process"""
    # 接收父 Process 的任務
    task = conn.recv()
    print(f"[子 Process {os.getpid()}] 收到任務: {task}")

    # 執行任務
    result = task.upper()  # 轉大寫

    # 回傳結果
    conn.send(result)
    print(f"[子 Process {os.getpid()}] 已回傳結果")
    conn.close()

# 主 Process
parent_conn, child_conn = Pipe()

# 創建子 Process
p = Process(target=child_process, args=(child_conn,))
p.start()

# 父 Process 發送任務
task = "hello world"
parent_conn.send(task)
print(f"[父 Process {os.getpid()}] 已發送任務: {task}")

# 父 Process 接收結果
result = parent_conn.recv()
print(f"[父 Process {os.getpid()}] 收到結果: {result}")

p.join()
parent_conn.close()

輸出:

[父 Process 12345] 已發送任務: hello world
[子 Process 12346] 收到任務: hello world
[子 Process 12346] 已回傳結果
[父 Process 12345] 收到結果: HELLO WORLD

📊 實際案例 2:工作分配系統

from multiprocessing import Process, Pipe
import time
import random

def worker(worker_id, conn):
    """工作者 Process"""
    while True:
        try:
            # 接收任務
            task = conn.recv()
            if task == "STOP":
                print(f"[Worker {worker_id}] 收到停止信號")
                break

            print(f"[Worker {worker_id}] 開始處理: {task}")

            # 模擬處理時間
            time.sleep(random.uniform(0.5, 1.5))

            # 回傳結果
            result = f"Task {task} 完成"
            conn.send(result)
            print(f"[Worker {worker_id}] 完成: {task}")

        except EOFError:
            break

    conn.close()

# 主 Process(任務分配器)
def main():
    # 創建 3 個 Worker
    workers = []
    for i in range(3):
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(i, child_conn))
        p.start()
        workers.append((p, parent_conn))

    # 分配任務
    tasks = [f"Task-{i}" for i in range(10)]
    worker_idx = 0

    for task in tasks:
        _, conn = workers[worker_idx]
        conn.send(task)
        print(f"[主程式] 分配 {task} 給 Worker {worker_idx}")

        # 輪流分配
        worker_idx = (worker_idx + 1) % len(workers)

    # 接收所有結果
    for _ in tasks:
        for _, conn in workers:
            if conn.poll():  # 檢查是否有資料
                result = conn.recv()
                print(f"[主程式] 收到結果: {result}")

    # 停止所有 Worker
    for _, conn in workers:
        conn.send("STOP")

    # 等待所有 Worker 結束
    for p, conn in workers:
        p.join()
        conn.close()

if __name__ == '__main__':
    main()

🔐 實際案例 3:安全的資料傳輸

from multiprocessing import Process, Pipe
import pickle
import hashlib

def secure_sender(conn):
    """安全發送端"""
    data = {
        'user': 'alice',
        'password': 'secret123',
        'balance': 10000
    }

    # 序列化資料
    serialized = pickle.dumps(data)

    # 計算校驗和
    checksum = hashlib.md5(serialized).hexdigest()

    # 發送資料和校驗和
    conn.send((serialized, checksum))
    print("[發送端] 已發送資料和校驗和")
    conn.close()

def secure_receiver(conn):
    """安全接收端"""
    # 接收資料
    serialized, received_checksum = conn.recv()

    # 驗證校驗和
    calculated_checksum = hashlib.md5(serialized).hexdigest()

    if calculated_checksum == received_checksum:
        print("[接收端] ✅ 校驗和驗證通過")
        data = pickle.loads(serialized)
        print(f"[接收端] 資料: {data}")
    else:
        print("[接收端] ❌ 校驗和驗證失敗,資料可能損壞")

    conn.close()

conn1, conn2 = Pipe()

Process(target=secure_sender, args=(conn1,)).start()
Process(target=secure_receiver, args=(conn2,)).start()

輸出:

[發送端] 已發送資料和校驗和
[接收端] ✅ 校驗和驗證通過
[接收端] 資料: {'user': 'alice', 'password': 'secret123', 'balance': 10000}

⚠️ Pipe 的限制與注意事項

限制 1:只能兩個端點

from multiprocessing import Process, Pipe

conn1, conn2 = Pipe()

# ✅ 正確:兩個 Process
def p1_func(conn):
    conn.send("Hello")
    conn.close()

def p2_func(conn):
    msg = conn.recv()
    print(msg)
    conn.close()

Process(target=p1_func, args=(conn1,)).start()
Process(target=p2_func, args=(conn2,)).start()

# ❌ 錯誤:不能有第三個 Process
# 如果需要多對多通訊,使用 Queue

限制 2:資料大小限制

from multiprocessing import Pipe
import sys

conn1, conn2 = Pipe()

# ⚠️ 發送過大的資料可能會有問題
huge_data = "x" * (100 * 1024 * 1024)  # 100 MB

try:
    conn1.send(huge_data)
    print(f"資料大小: {sys.getsizeof(huge_data) / 1024 / 1024:.2f} MB")
except Exception as e:
    print(f"錯誤: {e}")

# 💡 建議:大資料使用 Shared Memory

注意事項 1:記得關閉連接

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello")
    conn.close()  # ✅ 記得關閉

def receiver(conn):
    msg = conn.recv()
    print(msg)
    conn.close()  # ✅ 記得關閉

conn1, conn2 = Pipe()

p1 = Process(target=sender, args=(conn1,))
p2 = Process(target=receiver, args=(conn2,))

p1.start()
p2.start()
p1.join()
p2.join()

# ⚠️ 如果不關閉,可能導致資源洩漏

注意事項 2:避免死鎖

from multiprocessing import Process, Pipe

def deadlock_example(conn1, conn2):
    # ❌ 危險:兩個 Process 都先 recv()
    msg = conn1.recv()  # 阻塞,等待 conn2 發送
    conn2.send("Response")  # 永遠不會執行到這裡

# 💀 兩個 Process 都在等待對方,造成死鎖

解決方案:

def safe_example(conn1, conn2):
    # ✅ 正確:一個先 send,一個先 recv
    conn2.send("Request")
    msg = conn1.recv()

🔍 poll() 檢查是否有資料

from multiprocessing import Process, Pipe
import time

def sender(conn):
    time.sleep(2)  # 延遲 2 秒
    conn.send("Hello")
    conn.close()

def receiver(conn):
    print("等待資料...")

    # 使用 poll() 檢查
    while not conn.poll():  # 非阻塞檢查
        print("還沒有資料,繼續等待...")
        time.sleep(0.5)

    # 有資料了,接收
    msg = conn.recv()
    print(f"收到: {msg}")
    conn.close()

conn1, conn2 = Pipe()

Process(target=sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()

輸出:

等待資料...
還沒有資料,繼續等待...
還沒有資料,繼續等待...
還沒有資料,繼續等待...
收到: Hello

📈 Pipe vs Queue

特性PipeQueue
端點數量2 個多個
速度更快較慢
使用場景父子 Process生產者/消費者
複雜度
線程安全
# Pipe:適合簡單的點對點通訊
conn1, conn2 = Pipe()

# Queue:適合多對多通訊
from multiprocessing import Queue
queue = Queue()

💡 最佳實踐

1. 使用 with 語句(如果可能)

# 雖然 Pipe 沒有內建 context manager,但可以確保關閉
def safe_communication():
    conn1, conn2 = Pipe()
    try:
        conn1.send("Hello")
        msg = conn2.recv()
        return msg
    finally:
        conn1.close()
        conn2.close()

2. 設置超時

from multiprocessing import Pipe
import time

conn1, conn2 = Pipe()

# 使用 poll() 設置超時
if conn2.poll(timeout=5):  # 最多等 5 秒
    msg = conn2.recv()
    print(f"收到: {msg}")
else:
    print("超時,沒有收到資料")

3. 適當的錯誤處理

from multiprocessing import Process, Pipe

def robust_receiver(conn):
    try:
        while True:
            if conn.poll():
                msg = conn.recv()
                print(f"收到: {msg}")
            else:
                break
    except EOFError:
        print("連接已關閉")
    except Exception as e:
        print(f"錯誤: {e}")
    finally:
        conn.close()

✅ 重點回顧

Pipe 的特性:

  • 點對點通訊(只能兩個端點)
  • 可以是單向或雙向
  • FIFO(先進先出)
  • 阻塞式 I/O
  • 速度快,適合簡單場景

Pipe 的方法:

  • send(obj) - 發送對象
  • recv() - 接收對象(阻塞)
  • poll(timeout) - 檢查是否有資料(非阻塞)
  • close() - 關閉連接

適用場景:

  • ✅ 父子 Process 通訊
  • ✅ 簡單的點對點資料交換
  • ✅ 需要雙向通訊
  • ❌ 多對多通訊(使用 Queue)
  • ❌ 大量資料傳輸(使用 Shared Memory)

注意事項:

  • 記得關閉連接
  • 避免死鎖(注意 send/recv 順序)
  • 不適合大資料傳輸
  • 只能兩個端點

上一篇: 03-1. IPC 概述 下一篇: 03-3. Message Queue 詳解


最後更新:2025-01-06

0%