目錄
05-1. 非同步程式設計基礎
⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
非同步程式設計讓程式在等待 I/O 操作時可以處理其他任務,大幅提升併發效能。
🔄 同步 vs 非同步
┌─────────────────────────────────────────────────────────┐
│ 同步 (Synchronous) │
├─────────────────────────────────────────────────────────┤
│ │
│ 任務 1: ████████████░░░░░░░░░░░░ 執行中... 等待 │
│ 任務 2: ████████████░░░░░░░░ 等待 │
│ 任務 3: ████████████ │
│ │
│ 總時間: ══════════════════════════════════════════ │
│ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 非同步 (Asynchronous) │
├─────────────────────────────────────────────────────────┤
│ │
│ 任務 1: ████░░░░████ 執行 → 等待 → 繼續 │
│ 任務 2: ████░░░░████ 執行 → 等待 → 繼續 │
│ 任務 3: ████░░░░████ 執行 → 等待 → 繼續 │
│ │
│ 總時間: ════════════════ │
│ │
│ 等待時間被有效利用! │
└─────────────────────────────────────────────────────────┘🎯 何時使用非同步
適合非同步的場景(I/O 密集)
| 場景 | 說明 |
|---|---|
| 資料庫查詢 | 等待資料庫回應 |
| HTTP 請求 | 呼叫外部 API |
| 檔案讀寫 | 讀取/寫入檔案 |
| 網路通訊 | WebSocket、TCP |
不適合非同步的場景(CPU 密集)
| 場景 | 說明 |
|---|---|
| 複雜計算 | 數學運算、演算法 |
| 圖片處理 | 調整大小、濾鏡 |
| 加密運算 | 雜湊、加密 |
| 資料壓縮 | 壓縮/解壓縮 |
📚 Python async/await 基礎
基本語法
import asyncio
# 定義非同步函數
async def hello():
print("Hello")
await asyncio.sleep(1) # 非同步等待
print("World")
# 執行非同步函數
asyncio.run(hello())核心概念
# 1. async def:定義協程(coroutine)
async def my_coroutine():
return "Hello"
# 2. await:等待協程完成
async def main():
result = await my_coroutine()
print(result)
# 3. asyncio.run():執行非同步程式
asyncio.run(main())
# 4. 協程物件 vs 協程函數
coro_func = my_coroutine # 協程函數
coro_obj = my_coroutine() # 協程物件(需要 await)重要規則
# ❌ 錯誤:直接呼叫協程不會執行
async def fetch_data():
return "data"
fetch_data() # 只會建立協程物件,不會執行
# ✅ 正確:使用 await
result = await fetch_data()
# ❌ 錯誤:在非同步函數外使用 await
def sync_function():
await fetch_data() # SyntaxError
# ✅ 正確:在非同步函數內使用 await
async def async_function():
await fetch_data()🔧 asyncio 常用 API
並行執行
import asyncio
async def task(name: str, delay: float) -> str:
print(f"{name} 開始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return f"{name} 結果"
async def main():
# ===== asyncio.gather:同時執行多個任務 =====
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3),
)
print(results) # ['A 結果', 'B 結果', 'C 結果']
# ===== asyncio.create_task:建立任務(不等待)=====
task_a = asyncio.create_task(task("A", 2))
task_b = asyncio.create_task(task("B", 1))
# 做其他事情...
print("任務已建立")
# 等待任務完成
result_a = await task_a
result_b = await task_b
asyncio.run(main())超時處理
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 設定 5 秒超時
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0
)
except asyncio.TimeoutError:
print("操作超時!")
asyncio.run(main())任務取消
import asyncio
async def long_running_task():
try:
while True:
print("執行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任務被取消")
raise # 重新拋出,讓外部知道任務已取消
async def main():
task = asyncio.create_task(long_running_task())
await asyncio.sleep(3)
# 取消任務
task.cancel()
try:
await task
except asyncio.CancelledError:
print("確認任務已取消")
asyncio.run(main())📊 並行處理模式
1. 批次處理
async def process_item(item: int) -> int:
await asyncio.sleep(0.1) # 模擬 I/O
return item * 2
async def process_batch(items: list[int]) -> list[int]:
"""批次並行處理"""
tasks = [process_item(item) for item in items]
return await asyncio.gather(*tasks)
async def main():
items = list(range(100))
results = await process_batch(items)
print(f"處理完成:{len(results)} 項")
asyncio.run(main())2. 限制並行數量
import asyncio
async def limited_process(
items: list[int],
max_concurrent: int = 10
) -> list[int]:
"""限制並行數量的批次處理"""
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_limit(item: int) -> int:
async with semaphore:
await asyncio.sleep(0.1) # 模擬 I/O
return item * 2
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)
async def main():
items = list(range(100))
results = await limited_process(items, max_concurrent=10)
print(f"處理完成:{len(results)} 項")
asyncio.run(main())3. 生產者-消費者模式
import asyncio
from asyncio import Queue
async def producer(queue: Queue, items: list):
"""生產者:將項目放入隊列"""
for item in items:
await queue.put(item)
print(f"生產: {item}")
# 發送結束信號
await queue.put(None)
async def consumer(queue: Queue, name: str):
"""消費者:從隊列取出並處理"""
while True:
item = await queue.get()
if item is None:
# 收到結束信號,放回並退出
await queue.put(None)
break
print(f"{name} 處理: {item}")
await asyncio.sleep(0.5) # 模擬處理
queue.task_done()
async def main():
queue = Queue(maxsize=10)
# 啟動生產者和多個消費者
items = list(range(20))
await asyncio.gather(
producer(queue, items),
consumer(queue, "消費者 A"),
consumer(queue, "消費者 B"),
consumer(queue, "消費者 C"),
)
asyncio.run(main())🌐 非同步 HTTP 請求
使用 httpx
import asyncio
import httpx
async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
"""非同步取得 URL 內容"""
response = await client.get(url)
return {
"url": url,
"status": response.status_code,
"length": len(response.content)
}
async def fetch_all(urls: list[str]) -> list[dict]:
"""並行取得多個 URL"""
async with httpx.AsyncClient() as client:
tasks = [fetch_url(client, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
]
results = await fetch_all(urls)
for result in results:
print(result)
asyncio.run(main())使用 aiohttp
import asyncio
import aiohttp
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"content": await response.text()
}
async def fetch_all(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)📝 FastAPI 中的非同步
基本用法
from fastapi import FastAPI
import httpx
app = FastAPI()
# 非同步端點
@app.get("/async")
async def async_endpoint():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# 同步端點(FastAPI 會在線程池中執行)
@app.get("/sync")
def sync_endpoint():
# 同步程式碼
import requests
response = requests.get("https://api.example.com/data")
return response.json()依賴注入
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
# 非同步依賴項
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
# 非同步端點使用非同步依賴項
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()⚠️ 常見陷阱
1. 阻塞呼叫
import time
# ❌ 錯誤:在非同步函數中使用同步的阻塞呼叫
async def bad_example():
time.sleep(5) # 阻塞整個事件迴圈!
# ✅ 正確:使用非同步的等待
async def good_example():
await asyncio.sleep(5)
# ✅ 如果必須使用同步函數,用 run_in_executor
async def run_sync_in_async():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, time.sleep, 5)2. 忘記 await
async def fetch_data():
return "data"
# ❌ 錯誤:忘記 await
async def bad_example():
result = fetch_data() # 這是協程物件,不是結果!
print(result) # <coroutine object ...>
# ✅ 正確
async def good_example():
result = await fetch_data()
print(result) # "data"3. 混用同步和非同步
# ❌ 錯誤:在非同步中呼叫同步的資料庫操作
async def bad_example():
# 假設這是同步的 SQLAlchemy
users = db.query(User).all() # 阻塞!
# ✅ 正確:使用非同步版本
async def good_example():
result = await db.execute(select(User))
users = result.scalars().all()✅ 重點總結
async/await 基礎
| 語法 | 說明 |
|---|---|
async def | 定義協程函數 |
await | 等待協程完成 |
asyncio.run() | 執行非同步程式 |
asyncio.gather() | 並行執行多個任務 |
asyncio.create_task() | 建立任務 |
適用場景
| 場景 | 建議 |
|---|---|
| I/O 密集 | 使用非同步 |
| CPU 密集 | 使用多進程 |
| 混合 | 非同步 + run_in_executor |
注意事項
- 不要在非同步中使用同步阻塞呼叫
- 不要忘記 await
- 使用非同步版本的函式庫
🎤 面試這樣答
Q: Python 的 async/await 是如何工作的?
答案:
Python 的 async/await 基於事件迴圈(Event Loop):
- 協程(Coroutine):async def 定義的函數
- 事件迴圈:管理和調度協程的執行
- await:暫停當前協程,讓出控制權
當協程遇到 await 時:
- 暫停當前協程
- 事件迴圈可以執行其他協程
- I/O 完成後,恢復原協程
async def example(): print("開始") await asyncio.sleep(1) # 暫停,讓出控制權 print("結束")
下一篇: 05-2. asyncio 進階
最後更新:2025-12-17