05-3. 非同步上下文管理器

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

非同步上下文管理器讓你用 async with 語法管理非同步資源的取得和釋放。


🔄 同步 vs 非同步上下文管理器

# 同步上下文管理器
with open("file.txt") as f:
    content = f.read()

# 非同步上下文管理器
async with aiofiles.open("file.txt") as f:
    content = await f.read()

📝 定義非同步上下文管理器

使用 aenteraexit

import asyncio

class AsyncResource:
    """非同步上下文管理器"""

    def __init__(self, name: str):
        self.name = name

    async def __aenter__(self):
        """進入上下文時執行"""
        print(f"{self.name}: 取得資源")
        await asyncio.sleep(0.1)  # 模擬非同步初始化
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """離開上下文時執行"""
        print(f"{self.name}: 釋放資源")
        await asyncio.sleep(0.1)  # 模擬非同步清理

        # 返回 True 表示異常已處理,不會向上傳播
        # 返回 False 或 None 表示異常會繼續傳播
        return False

    async def do_something(self):
        print(f"{self.name}: 執行操作")

async def main():
    async with AsyncResource("資源 A") as resource:
        await resource.do_something()

asyncio.run(main())
# 輸出:
# 資源 A: 取得資源
# 資源 A: 執行操作
# 資源 A: 釋放資源

使用 @asynccontextmanager 裝飾器

from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def async_resource(name: str):
    """使用裝飾器定義非同步上下文管理器"""
    print(f"{name}: 取得資源")
    await asyncio.sleep(0.1)

    try:
        yield name  # 產出的值會被綁定到 as 變數
    finally:
        print(f"{name}: 釋放資源")
        await asyncio.sleep(0.1)

async def main():
    async with async_resource("資源 B") as name:
        print(f"使用 {name}")

asyncio.run(main())

🔧 實用範例

資料庫連線管理

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

@asynccontextmanager
async def get_db_session():
    """資料庫 Session 上下文管理器"""
    session = AsyncSessionLocal()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

# 使用
async def example():
    async with get_db_session() as db:
        # 執行資料庫操作
        result = await db.execute(select(User))
        users = result.scalars().all()

HTTP 客戶端管理

from contextlib import asynccontextmanager
import httpx

class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self.client = httpx.AsyncClient(base_url=self.base_url)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.aclose()

    async def get(self, path: str):
        return await self.client.get(path)

    async def post(self, path: str, data: dict):
        return await self.client.post(path, json=data)

# 使用
async def main():
    async with APIClient("https://api.example.com") as client:
        response = await client.get("/users")
        print(response.json())

分散式鎖

from contextlib import asynccontextmanager
import redis.asyncio as redis
import asyncio
import uuid

class DistributedLock:
    def __init__(self, redis_client: redis.Redis, key: str, ttl: int = 30):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl
        self.token = str(uuid.uuid4())

    async def __aenter__(self):
        """嘗試取得鎖"""
        while True:
            acquired = await self.redis.set(
                self.key,
                self.token,
                nx=True,  # 只在 key 不存在時設定
                ex=self.ttl
            )
            if acquired:
                return self
            await asyncio.sleep(0.1)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """釋放鎖"""
        # 使用 Lua 腳本確保只刪除自己的鎖
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        await self.redis.eval(script, 1, self.key, self.token)

# 使用
async def main():
    redis_client = redis.from_url("redis://localhost:6379")

    async with DistributedLock(redis_client, "my_resource"):
        print("取得鎖,執行操作")
        await asyncio.sleep(2)
        print("操作完成")

檔案操作

import aiofiles
from contextlib import asynccontextmanager

@asynccontextmanager
async def temp_file(filename: str):
    """臨時檔案上下文管理器"""
    import os

    try:
        yield filename
    finally:
        # 清理臨時檔案
        if os.path.exists(filename):
            os.remove(filename)

async def main():
    async with temp_file("temp.txt") as filename:
        async with aiofiles.open(filename, "w") as f:
            await f.write("Hello, World!")

        async with aiofiles.open(filename, "r") as f:
            content = await f.read()
            print(content)

    # 離開後檔案會被刪除

🔄 巢狀上下文管理器

多個資源

async def main():
    async with (
        get_db_session() as db,
        APIClient("https://api.example.com") as client
    ):
        # 同時使用多個資源
        users = await db.execute(select(User))
        response = await client.get("/external-data")

AsyncExitStack

from contextlib import AsyncExitStack

async def main():
    async with AsyncExitStack() as stack:
        # 動態添加上下文管理器
        db = await stack.enter_async_context(get_db_session())
        client = await stack.enter_async_context(
            APIClient("https://api.example.com")
        )

        # 根據條件添加
        if need_lock:
            lock = await stack.enter_async_context(
                DistributedLock(redis, "resource")
            )

        # 所有資源會在離開時自動清理

⚠️ 異常處理

aexit 中處理異常

class SafeResource:
    async def __aenter__(self):
        print("取得資源")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("清理資源")

        if exc_type is ValueError:
            print(f"捕獲 ValueError: {exc_val}")
            return True  # 異常已處理

        return False  # 其他異常繼續傳播

async def main():
    # ValueError 會被處理
    async with SafeResource():
        raise ValueError("測試錯誤")

    print("繼續執行")

asyncio.run(main())

📝 FastAPI 整合

Lifespan 事件

from fastapi import FastAPI
from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def lifespan(app: FastAPI):
    """應用程式生命週期管理"""
    # 啟動時
    print("應用程式啟動")
    app.state.http_client = httpx.AsyncClient()
    app.state.db = await create_db_pool()

    yield

    # 關閉時
    print("應用程式關閉")
    await app.state.http_client.aclose()
    await app.state.db.close()

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def root():
    # 使用 app.state 中的資源
    response = await app.state.http_client.get("https://api.example.com")
    return response.json()

依賴項

from fastapi import Depends

async def get_db():
    """依賴項:資料庫 Session"""
    async with get_db_session() as db:
        yield db

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

✅ 重點總結

定義方式

方式使用場景
__aenter__ / __aexit__類別需要保存狀態
@asynccontextmanager簡單的資源管理

重要方法

方法說明
__aenter__進入時執行,返回值綁定到 as
__aexit__離開時執行,返回 True 抑制異常

使用場景

  1. 資料庫連線
  2. HTTP 客戶端
  3. 檔案操作
  4. 分散式鎖
  5. FastAPI Lifespan

🎤 面試這樣答

Q: async with 和 with 的差別?

答案:

async with 用於非同步上下文管理器:

  • with 使用 __enter____exit__
  • async with 使用 __aenter____aexit__

非同步版本可以在進入和離開時執行非同步操作:

async def __aenter__(self):
    await self.connect()  # 非同步連線
    return self

async def __aexit__(self, *args):
    await self.close()  # 非同步關閉

上一篇: 05-2. asyncio 進階 下一篇: 05-4. 並行請求處理


最後更新:2025-12-17

0%