目錄
05-4. 並行請求處理
⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
並行請求處理讓你同時發送多個 HTTP 請求,大幅縮短等待外部 API 的時間。
🔄 串行 vs 並行
串行請求(Sequential):
API 1: ████████████░░░░░░░░░░░░ 2s
API 2: ████████████░░░░░░░░ 2s
API 3: ████████████ 2s
總時間: 6 秒
並行請求(Concurrent):
API 1: ████████████ 2s
API 2: ████████████ 2s
API 3: ████████████ 2s
總時間: 2 秒
📦 httpx 非同步客戶端
基本用法
import httpx
import asyncio
async def fetch_single():
    """Send a single GET request and return the decoded JSON body."""
    endpoint = "https://api.example.com/data"
    async with httpx.AsyncClient() as session:
        reply = await session.get(endpoint)
        payload = reply.json()
        return payload
async def fetch_multiple():
"""並行多個請求"""
async with httpx.AsyncClient() as client:
# 同時發送多個請求
responses = await asyncio.gather(
client.get("https://api.example.com/users"),
client.get("https://api.example.com/posts"),
client.get("https://api.example.com/comments"),
)
return [r.json() for r in responses]
設定超時和重試
import httpx
from httpx import Timeout, Limits
# Configure timeouts
timeout = Timeout(
connect=5.0, # connection timeout
read=30.0, # read timeout
write=5.0, # write timeout
pool=5.0 # timeout waiting on the connection pool
)
# Configure connection limits
limits = Limits(
max_connections=100, # maximum number of connections
max_keepalive_connections=20 # number of keep-alive connections
)
async def main():
async with httpx.AsyncClient(
timeout=timeout,
limits=limits
) as client:
response = await client.get("https://api.example.com/data")
🔧 批次並行請求
使用 asyncio.gather
import httpx
import asyncio
async def fetch_url(
    client: httpx.AsyncClient,
    url: str
) -> dict:
    """Fetch one URL and report the outcome without raising.

    Args:
        client: Async HTTP client used to send the request.
        url: Address to GET.

    Returns:
        A dict with the keys ``url``, ``data`` and ``error``. On success
        ``data`` is the decoded JSON body and ``error`` is ``None``; on any
        failure ``data`` is ``None`` and ``error`` is a non-empty string.
    """
    try:
        response = await client.get(url)
        response.raise_for_status()
        return {"url": url, "data": response.json(), "error": None}
    except Exception as e:
        # Deliberately broad: one failing URL must not abort the whole batch.
        # An exception raised without a message stringifies to "", which a
        # truthiness check on "error" would mistake for success, so fall back
        # to the exception class name.
        return {
            "url": url,
            "data": None,
            "error": str(e) or type(e).__name__,
        }
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL in ``urls`` concurrently through one shared client.

    Returns one result dict per URL, as produced by ``fetch_url``.
    """
    async with httpx.AsyncClient() as session:
        pending = (fetch_url(session, target) for target in urls)
        return await asyncio.gather(*pending)
async def main():
    """Fetch three sample posts concurrently and print one line per result."""
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]
    results = await fetch_all(urls)
    for result in results:
        # Compare against None instead of relying on truthiness: an error
        # recorded as an empty string would otherwise be treated as success
        # and the subscript on data (None) below would raise TypeError.
        if result["error"] is not None:
            print(f"❌ {result['url']}: {result['error']}")
        else:
            print(f"✅ {result['url']}: {result['data']['title']}")
asyncio.run(main())
限制並行數量
import httpx
import asyncio
class RateLimitedClient:
    """HTTP client wrapper that caps how many requests run at the same time."""

    def __init__(self, max_concurrent: int = 10):
        # The semaphore admits at most max_concurrent requests at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = httpx.AsyncClient()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        # Close the underlying client when the context exits.
        await self.client.aclose()

    async def get(self, url: str) -> httpx.Response:
        """GET ``url`` once a semaphore slot is available."""
        async with self.semaphore:
            reply = await self.client.get(url)
            return reply

    async def fetch_all(self, urls: list[str]) -> list[dict]:
        """Fetch all ``urls`` and return one summary dict per URL."""
        return await asyncio.gather(*map(self._fetch_one, urls))

    async def _fetch_one(self, url: str) -> dict:
        """Return the status code for ``url``, or the error text on failure."""
        try:
            reply = await self.get(url)
            summary = {"url": url, "status": reply.status_code}
        except Exception as exc:
            summary = {"url": url, "error": str(exc)}
        return summary
async def main():
urls = [f"https://httpbin.org/get?id={i}" for i in range(100)]
async with RateLimitedClient(max_concurrent=10) as client:
results = await client.fetch_all(urls)
print(f"完成 {len(results)} 個請求")
📊 FastAPI 中的並行請求
聚合多個 API
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
app = FastAPI()
# Shared HTTP client; None until the startup handler assigns a real client
http_client: httpx.AsyncClient = None
@app.on_event("startup")
async def startup():
    """Create the shared HTTP client when the application starts."""
    # Rebind the module-level name so the fetch helpers see the new client
    global http_client
    http_client = httpx.AsyncClient()
@app.on_event("shutdown")
async def shutdown():
    """Close the shared HTTP client when the application stops.

    Does nothing when the client was never created, instead of failing
    with AttributeError on ``None``.
    """
    if http_client is not None:
        await http_client.aclose()
async def fetch_user(user_id: int) -> dict:
    """Fetch a user from the User service.

    Args:
        user_id: Identifier interpolated into the request path.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}"
    )
    # Without this check an error response body would be returned as if it
    # were valid user data, and callers catching httpx.HTTPError would never
    # see HTTP-status failures.
    response.raise_for_status()
    return response.json()
