05-1. 非同步程式設計基礎

⏱️ 閱讀時間: 20 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

非同步程式設計讓程式在等待 I/O 操作時可以處理其他任務,大幅提升併發效能。


🔄 同步 vs 非同步

┌─────────────────────────────────────────────────────────┐
│                    同步 (Synchronous)                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  任務 1: ████████████░░░░░░░░░░░░  執行中... 等待      │
│  任務 2:             ████████████░░░░░░░░  等待        │
│  任務 3:                         ████████████         │
│                                                         │
│  總時間: ══════════════════════════════════════════    │
│                                                         │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                    非同步 (Asynchronous)                │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  任務 1: ████░░░░████  執行 → 等待 → 繼續               │
│  任務 2:   ████░░░░████  執行 → 等待 → 繼續             │
│  任務 3:     ████░░░░████  執行 → 等待 → 繼續           │
│                                                         │
│  總時間: ════════════════                               │
│                                                         │
│  等待時間被有效利用!                                    │
└─────────────────────────────────────────────────────────┘

🎯 何時使用非同步

適合非同步的場景(I/O 密集)

場景說明
資料庫查詢等待資料庫回應
HTTP 請求呼叫外部 API
檔案讀寫讀取/寫入檔案
網路通訊WebSocket、TCP

不適合非同步的場景(CPU 密集)

場景說明
複雜計算數學運算、演算法
圖片處理調整大小、濾鏡
加密運算雜湊、加密
資料壓縮壓縮/解壓縮

📚 Python async/await 基礎

基本語法

import asyncio

# 定義非同步函數
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 非同步等待
    print("World")

# 執行非同步函數
asyncio.run(hello())

核心概念

# 1. async def:定義協程(coroutine)
async def my_coroutine():
    return "Hello"

# 2. await:等待協程完成
async def main():
    result = await my_coroutine()
    print(result)

# 3. asyncio.run():執行非同步程式
asyncio.run(main())

# 4. 協程物件 vs 協程函數
coro_func = my_coroutine  # 協程函數
coro_obj = my_coroutine()  # 協程物件(需要 await)

重要規則

# ❌ 錯誤:直接呼叫協程不會執行
async def fetch_data():
    return "data"

fetch_data()  # 只會建立協程物件,不會執行

# ✅ 正確:使用 await
result = await fetch_data()

# ❌ 錯誤:在非同步函數外使用 await
def sync_function():
    await fetch_data()  # SyntaxError

# ✅ 正確:在非同步函數內使用 await
async def async_function():
    await fetch_data()

🔧 asyncio 常用 API

並行執行

import asyncio

async def task(name: str, delay: float) -> str:
    print(f"{name} 開始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return f"{name} 結果"

async def main():
    # ===== asyncio.gather:同時執行多個任務 =====
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3),
    )
    print(results)  # ['A 結果', 'B 結果', 'C 結果']

    # ===== asyncio.create_task:建立任務(不等待)=====
    task_a = asyncio.create_task(task("A", 2))
    task_b = asyncio.create_task(task("B", 1))

    # 做其他事情...
    print("任務已建立")

    # 等待任務完成
    result_a = await task_a
    result_b = await task_b

asyncio.run(main())

超時處理

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        # 設定 5 秒超時
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("操作超時!")

asyncio.run(main())

任務取消

import asyncio

