Queue(佇列)資料結構完整教學
理解 FIFO 原理與 Python 實作技巧
目錄
Queue(佇列)資料結構完整教學
什麼是 Queue?
Queue(佇列)是一種遵循 FIFO(First In First Out,先進先出) 原則的線性資料結構。就像現實生活中的排隊一樣,第一個進入佇列的元素會第一個離開。
視覺化概念
排隊買票的例子:
[出口] ← 小明 ← 小華 ← 小美 ← [入口]
↑ ↑
先到的人 後到的人
(先離開) (後離開)
資料結構表示:
Front [10 | 20 | 30 | 40] Rear
出隊 ← ← 入隊
Queue 的基本操作
- Enqueue(入隊):在佇列尾端加入元素
- Dequeue(出隊):從佇列前端移除元素
- Front/Peek:查看佇列前端元素(不移除)
- IsEmpty:檢查佇列是否為空
- Size:取得佇列中的元素數量
為什麼需要 Queue?
實際應用場景
# 1. 印表機工作佇列
print_queue = Queue()
print_queue.enqueue("文件1.pdf")
print_queue.enqueue("報告.docx")
print_queue.enqueue("圖片.jpg")
# 按照順序列印
# 2. 作業系統的程序排程
process_queue = Queue()
process_queue.enqueue("Process A")
process_queue.enqueue("Process B")
# CPU 按順序處理
# 3. 網頁伺服器處理請求
request_queue = Queue()
request_queue.enqueue("GET /index.html")
request_queue.enqueue("POST /api/data")
# 按順序處理請求
實作方式比較
1. 使用 Python List(簡單但效率較低)
class QueueWithList:
"""使用 Python list 實作的 Queue"""
def __init__(self):
self.items = []
def enqueue(self, item):
"""入隊 - O(1)"""
self.items.append(item)
def dequeue(self):
"""出隊 - O(n) 因為需要移動所有元素"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items.pop(0) # 移除第一個元素
def front(self):
"""查看隊首元素 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[0]
def is_empty(self):
"""檢查是否為空 - O(1)"""
return len(self.items) == 0
def size(self):
"""取得大小 - O(1)"""
return len(self.items)
def __str__(self):
return f"Queue({self.items})"
# 使用範例
queue = QueueWithList()
queue.enqueue(10)
queue.enqueue(20)
queue.enqueue(30)
print(queue) # Queue([10, 20, 30])
print(queue.dequeue()) # 10
print(queue) # Queue([20, 30])
2. 使用 collections.deque(推薦)
from collections import deque
class QueueWithDeque:
"""使用 collections.deque 實作的 Queue - 效率最高"""
def __init__(self):
self.items = deque()
def enqueue(self, item):
"""入隊 - O(1)"""
self.items.append(item)
def dequeue(self):
"""出隊 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items.popleft() # 從左邊移除
def front(self):
"""查看隊首元素 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[0]
def is_empty(self):
"""檢查是否為空 - O(1)"""
return len(self.items) == 0
def size(self):
"""取得大小 - O(1)"""
return len(self.items)
def __str__(self):
return f"Queue({list(self.items)})"
# 使用範例
queue = QueueWithDeque()
for i in [10, 20, 30]:
queue.enqueue(i)
print(f"Enqueued {i}: {queue}")
while not queue.is_empty():
item = queue.dequeue()
print(f"Dequeued {item}: {queue}")
3. 使用 Linked List 實作
class Node:
"""節點類別"""
def __init__(self, data):
self.data = data
self.next = None
class QueueWithLinkedList:
"""使用 Linked List 實作的 Queue"""
def __init__(self):
self.front_node = None # 隊首
self.rear_node = None # 隊尾
self._size = 0
def enqueue(self, item):
"""入隊 - O(1)"""
new_node = Node(item)
if self.is_empty():
self.front_node = self.rear_node = new_node
else:
self.rear_node.next = new_node
self.rear_node = new_node
self._size += 1
def dequeue(self):
"""出隊 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
data = self.front_node.data
self.front_node = self.front_node.next
if self.front_node is None: # 佇列變空了
self.rear_node = None
self._size -= 1
return data
def front(self):
"""查看隊首元素 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.front_node.data
def is_empty(self):
"""檢查是否為空 - O(1)"""
return self.front_node is None
def size(self):
"""取得大小 - O(1)"""
return self._size
def __str__(self):
if self.is_empty():
return "Queue([])"
result = []
current = self.front_node
while current:
result.append(str(current.data))
current = current.next
return f"Queue([{', '.join(result)}])"
# 使用範例
queue = QueueWithLinkedList()
print(f"空佇列: {queue}")
for i in range(1, 4):
queue.enqueue(i * 10)
print(f"Enqueue {i * 10}: {queue}")
print(f"Front element: {queue.front()}")
print(f"Dequeue: {queue.dequeue()}")
print(f"After dequeue: {queue}")
4. 使用固定大小的陣列(Circular Queue)
class CircularQueue:
"""環形佇列 - 使用固定大小的陣列"""
def __init__(self, capacity):
self.capacity = capacity
self.items = [None] * capacity
self.front = -1
self.rear = -1
self._size = 0
def enqueue(self, item):
"""入隊 - O(1)"""
if self.is_full():
raise OverflowError("Queue is full")
if self.is_empty():
self.front = self.rear = 0
else:
self.rear = (self.rear + 1) % self.capacity
self.items[self.rear] = item
self._size += 1
def dequeue(self):
"""出隊 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
data = self.items[self.front]
if self.front == self.rear: # 只有一個元素
self.front = self.rear = -1
else:
self.front = (self.front + 1) % self.capacity
self._size -= 1
return data
def front_element(self):
"""查看隊首元素 - O(1)"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[self.front]
def is_empty(self):
"""檢查是否為空 - O(1)"""
return self.front == -1
def is_full(self):
"""檢查是否已滿 - O(1)"""
return self._size == self.capacity
def size(self):
"""取得大小 - O(1)"""
return self._size
def __str__(self):
if self.is_empty():
return "CircularQueue([])"
result = []
i = self.front
while True:
result.append(str(self.items[i]))
if i == self.rear:
break
i = (i + 1) % self.capacity
return f"CircularQueue({result})"
# 使用範例
cq = CircularQueue(5)
print(f"容量: {cq.capacity}")
# 測試環形特性
for i in range(1, 6):
cq.enqueue(i)
print(f"Enqueue {i}: {cq}")
print(f"\n佇列已滿: {cq.is_full()}")
# 出隊後再入隊,展示環形特性
for _ in range(2):
print(f"Dequeue: {cq.dequeue()}")
for i in range(6, 8):
cq.enqueue(i)
print(f"Enqueue {i}: {cq}")
Python 內建的 Queue 模組
Python 提供了 queue
模組,包含多種佇列實作:
import queue
# 1. 基本的 FIFO Queue
fifo_queue = queue.Queue(maxsize=5) # maxsize=0 表示無限大
# 入隊
fifo_queue.put(10)
fifo_queue.put(20)
fifo_queue.put(30)
# 出隊
print(fifo_queue.get()) # 10
print(fifo_queue.get()) # 20
# 檢查狀態
print(f"Queue size: {fifo_queue.qsize()}")
print(f"Is empty: {fifo_queue.empty()}")
print(f"Is full: {fifo_queue.full()}")
# 2. LIFO Queue(其實就是 Stack)
lifo_queue = queue.LifoQueue()
lifo_queue.put(10)
lifo_queue.put(20)
print(lifo_queue.get()) # 20 (後進先出)
# 3. Priority Queue(優先權佇列)
pq = queue.PriorityQueue()
pq.put((2, "中優先"))
pq.put((1, "高優先"))
pq.put((3, "低優先"))
while not pq.empty():
priority, item = pq.get()
print(f"Priority {priority}: {item}")
# 輸出順序:高優先 → 中優先 → 低優先
實際應用範例
1. 任務排程系統
class TaskScheduler:
"""簡單的任務排程器"""
def __init__(self):
self.task_queue = QueueWithDeque()
self.completed_tasks = []
def add_task(self, task_name):
"""新增任務"""
self.task_queue.enqueue({
'name': task_name,
'added_time': __import__('time').time()
})
print(f"Task '{task_name}' added to queue")
def process_next_task(self):
"""處理下一個任務"""
if self.task_queue.is_empty():
print("No tasks to process")
return
task = self.task_queue.dequeue()
print(f"Processing task: {task['name']}")
# 模擬處理時間
__import__('time').sleep(1)
task['completed_time'] = __import__('time').time()
self.completed_tasks.append(task)
print(f"Task '{task['name']}' completed")
def process_all_tasks(self):
"""處理所有任務"""
while not self.task_queue.is_empty():
self.process_next_task()
def show_status(self):
"""顯示狀態"""
print(f"\nPending tasks: {self.task_queue.size()}")
print(f"Completed tasks: {len(self.completed_tasks)}")
# 使用範例
scheduler = TaskScheduler()
scheduler.add_task("發送郵件")
scheduler.add_task("生成報告")
scheduler.add_task("備份資料")
scheduler.show_status()
scheduler.process_all_tasks()
scheduler.show_status()
2. 廣度優先搜尋(BFS)
from collections import deque
def bfs_shortest_path(graph, start, target):
"""使用 BFS 找最短路徑"""
queue = deque([(start, [start])])
visited = {start}
while queue:
node, path = queue.popleft()
if node == target:
return path
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
# 使用範例
graph = {
'A': ['B', 'C'],
'B': ['A', 'D', 'E'],
'C': ['A', 'F'],
'D': ['B'],
'E': ['B', 'F'],
'F': ['C', 'E']
}
path = bfs_shortest_path(graph, 'A', 'F')
print(f"Shortest path from A to F: {' -> '.join(path)}")
# 輸出: A -> C -> F
3. 生產者-消費者模式
import threading
import queue
import time
import random
class ProducerConsumer:
"""生產者-消費者模式範例"""
def __init__(self, buffer_size=5):
self.buffer = queue.Queue(maxsize=buffer_size)
self.running = True
def producer(self, producer_id):
"""生產者執行緒"""
while self.running:
item = f"Item_{producer_id}_{random.randint(1, 100)}"
self.buffer.put(item)
print(f"Producer {producer_id} produced: {item}")
time.sleep(random.uniform(0.5, 1.5))
def consumer(self, consumer_id):
"""消費者執行緒"""
while self.running or not self.buffer.empty():
try:
item = self.buffer.get(timeout=1)
print(f"Consumer {consumer_id} consumed: {item}")
time.sleep(random.uniform(0.5, 1.0))
except queue.Empty:
continue
def run(self, duration=5):
"""執行模擬"""
# 建立執行緒
producers = [
threading.Thread(target=self.producer, args=(i,))
for i in range(2)
]
consumers = [
threading.Thread(target=self.consumer, args=(i,))
for i in range(3)
]
# 啟動執行緒
for p in producers:
p.start()
for c in consumers:
c.start()
# 執行一段時間
time.sleep(duration)
self.running = False
# 等待所有執行緒結束
for p in producers:
p.join()
for c in consumers:
c.join()
print("\nSimulation completed!")
# 使用範例(簡化版,實際執行需要處理執行緒)
# pc = ProducerConsumer()
# pc.run(duration=5)
4. 網頁爬蟲的 URL 佇列
class WebCrawlerQueue:
"""網頁爬蟲的 URL 管理佇列"""
def __init__(self):
self.url_queue = QueueWithDeque()
self.visited_urls = set()
self.max_depth = 3
def add_url(self, url, depth=0):
"""新增 URL 到佇列"""
if url not in self.visited_urls and depth <= self.max_depth:
self.url_queue.enqueue({'url': url, 'depth': depth})
def crawl_next(self):
"""爬取下一個 URL"""
if self.url_queue.is_empty():
return None
url_info = self.url_queue.dequeue()
url = url_info['url']
depth = url_info['depth']
if url in self.visited_urls:
return self.crawl_next()
self.visited_urls.add(url)
print(f"Crawling: {url} (depth: {depth})")
# 模擬找到新的連結
if depth < self.max_depth:
for i in range(random.randint(0, 3)):
new_url = f"{url}/page{i}"
self.add_url(new_url, depth + 1)
return url
def crawl_all(self):
"""爬取所有 URL"""
while not self.url_queue.is_empty():
self.crawl_next()
print(f"\nTotal URLs crawled: {len(self.visited_urls)}")
# 使用範例
crawler = WebCrawlerQueue()
crawler.add_url("https://example.com")
crawler.crawl_all()
效能分析
時間複雜度比較
操作 | List 實作 | Deque 實作 | Linked List | Circular Array |
---|---|---|---|---|
enqueue() | O(1) | O(1) | O(1) | O(1) |
dequeue() | O(n) | O(1) | O(1) | O(1) |
front() | O(1) | O(1) | O(1) | O(1) |
is_empty() | O(1) | O(1) | O(1) | O(1) |
size() | O(1) | O(1) | O(1) | O(1) |
空間複雜度
- List/Deque:O(n),可能有額外的空間浪費
- Linked List:O(n),每個節點需要額外的指標空間
- Circular Array:O(capacity),固定大小
選擇適合的 Queue 實作
def choose_queue_implementation():
"""如何選擇適合的 Queue 實作"""
print("Queue 實作選擇指南:")
print("\n1. collections.deque(大多數情況的最佳選擇)")
print(" - 優點:效能優秀、內建支援、功能完整")
print(" - 適用:一般用途、不需要執行緒安全")
print("\n2. queue.Queue(多執行緒環境)")
print(" - 優點:執行緒安全、支援阻塞操作")
print(" - 適用:生產者-消費者模式、多執行緒應用")
print("\n3. 自定義 Linked List Queue")
print(" - 優點:教學用途、可自定義行為")
print(" - 適用:學習資料結構、特殊需求")
print("\n4. Circular Queue")
print(" - 優點:固定記憶體使用、適合嵌入式系統")
print(" - 適用:緩衝區、資源受限環境")
choose_queue_implementation()
常見面試問題
1. 使用兩個 Stack 實作 Queue
class QueueUsingStacks:
"""使用兩個 Stack 實作 Queue"""
def __init__(self):
self.in_stack = [] # 用於入隊
self.out_stack = [] # 用於出隊
def enqueue(self, item):
"""入隊 - O(1)"""
self.in_stack.append(item)
def dequeue(self):
"""出隊 - 平均 O(1),最壞 O(n)"""
if not self.out_stack:
# 將 in_stack 的元素轉移到 out_stack
while self.in_stack:
self.out_stack.append(self.in_stack.pop())
if not self.out_stack:
raise IndexError("Queue is empty")
return self.out_stack.pop()
def is_empty(self):
return len(self.in_stack) == 0 and len(self.out_stack) == 0
def __str__(self):
# 顯示實際的佇列順序
temp = self.out_stack[::-1] + self.in_stack
return f"Queue({temp})"
# 測試
q = QueueUsingStacks()
for i in [1, 2, 3]:
q.enqueue(i)
print(q) # Queue([1, 2, 3])
print(q.dequeue()) # 1
q.enqueue(4)
print(q) # Queue([2, 3, 4])
2. 實作具有 getMin() 的 Queue
class MinQueue:
"""支援 O(1) 取得最小值的 Queue"""
def __init__(self):
self.queue = deque()
self.min_deque = deque() # 儲存最小值
def enqueue(self, item):
"""入隊 - O(1)"""
self.queue.append(item)
# 維護最小值佇列
while self.min_deque and self.min_deque[-1] > item:
self.min_deque.pop()
self.min_deque.append(item)
def dequeue(self):
"""出隊 - O(1)"""
if not self.queue:
raise IndexError("Queue is empty")
item = self.queue.popleft()
# 如果移除的是最小值,也要從 min_deque 移除
if self.min_deque and self.min_deque[0] == item:
self.min_deque.popleft()
return item
def get_min(self):
"""取得最小值 - O(1)"""
if not self.min_deque:
raise ValueError("Queue is empty")
return self.min_deque[0]
def __str__(self):
return f"Queue({list(self.queue)}), Min: {self.get_min() if self.queue else 'N/A'}"
# 測試
mq = MinQueue()
for val in [3, 1, 4, 1, 5]:
mq.enqueue(val)
print(f"Enqueue {val}: {mq}")
while len(mq.queue) > 0:
val = mq.dequeue()
print(f"Dequeue {val}: Queue({list(mq.queue)}), Min: {mq.get_min() if mq.queue else 'N/A'}")
總結
Queue 是一個基礎但極其重要的資料結構:
核心概念
- FIFO 原則:先進先出
- 基本操作:enqueue、dequeue、front、is_empty
- 多種實作:list、deque、linked list、circular array
選擇建議
- 一般用途:使用
collections.deque
- 多執行緒:使用
queue.Queue
- 學習目的:實作自定義 Queue
- 特殊需求:根據具體情況選擇
應用場景
- 任務排程
- BFS 演算法
- 生產者-消費者模式
- 訊息佇列系統
- 印表機排程
理解 Queue 的原理和應用,是掌握更複雜系統設計的重要基礎!