async def fetch_orders(user_id: int) -> list:
    """Fetch a user's orders from the Order service.

    Args:
        user_id: Identifier interpolated into the request path.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}/orders"
    )
    # Surface 4xx/5xx responses as exceptions instead of returning the
    # error body as if it were the order list.
    response.raise_for_status()
    return response.json()
async def fetch_notifications(user_id: int) -> list:
    """Fetch a user's notifications from the Notification service.

    Args:
        user_id: Identifier interpolated into the request path.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}/notifications"
    )
    # Surface 4xx/5xx responses as exceptions instead of returning the
    # error body as if it were the notification list.
    response.raise_for_status()
    return response.json()
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
"""
聚合多個服務的資料
並行呼叫 User、Order、Notification 服務
"""
try:
# 並行取得所有資料
user, orders, notifications = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
)
return {
"user": user,
"orders": orders,
"notifications": notifications
}
except httpx.HTTPError as e:
raise HTTPException(status_code=502, detail=f"Service error: {e}")
部分失敗處理
@app.get("/dashboard/{user_id}")
async def get_dashboard_safe(user_id: int):
"""
聚合多個服務,允許部分失敗
"""
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
return_exceptions=True # 不會因為一個失敗而全部失敗
)
user, orders, notifications = results
return {
"user": user if not isinstance(user, Exception) else None,
"orders": orders if not isinstance(orders, Exception) else [],
"notifications": (
notifications
if not isinstance(notifications, Exception)
else []
),
"errors": [
str(r) for r in results if isinstance(r, Exception)
]
}
🔄 重試機制
簡單重試
import httpx
import asyncio
async def fetch_with_retry(
client: httpx.AsyncClient,
url: str,
max_retries: int = 3,
delay: float = 1.0
) -> dict:
"""帶重試的請求"""
last_exception = None
for attempt in range(max_retries):
try:
response = await client.get(url)
response.raise_for_status()
return response.json()
except (httpx.HTTPError, httpx.RequestError) as e:
last_exception = e
if attempt < max_retries - 1:
# 指數退避
wait_time = delay * (2 ** attempt)
await asyncio.sleep(wait_time)
raise last_exception
使用 tenacity 庫
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import httpx
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(httpx.HTTPError)
)
async def fetch_with_tenacity(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
📝 實戰範例:API 網關
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
import asyncio
from typing import Optional, Any
from dataclasses import dataclass
from enum import Enum
class ServiceStatus(Enum):
    """Outcome category of a single backend service call."""

    OK = "ok"  # the call succeeded
    ERROR = "error"  # the call failed for a reason other than a timeout
    TIMEOUT = "timeout"  # the call did not finish within its time limit
@dataclass
class ServiceResult:
    """Result of calling one backend service."""

    service: str  # name of the service that was called
    status: ServiceStatus  # outcome category of the call
    data: Optional[Any] = None  # decoded JSON body, set on success
    error: Optional[str] = None  # failure description, set on failure
class APIGateway:
    """API gateway that fans requests out to several backend services.

    Holds one shared ``httpx.AsyncClient`` (assigned by ``startup``, closed
    by ``shutdown``) and a registry mapping service names to base URLs.
    """

    def __init__(self):
        # Not created here; startup() assigns the real client.
        self.client: Optional[httpx.AsyncClient] = None
        self.services = {
            "user": "https://user-service.internal",
            "order": "https://order-service.internal",
            "product": "https://product-service.internal",
            "inventory": "https://inventory-service.internal",
        }

    async def startup(self):
        """Create the shared HTTP client."""
        self.client = httpx.AsyncClient(timeout=10.0)

    async def shutdown(self):
        """Close the shared HTTP client, if one was created."""
        if self.client:
            await self.client.aclose()
            # Drop the closed client so later calls report a clear error
            # instead of trying to reuse it.
            self.client = None

    async def call_service(
        self,
        service_name: str,
        path: str,
        timeout: float = 5.0
    ) -> ServiceResult:
        """Call a single service and wrap the outcome in a ServiceResult.

        Never raises for request failures; they are reported via ``status``.

        Args:
            service_name: Key into ``self.services``.
            path: Path appended to the service's base URL.
            timeout: Overall time limit for the request, in seconds.

        Returns:
            A ServiceResult with status OK and the decoded JSON body, or
            status TIMEOUT / ERROR with a description in ``error``.
        """
        if service_name not in self.services:
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error="Unknown service"
            )
        if self.client is None:
            # Report a clear message rather than the AttributeError text
            # that calling .get on None would produce.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error="Gateway client is not started"
            )
        url = f"{self.services[service_name]}{path}"
        try:
            response = await asyncio.wait_for(
                self.client.get(url),
                timeout=timeout
            )
            response.raise_for_status()
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.OK,
                data=response.json()
            )
        except (asyncio.TimeoutError, httpx.TimeoutException):
            # The client's own timeout can fire before wait_for's deadline
            # (when timeout exceeds the client default); that is still a
            # timeout, not a generic error.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.TIMEOUT,
                error="Request timeout"
            )
        except Exception as e:
            # Deliberately broad so one failing service cannot break an
            # aggregate call. Fall back to the class name when the exception
            # carries no message.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error=str(e) or type(e).__name__
            )

    async def aggregate(
        self,
        requests: dict[str, str]
    ) -> dict[str, ServiceResult]:
        """Call several services concurrently.

        Args:
            requests: Mapping of service name to request path.

        Returns:
            Mapping of service name to that service's ServiceResult.
        """
        names = list(requests)
        results = await asyncio.gather(
            *(self.call_service(name, requests[name]) for name in names)
        )
        return dict(zip(names, results))
