目錄
03-4. Shared Memory(共享記憶體)
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🎯 本篇重點
深入理解 Shared Memory 這種最快的 IPC 方式,學習如何在多個 Process 間安全地共享資料。
🤔 什麼是 Shared Memory?
Shared Memory(共享記憶體) = 多個 Process 共用同一塊記憶體區域
一句話解釋: 讓多個獨立的 Process 直接存取同一塊記憶體,避免資料複製,是最快的 IPC 方式。
🏢 用辦公室來比喻
普通 IPC(Pipe、Queue)
Process A 辦公室 Process B 辦公室
├─ 文件 A ├─ 文件 B(複製的)
│ │
└──→ 傳真機 ──→ 傳真機 ──┘
每次溝通都要複製文件(慢、耗資源)

Shared Memory
Process A 辦公室 Process B 辦公室
├─ 員工 ├─ 員工
│ │
└──→ 共用雲端硬碟 ←──────┘
(同一份檔案)
兩邊直接存取同一份檔案(快、高效)
但需要鎖定機制避免衝突

💻 Shared Memory 原理
記憶體佈局對比
普通 Process(獨立記憶體)
Process A (PID 1001) Process B (PID 1002)
├─ Code Segment ├─ Code Segment
├─ Data: counter = 0 ├─ Data: counter = 0(獨立副本)
├─ Heap ├─ Heap
└─ Stack └─ Stack
無法直接共享資料

Shared Memory
Process A (PID 1001) Process B (PID 1002)
├─ Code Segment ├─ Code Segment
├─ Data ├─ Data
├─ Heap ├─ Heap
│ └─ 指向共享記憶體 │ └─ 指向共享記憶體
└─ Stack └─ Stack
↓ ↓
└──→ Shared Memory ←─────────┘
counter = 100
兩個 Process 存取同一塊記憶體

1️⃣ Python Shared Memory 基礎
Value:共享單一值
from multiprocessing import Process, Value
import time


def increment(shared_value, name):
    """Bump the shared counter five times, printing each new value."""
    for _step in range(5):
        shared_value.value += 1
        print(f"{name}: {shared_value.value}")
        time.sleep(0.1)


if __name__ == '__main__':
    # Shared integer, initialised to 0 ('i' = C int).
    counter = Value('i', 0)
    # Two processes mutate the very same counter concurrently.
    workers = [
        Process(target=increment, args=(counter, label))
        for label in ('P1', 'P2')
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f"最終值: {counter.value}")
    # 可能輸出:最終值: 8(Race Condition!)

Value 支援的類型
from multiprocessing import Value

# Integer typecodes.
i_value = Value('i', 0)    # C int
l_value = Value('l', 0)    # C long

# Floating point typecodes.
f_value = Value('f', 0.0)  # C float
d_value = Value('d', 0.0)  # C double

# A single byte.
c_value = Value('c', b'A')  # char

# Signed char doubling as a boolean flag.
b_value = Value('b', True)

# Usage: read/write through the .value attribute.
d_value.value = 3.14
print(d_value.value)  # 3.14

Array:共享陣列
from multiprocessing import Process, Array


def modify_array(shared_array, process_id):
    """Add process_id to every slot of the shared array, then print it."""
    for idx, current in enumerate(shared_array):
        shared_array[idx] = current + process_id
    print(f"P{process_id}: {list(shared_array)}")