async def long_running_task():
    try:
        while True:
            print("執行中...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("任務被取消")
        raise  # 重新拋出,讓外部知道任務已取消

async def main():
    task = asyncio.create_task(long_running_task())

    await asyncio.sleep(3)

    # 取消任務
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("確認任務已取消")

asyncio.run(main())

📊 並行處理模式

1. 批次處理

async def process_item(item: int) -> int:
    await asyncio.sleep(0.1)  # 模擬 I/O
    return item * 2

async def process_batch(items: list[int]) -> list[int]:
    """批次並行處理"""
    tasks = [process_item(item) for item in items]
    return await asyncio.gather(*tasks)

async def main():
    items = list(range(100))
    results = await process_batch(items)
    print(f"處理完成:{len(results)} 項")

asyncio.run(main())

2. 限制並行數量

import asyncio

async def limited_process(
    items: list[int],
    max_concurrent: int = 10
) -> list[int]:
    """限制並行數量的批次處理"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_limit(item: int) -> int:
        async with semaphore:
            await asyncio.sleep(0.1)  # 模擬 I/O
            return item * 2

    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

async def main():
    items = list(range(100))
    results = await limited_process(items, max_concurrent=10)
    print(f"處理完成:{len(results)} 項")

asyncio.run(main())

3. 生產者-消費者模式

import asyncio
from asyncio import Queue

async def producer(queue: Queue, items: list):
    """生產者:將項目放入隊列"""
    for item in items:
        await queue.put(item)
        print(f"生產: {item}")

    # 發送結束信號
    await queue.put(None)

async def consumer(queue: Queue, name: str):
    """消費者:從隊列取出並處理"""
    while True:
        item = await queue.get()

        if item is None:
            # 收到結束信號,放回並退出
            await queue.put(None)
            break

        print(f"{name} 處理: {item}")
        await asyncio.sleep(0.5)  # 模擬處理

        queue.task_done()

async def main():
    queue = Queue(maxsize=10)

    # 啟動生產者和多個消費者
    items = list(range(20))

    await asyncio.gather(
        producer(queue, items),
        consumer(queue, "消費者 A"),
        consumer(queue, "消費者 B"),
        consumer(queue, "消費者 C"),
    )

asyncio.run(main())

🌐 非同步 HTTP 請求

使用 httpx

import asyncio
import httpx

async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
    """非同步取得 URL 內容"""
    response = await client.get(url)
    return {
        "url": url,
        "status": response.status_code,
        "length": len(response.content)
    }

async def fetch_all(urls: list[str]) -> list[dict]:
    """並行取得多個 URL"""
    async with httpx.AsyncClient() as client:
        tasks = [fetch_url(client, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ]

    results = await fetch_all(urls)
    for result in results:
        print(result)

asyncio.run(main())

使用 aiohttp

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "content": await response.text()
        }

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

📝 FastAPI 中的非同步

基本用法

from fastapi import FastAPI
import httpx

app = FastAPI()

# 非同步端點
@app.get("/async")
async def async_endpoint():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

# 同步端點(FastAPI 會在線程池中執行)
@app.get("/sync")
def sync_endpoint():
    # 同步程式碼
    import requests
    response = requests.get("https://api.example.com/data")
    return response.json()

依賴注入

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

# 非同步依賴項
async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

# 非同步端點使用非同步依賴項
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

⚠️ 常見陷阱

1. 阻塞呼叫

import time

# ❌ 錯誤:在非同步函數中使用同步的阻塞呼叫
async def bad_example():
    time.sleep(5)  # 阻塞整個事件迴圈!

# ✅ 正確:使用非同步的等待
async def good_example():
    await asyncio.sleep(5)

# ✅ 如果必須使用同步函數,用 run_in_executor
async def run_sync_in_async():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, time.sleep, 5)

2. 忘記 await

async def fetch_data():
    return "data"

# ❌ 錯誤:忘記 await
async def bad_example():
    result = fetch_data()  # 這是協程物件,不是結果!
    print(result)  # <coroutine object ...>

# ✅ 正確
async def good_example():
    result = await fetch_data()
    print(result)  # "data"

3. 混用同步和非同步

# ❌ 錯誤:在非同步中呼叫同步的資料庫操作
async def bad_example():
    # 假設這是同步的 SQLAlchemy
    users = db.query(User).all()  # 阻塞!

# ✅ 正確:使用非同步版本
async def good_example():
    result = await db.execute(select(User))
    users = result.scalars().all()

✅ 重點總結

async/await 基礎

語法說明
async def定義協程函數
await等待協程完成
asyncio.run()執行非同步程式
asyncio.gather()並行執行多個任務
asyncio.create_task()建立任務

適用場景

場景建議
I/O 密集使用非同步
CPU 密集使用多進程
混合非同步 + run_in_executor

注意事項

  1. 不要在非同步中使用同步阻塞呼叫
  2. 不要忘記 await
  3. 使用非同步版本的函式庫

🎤 面試這樣答

Q: Python 的 async/await 是如何工作的?

答案:

Python 的 async/await 基於事件迴圈(Event Loop):

  1. 協程(Coroutine):async def 定義的函數
  2. 事件迴圈:管理和調度協程的執行
  3. await:暫停當前協程,讓出控制權

當協程遇到 await 時:

  • 暫停當前協程
  • 事件迴圈可以執行其他協程
  • I/O 完成後,恢復原協程
async def example():
    print("開始")
    await asyncio.sleep(1)  # 暫停,讓出控制權
    print("結束")

下一篇: 05-2. asyncio 進階


最後更新:2025-12-17

0%