# FastAPI application and the gateway instance its handlers share
app = FastAPI()
gateway = APIGateway()
@app.on_event("startup")
async def startup():
    """Start the gateway when the application starts."""
    await gateway.startup()
@app.on_event("shutdown")
async def shutdown():
    """Shut the gateway down when the application stops."""
    await gateway.shutdown()
@app.get("/api/product/{product_id}")
async def get_product_details(product_id: int):
"""取得產品詳情(聚合多個服務)"""
results = await gateway.aggregate({
"product": f"/products/{product_id}",
"inventory": f"/products/{product_id}/stock",
"order": f"/products/{product_id}/sales",
})
# 組合結果
response = {
"product": None,
"stock": None,
"sales": None,
"errors": []
}
for name, result in results.items():
if result.status == ServiceStatus.OK:
if name == "product":
response["product"] = result.data
elif name == "inventory":
response["stock"] = result.data
elif name == "order":
response["sales"] = result.data
else:
response["errors"].append({
"service": name,
"error": result.error
})
return response
✅ 重點總結
並行方式
| 方式 | 使用場景 |
|---|---|
| asyncio.gather() | 等待所有完成 |
| asyncio.wait() | 搭配 return_when,在第一個完成或第一個失敗時返回 |
| asyncio.as_completed() | 按完成順序處理 |
最佳實踐
- 重用 Client:避免頻繁建立連線
- 限制並行:使用 Semaphore 防止過載
- 設定超時:避免無限等待
- 處理部分失敗:return_exceptions=True
- 實作重試:指數退避策略
🎤 面試這樣答
Q: 如何在 FastAPI 中並行呼叫多個外部 API?
答案:
使用 asyncio.gather() 並行發送請求:

async def get_dashboard(user_id: int):
    user, orders, notifications = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        return_exceptions=True  # 部分失敗處理
    )
    return {"user": user, "orders": orders}

重點:
- 共享 httpx.AsyncClient 避免重複建立連線
- 使用 return_exceptions=True 處理部分失敗
- 設定適當的超時時間
上一篇: 05-3. 非同步上下文管理器 下一篇: 05-5. WebSocket 基礎
最後更新:2025-12-17