if __name__ == '__main__':
    # Shared fixed-size array of five C ints, all zero.
    arr = Array('i', [0, 0, 0, 0, 0])
    procs = [Process(target=modify_array, args=(arr, pid)) for pid in (1, 2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"最終陣列: {list(arr)}")

2️⃣ 同步機制:避免 Race Condition
問題:Race Condition
from multiprocessing import Process, Value
import time


def unsafe_increment(counter):
    """Increment WITHOUT a lock — the read/modify/write cycle is not atomic."""
    for _ in range(100000):
        snapshot = counter.value   # 1. read
        snapshot += 1              # 2. modify
        counter.value = snapshot   # 3. write back (may clobber a concurrent update)


if __name__ == '__main__':
    counter = Value('i', 0)
    racers = [Process(target=unsafe_increment, args=(counter,)) for _ in range(2)]
    for r in racers:
        r.start()
    for r in racers:
        r.join()
    print(f"計數器: {counter.value}")
    # Expected: 200000
    # 實際:可能 150000(錯誤!)

解決:使用 Lock
from multiprocessing import Process, Value, Lock


def safe_increment(counter, lock):
    """Locked increment using explicit acquire/release."""
    for _ in range(100000):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()


def safe_increment_with(counter, lock):
    """Locked increment using the lock as a context manager (preferred)."""
    for _ in range(100000):
        with lock:
            counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()
    procs = [
        Process(target=safe_increment_with, args=(counter, lock))
        for _ in range(2)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"計數器: {counter.value}")
    # 輸出:200000(正確!)

Value 內建鎖
from multiprocessing import Process, Value


def increment_builtin_lock(counter):
    """Increment under the lock that every Value object carries internally."""
    for _ in range(100000):
        with counter.get_lock():  # built-in per-Value lock
            counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [
        Process(target=increment_builtin_lock, args=(counter,))
        for _ in range(2)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"計數器: {counter.value}")  # 200000

3️⃣ Manager:共享複雜資料結構
Manager 支援的類型
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        # Proxy-backed collections served by the manager process.
        shared_list = manager.list([1, 2, 3])
        shared_dict = manager.dict({'key': 'value'})
        shared_queue = manager.Queue()

        # Namespace: arbitrary named attributes (most flexible).
        shared_ns = manager.Namespace()
        shared_ns.counter = 0
        shared_ns.name = "Test"

        print(f"List: {shared_list}")
        print(f"Dict: {shared_dict}")
        print(f"Namespace: {shared_ns.counter}, {shared_ns.name}")

Manager.list 範例
from multiprocessing import Process, Manager


def add_items(shared_list, start, end):
    """Append the integers [start, end) to the shared list, one at a time."""
    for item in range(start, end):
        shared_list.append(item)
        print(f"添加: {item}")


if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        # Two processes append disjoint ranges concurrently.
        producers = [
            Process(target=add_items, args=(shared_list, lo, hi))
            for lo, hi in ((0, 5), (5, 10))
        ]
        for p in producers:
            p.start()
        for p in producers:
            p.join()
        print(f"共享列表: {list(shared_list)}")
        # 輸出:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9](順序可能不同)

Manager.dict 範例
from multiprocessing import Process, Manager
import time


def worker(shared_dict, worker_id):
    """Record this worker's completion message and bump the shared counter."""
    shared_dict[worker_id] = f"Worker {worker_id} 完成"
    # NOTE: get-then-set is not atomic — concurrent workers can lose updates.
    shared_dict['counter'] = shared_dict.get('counter', 0) + 1
    time.sleep(1)


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_dict['counter'] = 0
        processes = [
            Process(target=worker, args=(shared_dict, i)) for i in range(5)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("共享字典內容:")
        for key, value in shared_dict.items():
            print(f" {key}: {value}")

Manager.Namespace 範例
from multiprocessing import Process, Manager


def update_status(namespace, worker_id):
    """Mark this worker active, simulate 2s of work, then mark it finished."""
    namespace.active_workers += 1
    namespace.status = f"Worker {worker_id} 正在工作"
    # Simulate the actual task.
    import time
    time.sleep(2)
    namespace.completed_tasks += 1
    namespace.active_workers -= 1


if __name__ == '__main__':
    with Manager() as manager:
        # A Namespace proxy holds arbitrary named attributes.
        ns = manager.Namespace()
        ns.active_workers = 0
        ns.completed_tasks = 0
        ns.status = "等待中"

        processes = [
            Process(target=update_status, args=(ns, i)) for i in range(3)
        ]
        for p in processes:
            p.start()

        # Poll the shared state a few times while the workers run.
        import time
        for _ in range(5):
            print(f"活動 Worker: {ns.active_workers}, 完成: {ns.completed_tasks}, 狀態: {ns.status}")
            time.sleep(0.5)
        for p in processes:
            p.join()

4️⃣ Python 3.8+ shared_memory 模組
新式 Shared Memory API
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np


def worker(shm_name, shape):
    """Attach to an existing shared-memory segment and mutate it via NumPy."""
    existing_shm = SharedMemory(name=shm_name)
    # NumPy view directly over the shared buffer — no copying.
    arr = np.ndarray(shape, dtype=np.int64, buffer=existing_shm.buf)
    arr[0] = 100
    arr[1] = 200
    print(f"Worker 修改後: {arr}")
    # Detach only; the creator is responsible for unlinking.
    existing_shm.close()


if __name__ == '__main__':
    # Create the segment (1000 bytes is enough for ten int64 values).
    shm = SharedMemory(create=True, size=1000)
    arr = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
    arr[:] = 0  # initialise
    print(f"初始陣列: {arr}")

    child = Process(target=worker, args=(shm.name, arr.shape))
    child.start()
    child.join()
    print(f"主 Process 看到: {arr}")

    # Tear down this process's mapping.
    shm.close()
    shm.unlink()  # 刪除共享記憶體

5️⃣ 實戰案例
案例 1:多 Process 計數器
from multiprocessing import Process, Value, Lock
import time


class SharedCounter:
    """A process-safe counter: a shared C int guarded by an explicit Lock."""

    def __init__(self):
        self.count = Value('i', 0)
        self.lock = Lock()

    def increment(self):
        """Add one under the lock."""
        with self.lock:
            self.count.value += 1

    def get_value(self):
        """Return the current count."""
        return self.count.value


def worker(counter, worker_id, num_tasks):
    """Run num_tasks fake tasks, bumping the shared counter after each one."""
    for i in range(num_tasks):
        time.sleep(0.01)  # simulated work
        counter.increment()
        if (i + 1) % 10 == 0:
            print(f"Worker {worker_id}: 完成 {i+1} 個任務,總計 {counter.get_value()}")


if __name__ == '__main__':
    counter = SharedCounter()
    num_workers = 4
    tasks_per_worker = 25
    processes = [
        Process(target=worker, args=(counter, i, tasks_per_worker))
        for i in range(num_workers)
    ]
    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"\n總完成任務: {counter.get_value()}")
    print(f"耗時: {time.time() - start:.2f}s")

案例 2:共享狀態監控
from multiprocessing import Process, Manager
import time
import random


def data_processor(shared_state, worker_id):
    """Process 10 items, publishing progress into the shared state.

    Worker membership is tracked in managed dicts used as sets
    (worker_id -> True): the stdlib SyncManager provides no set proxy
    on most Python versions (manager.set() raises AttributeError
    before 3.14), so dict keys stand in for set members.
    """
    for i in range(10):
        # Register as active and bump the global counter atomically.
        with shared_state['lock']:
            shared_state['active_workers'][worker_id] = True
            shared_state['total_processed'] += 1
        # Simulate a unit of work.
        time.sleep(random.uniform(0.1, 0.5))
        # Publish progress as a percentage.
        progress = (i + 1) * 10
        shared_state['worker_progress'][worker_id] = progress
    # Done: move ourselves from active to completed.
    with shared_state['lock']:
        shared_state['active_workers'].pop(worker_id, None)
        shared_state['completed_workers'][worker_id] = True


def monitor(shared_state, num_workers):
    """Print a status summary roughly once a second until every worker is done."""
    while len(shared_state['completed_workers']) < num_workers:
        print("\n=== 狀態監控 ===")
        print(f"活動 Worker: {len(shared_state['active_workers'])}")
        print(f"已完成 Worker: {len(shared_state['completed_workers'])}")
        print(f"總處理數: {shared_state['total_processed']}")
        print("Worker 進度:")
        for worker_id, progress in shared_state['worker_progress'].items():
            print(f" Worker {worker_id}: {progress}%")
        time.sleep(1)
    print("\n所有 Worker 完成!")


if __name__ == '__main__':
    with Manager() as manager:
        # Shared state container. FIX: the original used manager.set(),
        # which does not exist in SyncManager (pre-3.14); managed dicts
        # emulate the two sets instead.
        shared_state = manager.dict()
        shared_state['active_workers'] = manager.dict()
        shared_state['completed_workers'] = manager.dict()
        shared_state['worker_progress'] = manager.dict()
        shared_state['total_processed'] = 0
        shared_state['lock'] = manager.Lock()

        num_workers = 5
        workers = [
            Process(target=data_processor, args=(shared_state, i))
            for i in range(num_workers)
        ]
        monitor_process = Process(target=monitor, args=(shared_state, num_workers))

        for w in workers:
            w.start()
        monitor_process.start()
        for w in workers:
            w.join()
        monitor_process.join()

案例 3:大型陣列共享(NumPy)
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time


def parallel_compute(shm_name, shape, start_idx, end_idx):
    """Fill arr[start_idx:end_idx] of the shared array with sin²(i) + cos²(i)."""
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
    for idx in range(start_idx, end_idx):
        arr[idx] = np.sin(idx) ** 2 + np.cos(idx) ** 2
    shm.close()


if __name__ == '__main__':
    # One million float64 elements in a single shared segment.
    size = 1000000
    shm = SharedMemory(create=True, size=size * 8)  # 8 bytes per float64
    arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
    arr[:] = 0  # initialise

    # Split the index range across 4 workers; the last one takes the remainder.
    num_workers = 4
    chunk_size = size // num_workers

    t0 = time.time()
    procs = []
    for rank in range(num_workers):
        lo = rank * chunk_size
        hi = (rank + 1) * chunk_size if rank < num_workers - 1 else size
        p = Process(target=parallel_compute, args=(shm.name, arr.shape, lo, hi))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

    print(f"並行計算耗時: {time.time() - t0:.2f}s")
    print(f"前 10 個結果: {arr[:10]}")
    print(f"檢查總和: {arr.sum():.2f}")

    shm.close()
    shm.unlink()

6️⃣ Value/Array vs Manager 對比
效能對比
from multiprocessing import Process, Value, Manager
import time
def test_value(shared_value):
"""測試 Value 效能"""
for _ in range(100000):
with shared_value.get_lock():
shared_value.value += 1
def test_manager(shared_dict):
"""測試 Manager 效能"""
for _ in range(100000):
shared_dict['counter'] = shared_dict.get('counter', 0) + 1
if __name__ == '__main__':
# Value 測試
value = Value('i', 0)
start = time.time()
p = Process(target=test_value, args=(value,))
p.start()
p.join()
print(f"Value 耗時: {time.time() - start:.2f}s")
# Manager 測試
with Manager() as manager:
shared_dict = manager.dict()
shared_dict['counter'] = 0
start = time.time()
p = Process(target=test_manager, args=(shared_dict,))
p.start()
p.join()
        print(f"Manager 耗時: {time.time() - start:.2f}s")

結果:
Value 耗時: 0.5s ← 快
Manager 耗時: 2.5s ← 慢(5 倍)

選擇建議
| 特性 | Value/Array | Manager |
|---|---|---|
| 效能 | ⚡ 快 | 🐢 慢(有代理成本) |
| 類型支援 | 基本類型 | 複雜類型 |
| 使用難度 | 簡單 | 簡單 |
| 適用場景 | 簡單共享、高頻存取 | 複雜結構、靈活性 |
建議:
- ✅ 簡單類型、高效能 → Value/Array
- ✅ 複雜結構、彈性 → Manager
- ✅ 大型陣列、NumPy → shared_memory
✅ 重點回顧
Shared Memory 優勢:
- ✅ 最快的 IPC 方式(無資料複製)
- ✅ 適合大量資料交換
- ✅ 低延遲
三種實現方式:
- Value/Array - 基本類型,高效能
- Manager - 複雜類型,靈活
- shared_memory - 大型資料,NumPy
同步機制:
- ✅ 必須使用 Lock 避免 Race Condition
- ✅ Value/Array 內建 Lock
- ✅ Manager 自動處理部分同步
關鍵注意:
- ⚠️ 需要手動同步(Lock)
- ⚠️ Manager 有代理成本(較慢)
- ⚠️ 需要手動清理(shared_memory.unlink)
適用場景:
- ✅ 大型資料集共享(影像、陣列)
- ✅ 高頻率資料交換
- ✅ 需要最佳效能的情況
- ❌ 簡單通訊(用 Queue 更簡單)
上一篇: 03-3. Message Queue(消息隊列) 下一篇: 03-5. Socket 通訊
最後更新:2025-01-06