目錄
05-2. asyncio 進階
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)
🤔 一句話解釋
進階的 asyncio 技巧包括任務管理、錯誤處理、同步原語等,讓你寫出更健壯的非同步程式。
🔧 事件迴圈
事件迴圈基礎
import asyncio
# 取得事件迴圈
loop = asyncio.get_event_loop()
# 新版本推薦用 asyncio.run()
asyncio.run(main())
# 或者在非同步函數中取得
async def example():
loop = asyncio.get_running_loop()
print(f"當前迴圈: {loop}")自訂事件迴圈
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# 使用自訂的事件迴圈設定
if __name__ == "__main__":
# 設定 debug 模式
asyncio.run(main(), debug=True)📋 任務管理
Task 物件
import asyncio
async def long_task(name: str):
print(f"{name} 開始")
await asyncio.sleep(2)
print(f"{name} 完成")
return f"{name} 結果"
async def main():
# 建立任務
task = asyncio.create_task(long_task("Task A"))
# 檢查任務狀態
print(f"任務完成: {task.done()}") # False
# 等待任務
result = await task
print(f"任務完成: {task.done()}") # True
print(f"結果: {result}")
asyncio.run(main())任務群組(Python 3.11+)
import asyncio
async def task(name: str):
await asyncio.sleep(1)
if name == "C":
raise ValueError(f"{name} 失敗")
return f"{name} 完成"
async def main():
# TaskGroup 會等待所有任務,並收集所有異常
try:
async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(task("A"))
task_b = tg.create_task(task("B"))
task_c = tg.create_task(task("C"))
except* ValueError as eg:
print(f"捕獲異常: {eg.exceptions}")
asyncio.run(main())任務取消
import asyncio
async def cancellable_task():
try:
while True:
print("執行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("收到取消信號")
# 清理資源
print("清理完成")
raise # 重新拋出
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3)
# 取消任務
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任務已取消")
asyncio.run(main())⏱️ 超時處理
wait_for
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超時")
asyncio.run(main())timeout(Python 3.11+)
import asyncio
async def main():
async with asyncio.timeout(3.0):
await asyncio.sleep(10) # 會被取消
asyncio.run(main())自訂超時處理
import asyncio
async def with_timeout(coro, timeout: float, default=None):
"""帶有預設值的超時處理"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
return default
async def main():
result = await with_timeout(
asyncio.sleep(10),
timeout=1.0,
default="超時預設值"
)
print(result) # "超時預設值"
asyncio.run(main())🔒 同步原語
Lock(互斥鎖)
import asyncio
shared_resource = []
lock = asyncio.Lock()
async def safe_append(value):
"""使用鎖保護共享資源"""
async with lock:
print(f"取得鎖,添加 {value}")
shared_resource.append(value)
await asyncio.sleep(0.1)
print(f"釋放鎖,{value} 已添加")
async def main():
await asyncio.gather(
safe_append(1),
safe_append(2),
safe_append(3),
)
print(f"結果: {shared_resource}")
asyncio.run(main())Semaphore(信號量)
import asyncio
# 限制同時執行的任務數量
semaphore = asyncio.Semaphore(3)
async def limited_task(name: str):
async with semaphore:
print(f"{name} 開始")
await asyncio.sleep(2)
print(f"{name} 完成")
async def main():
# 同時啟動 10 個任務,但最多 3 個並行
tasks = [limited_task(f"Task {i}") for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())Event(事件)
import asyncio
event = asyncio.Event()
async def waiter(name: str):
print(f"{name} 等待事件...")
await event.wait()
print(f"{name} 事件已觸發!")
async def setter():
await asyncio.sleep(2)
print("設定事件")
event.set()
async def main():
await asyncio.gather(
waiter("A"),
waiter("B"),
waiter("C"),
setter(),
)
asyncio.run(main())Condition(條件變數)
import asyncio
condition = asyncio.Condition()
queue = []
async def producer():
for i in range(5):
async with condition:
queue.append(i)
print(f"生產: {i}")
condition.notify()
await asyncio.sleep(0.5)
async def consumer(name: str):
while True:
async with condition:
while not queue:
await condition.wait()
item = queue.pop(0)
print(f"{name} 消費: {item}")
async def main():
consumer_tasks = [
asyncio.create_task(consumer(f"消費者 {i}"))
for i in range(2)
]
await producer()
# 取消消費者
for task in consumer_tasks:
task.cancel()
asyncio.run(main())📬 Queue(佇列)
基本用法
import asyncio
async def producer(queue: asyncio.Queue):
for i in range(10):
await queue.put(i)
print(f"生產: {i}")
await asyncio.sleep(0.1)
async def consumer(queue: asyncio.Queue, name: str):
while True:
item = await queue.get()
print(f"{name} 處理: {item}")
await asyncio.sleep(0.3)
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
# 啟動消費者
consumers = [
asyncio.create_task(consumer(queue, f"消費者 {i}"))
for i in range(3)
]
# 執行生產者
await producer(queue)
# 等待佇列處理完成
await queue.join()
# 取消消費者
for c in consumers:
c.cancel()
asyncio.run(main())優先佇列
import asyncio
from asyncio import PriorityQueue
async def main():
queue = PriorityQueue()
# 放入項目(優先級, 資料)
await queue.put((3, "低優先"))
await queue.put((1, "高優先"))
await queue.put((2, "中優先"))
while not queue.empty():
priority, item = await queue.get()
print(f"優先級 {priority}: {item}")
asyncio.run(main())
# 輸出:
# 優先級 1: 高優先
# 優先級 2: 中優先
# 優先級 3: 低優先🔄 執行同步程式碼
run_in_executor
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
"""模擬同步的 I/O 操作"""
time.sleep(2)
return "I/O 完成"
def cpu_intensive():
"""模擬 CPU 密集操作"""
total = sum(i * i for i in range(10**7))
return total
async def main():
loop = asyncio.get_running_loop()
# 使用預設的線程池執行同步 I/O
result = await loop.run_in_executor(None, blocking_io)
print(result)
# 使用自訂線程池
with ThreadPoolExecutor(max_workers=4) as executor:
result = await loop.run_in_executor(executor, cpu_intensive)
print(f"計算結果: {result}")
asyncio.run(main())to_thread(Python 3.9+)
import asyncio
import time
def blocking_operation():
time.sleep(2)
return "完成"
async def main():
# 更簡潔的方式
result = await asyncio.to_thread(blocking_operation)
print(result)
asyncio.run(main())⚠️ 錯誤處理
gather 的錯誤處理
import asyncio
async def task(name: str, should_fail: bool = False):
await asyncio.sleep(1)
if should_fail:
raise ValueError(f"{name} 失敗")
return f"{name} 成功"
async def main():
# return_exceptions=True:不會立即拋出異常
results = await asyncio.gather(
task("A"),
task("B", should_fail=True),
task("C"),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"錯誤: {result}")
else:
print(f"結果: {result}")
asyncio.run(main())wait 的錯誤處理
import asyncio
async def task(name: str, delay: float, should_fail: bool = False):
await asyncio.sleep(delay)
if should_fail:
raise ValueError(f"{name} 失敗")
return f"{name} 成功"
async def main():
tasks = [
asyncio.create_task(task("A", 1)),
asyncio.create_task(task("B", 2, should_fail=True)),
asyncio.create_task(task("C", 3)),
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)
print(f"完成: {len(done)}, 待處理: {len(pending)}")
for task in done:
try:
print(f"結果: {task.result()}")
except Exception as e:
print(f"異常: {e}")
# 取消待處理的任務
for task in pending:
task.cancel()
asyncio.run(main())📝 實戰範例:HTTP 爬蟲
import asyncio
import httpx
from dataclasses import dataclass
from typing import Optional
@dataclass
class FetchResult:
url: str
status: int
content_length: int
error: Optional[str] = None
class AsyncCrawler:
def __init__(self, max_concurrent: int = 10, timeout: float = 30.0):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.timeout = timeout
async def fetch_url(
self,
client: httpx.AsyncClient,
url: str
) -> FetchResult:
"""取得單一 URL"""
async with self.semaphore:
try:
response = await client.get(url, timeout=self.timeout)
return FetchResult(
url=url,
status=response.status_code,
content_length=len(response.content)
)
except httpx.TimeoutException:
return FetchResult(
url=url,
status=0,
content_length=0,
error="Timeout"
)
except Exception as e:
return FetchResult(
url=url,
status=0,
content_length=0,
error=str(e)
)
async def crawl(self, urls: list[str]) -> list[FetchResult]:
"""並行爬取多個 URL"""
async with httpx.AsyncClient() as client:
tasks = [self.fetch_url(client, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/404",
"https://invalid-url.example.com",
]
crawler = AsyncCrawler(max_concurrent=5, timeout=5.0)
results = await crawler.crawl(urls)
for result in results:
if result.error:
print(f"❌ {result.url}: {result.error}")
else:
print(f"✅ {result.url}: {result.status} ({result.content_length} bytes)")
asyncio.run(main())✅ 重點總結
同步原語
| 原語 | 用途 |
|---|---|
Lock | 互斥存取 |
Semaphore | 限制並行數 |
Event | 事件通知 |
Condition | 條件等待 |
Queue | 任務佇列 |
任務管理
| 方法 | 說明 |
|---|---|
create_task() | 建立任務 |
gather() | 並行執行 |
wait() | 等待任務 |
wait_for() | 超時等待 |
TaskGroup | 任務群組(3.11+) |
錯誤處理
gather(return_exceptions=True)收集異常wait(return_when=FIRST_EXCEPTION)第一個異常時返回try/except asyncio.CancelledError處理取消
🎤 面試這樣答
Q: 如何在 asyncio 中限制並行數量?
答案:
使用
asyncio.Semaphore限制並行數量:semaphore = asyncio.Semaphore(10) # 最多 10 個並行 async def limited_task(): async with semaphore: # 取得信號量 await do_something() # 離開時自動釋放這樣即使啟動 100 個任務,同時執行的最多 10 個。
上一篇: 05-1. 非同步程式設計基礎 下一篇: 05-3. 非同步上下文管理器
最後更新:2025-12-17