目錄
05-3. 非同步上下文管理器
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
非同步上下文管理器讓你用 async with 語法管理非同步資源的取得和釋放。
🔄 同步 vs 非同步上下文管理器
# 同步上下文管理器
with open("file.txt") as f:
content = f.read()
# 非同步上下文管理器
async with aiofiles.open("file.txt") as f:
content = await f.read()📝 定義非同步上下文管理器
使用 aenter 和 aexit
import asyncio
class AsyncResource:
"""非同步上下文管理器"""
def __init__(self, name: str):
self.name = name
async def __aenter__(self):
"""進入上下文時執行"""
print(f"{self.name}: 取得資源")
await asyncio.sleep(0.1) # 模擬非同步初始化
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""離開上下文時執行"""
print(f"{self.name}: 釋放資源")
await asyncio.sleep(0.1) # 模擬非同步清理
# 返回 True 表示異常已處理,不會向上傳播
# 返回 False 或 None 表示異常會繼續傳播
return False
async def do_something(self):
print(f"{self.name}: 執行操作")
async def main():
async with AsyncResource("資源 A") as resource:
await resource.do_something()
asyncio.run(main())
# 輸出:
# 資源 A: 取得資源
# 資源 A: 執行操作
# 資源 A: 釋放資源使用 @asynccontextmanager 裝飾器
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def async_resource(name: str):
"""使用裝飾器定義非同步上下文管理器"""
print(f"{name}: 取得資源")
await asyncio.sleep(0.1)
try:
yield name # 產出的值會被綁定到 as 變數
finally:
print(f"{name}: 釋放資源")
await asyncio.sleep(0.1)
async def main():
async with async_resource("資源 B") as name:
print(f"使用 {name}")
asyncio.run(main())🔧 實用範例
資料庫連線管理
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
@asynccontextmanager
async def get_db_session():
"""資料庫 Session 上下文管理器"""
session = AsyncSessionLocal()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# 使用
async def example():
async with get_db_session() as db:
# 執行資料庫操作
result = await db.execute(select(User))
users = result.scalars().all()HTTP 客戶端管理
from contextlib import asynccontextmanager
import httpx
class APIClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.client: httpx.AsyncClient | None = None
async def __aenter__(self):
self.client = httpx.AsyncClient(base_url=self.base_url)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client:
await self.client.aclose()
async def get(self, path: str):
return await self.client.get(path)
async def post(self, path: str, data: dict):
return await self.client.post(path, json=data)
# 使用
async def main():
async with APIClient("https://api.example.com") as client:
response = await client.get("/users")
print(response.json())分散式鎖
from contextlib import asynccontextmanager
import redis.asyncio as redis
import asyncio
import uuid
class DistributedLock:
def __init__(self, redis_client: redis.Redis, key: str, ttl: int = 30):
self.redis = redis_client
self.key = f"lock:{key}"
self.ttl = ttl
self.token = str(uuid.uuid4())
async def __aenter__(self):
"""嘗試取得鎖"""
while True:
acquired = await self.redis.set(
self.key,
self.token,
nx=True, # 只在 key 不存在時設定
ex=self.ttl
)
if acquired:
return self
await asyncio.sleep(0.1)
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""釋放鎖"""
# 使用 Lua 腳本確保只刪除自己的鎖
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
await self.redis.eval(script, 1, self.key, self.token)
# 使用
async def main():
redis_client = redis.from_url("redis://localhost:6379")
async with DistributedLock(redis_client, "my_resource"):
print("取得鎖,執行操作")
await asyncio.sleep(2)
print("操作完成")檔案操作
import aiofiles
from contextlib import asynccontextmanager
@asynccontextmanager
async def temp_file(filename: str):
"""臨時檔案上下文管理器"""
import os
try:
yield filename
finally:
# 清理臨時檔案
if os.path.exists(filename):
os.remove(filename)
async def main():
async with temp_file("temp.txt") as filename:
async with aiofiles.open(filename, "w") as f:
await f.write("Hello, World!")
async with aiofiles.open(filename, "r") as f:
content = await f.read()
print(content)
# 離開後檔案會被刪除🔄 巢狀上下文管理器
多個資源
async def main():
async with (
get_db_session() as db,
APIClient("https://api.example.com") as client
):
# 同時使用多個資源
users = await db.execute(select(User))
response = await client.get("/external-data")AsyncExitStack
from contextlib import AsyncExitStack
async def main():
async with AsyncExitStack() as stack:
# 動態添加上下文管理器
db = await stack.enter_async_context(get_db_session())
client = await stack.enter_async_context(
APIClient("https://api.example.com")
)
# 根據條件添加
if need_lock:
lock = await stack.enter_async_context(
DistributedLock(redis, "resource")
)
# 所有資源會在離開時自動清理⚠️ 異常處理
在 aexit 中處理異常
class SafeResource:
async def __aenter__(self):
print("取得資源")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("清理資源")
if exc_type is ValueError:
print(f"捕獲 ValueError: {exc_val}")
return True # 異常已處理
return False # 其他異常繼續傳播
async def main():
# ValueError 會被處理
async with SafeResource():
raise ValueError("測試錯誤")
print("繼續執行")
asyncio.run(main())📝 FastAPI 整合
Lifespan 事件
from fastapi import FastAPI
from contextlib import asynccontextmanager
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
"""應用程式生命週期管理"""
# 啟動時
print("應用程式啟動")
app.state.http_client = httpx.AsyncClient()
app.state.db = await create_db_pool()
yield
# 關閉時
print("應用程式關閉")
await app.state.http_client.aclose()
await app.state.db.close()
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
# 使用 app.state 中的資源
response = await app.state.http_client.get("https://api.example.com")
return response.json()依賴項
from fastapi import Depends
async def get_db():
"""依賴項:資料庫 Session"""
async with get_db_session() as db:
yield db
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()✅ 重點總結
定義方式
| 方式 | 使用場景 |
|---|---|
__aenter__ / __aexit__ | 類別需要保存狀態 |
@asynccontextmanager | 簡單的資源管理 |
重要方法
| 方法 | 說明 |
|---|---|
__aenter__ | 進入時執行,返回值綁定到 as |
__aexit__ | 離開時執行,返回 True 抑制異常 |
使用場景
- 資料庫連線
- HTTP 客戶端
- 檔案操作
- 分散式鎖
- FastAPI Lifespan
🎤 面試這樣答
Q: async with 和 with 的差別?
答案:
async with用於非同步上下文管理器:
with使用__enter__和__exit__async with使用__aenter__和__aexit__非同步版本可以在進入和離開時執行非同步操作:
async def __aenter__(self): await self.connect() # 非同步連線 return self async def __aexit__(self, *args): await self.close() # 非同步關閉
上一篇: 05-2. asyncio 進階 下一篇: 05-4. 並行請求處理
最後更新:2025-12-17