05-4. 並行請求處理

⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

並行請求處理讓你同時發送多個 HTTP 請求,大幅縮短等待外部 API 的時間。


🔄 串行 vs 並行

串行請求(Sequential):
API 1: ████████████░░░░░░░░░░░░  2s
API 2:             ████████████░░░░░░░░  2s
API 3:                         ████████████  2s
總時間: 6 秒

並行請求(Concurrent):
API 1: ████████████  2s
API 2: ████████████  2s
API 3: ████████████  2s
總時間: 2 秒

📦 httpx 非同步客戶端

基本用法

import httpx
import asyncio

async def fetch_single():
    """Send one GET request and return the decoded JSON body."""
    endpoint = "https://api.example.com/data"
    async with httpx.AsyncClient() as session:
        reply = await session.get(endpoint)
        payload = reply.json()
        return payload

async def fetch_multiple():
    """Fetch three endpoints concurrently and return their JSON bodies.

    The returned list follows the order of the endpoints listed below.
    """
    endpoints = (
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
    )
    async with httpx.AsyncClient() as session:
        # gather() starts every request at once and waits for all of them.
        replies = await asyncio.gather(
            *(session.get(endpoint) for endpoint in endpoints)
        )

        return [reply.json() for reply in replies]

設定超時和連線限制

import httpx
from httpx import Timeout, Limits

# Timeout settings, in seconds:
#   connect - connection timeout
#   read    - read timeout
#   write   - write timeout
#   pool    - timeout while waiting for the connection pool
timeout = Timeout(connect=5.0, read=30.0, write=5.0, pool=5.0)

# Connection limits:
#   max_connections           - maximum number of connections
#   max_keepalive_connections - number of keep-alive connections
limits = Limits(max_connections=100, max_keepalive_connections=20)

async def main():
    """Send one GET request through a client configured with the
    module-level ``timeout`` and ``limits`` settings."""
    client_options = {"timeout": timeout, "limits": limits}
    async with httpx.AsyncClient(**client_options) as client:
        # The response is not used any further in this snippet.
        response = await client.get("https://api.example.com/data")

🔧 批次並行請求

使用 asyncio.gather

import httpx
import asyncio

async def fetch_url(
    client: httpx.AsyncClient,
    url: str
) -> dict:
    """Fetch ``url`` and report the outcome as a dict instead of raising.

    Returns:
        A dict with the keys ``url``, ``data`` and ``error``. On success
        ``data`` holds the decoded JSON body and ``error`` is None. If any
        step raises, ``data`` is None and ``error`` is ``str(exception)``.
    """
    outcome = {"url": url, "data": None, "error": None}
    try:
        reply = await client.get(url)
        reply.raise_for_status()
        outcome["data"] = reply.json()
    except Exception as exc:
        # Best effort by design: a failing URL must not abort its siblings.
        outcome["error"] = str(exc)
    return outcome

async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently through one shared client.

    Results are returned in the same order as ``urls``; each entry is the
    dict produced by ``fetch_url``.
    """
    async with httpx.AsyncClient() as session:
        pending = (fetch_url(session, target) for target in urls)
        return await asyncio.gather(*pending)

async def main():
    """Fetch three sample posts and print one status line per URL."""
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]

    for result in await fetch_all(urls):
        if not result["error"]:
            print(f"✅ {result['url']}: {result['data']['title']}")
            continue
        print(f"❌ {result['url']}: {result['error']}")

# Script entry point: run the main() coroutine to completion.
asyncio.run(main())

限制並行數量

import httpx
import asyncio

class RateLimitedClient:
    """HTTP client that caps how many requests are in flight at once.

    Use it as an async context manager so the underlying
    ``httpx.AsyncClient`` is closed on exit.
    """

    def __init__(self, max_concurrent: int = 10):
        """Create the client.

        Args:
            max_concurrent: Maximum number of requests allowed to run at
                the same time.

        Raises:
            ValueError: If ``max_concurrent`` is less than 1.
        """
        # A semaphore initialised to 0 never lets a request start, so every
        # get() would wait forever; reject that up front.
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = httpx.AsyncClient()

    async def __aenter__(self):
        """Return this object as the context value."""
        return self

    async def __aexit__(self, *args):
        """Close the underlying HTTP client."""
        await self.client.aclose()

    async def get(self, url: str) -> httpx.Response:
        """GET ``url`` while holding one semaphore slot."""
        async with self.semaphore:
            return await self.client.get(url)

    async def fetch_all(self, urls: list[str]) -> list[dict]:
        """Fetch all ``urls`` concurrently, bounded by the semaphore.

        Returns:
            One result dict per URL, in the same order as ``urls``.
        """
        tasks = [self._fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def _fetch_one(self, url: str) -> dict:
        """Fetch one URL and describe the outcome without raising.

        Returns:
            ``{"url", "status"}`` on success, ``{"url", "error"}`` if the
            request raised.
        """
        try:
            response = await self.get(url)
            return {"url": url, "status": response.status_code}
        except Exception as e:
            # Best effort by design: one failing URL must not abort the batch.
            return {"url": url, "error": str(e)}

async def main():
    """Request 100 httpbin URLs with at most 10 in flight, then print how
    many results came back."""
    limiter = RateLimitedClient(max_concurrent=10)
    async with limiter as client:
        results = await client.fetch_all(
            [f"https://httpbin.org/get?id={i}" for i in range(100)]
        )
        print(f"完成 {len(results)} 個請求")

📊 FastAPI 中的並行請求

聚合多個 API

from fastapi import FastAPI, HTTPException
import httpx
import asyncio

app = FastAPI()

# Shared HTTP client used by the fetch_* helpers in this snippet.
# It stays None until the startup hook assigns a real client.
http_client: httpx.AsyncClient = None

@app.on_event("startup")
async def startup():
    """Create the shared ``http_client`` when the application starts."""
    global http_client
    http_client = httpx.AsyncClient()

@app.on_event("shutdown")
async def shutdown():
    """Close the shared ``http_client`` when the application stops.

    Does nothing if the client was never created, instead of failing with
    an AttributeError on None.
    """
    if http_client is not None:
        await http_client.aclose()


async def fetch_user(user_id: int) -> dict:
    """Fetch one user from the User service.

    Args:
        user_id: Identifier interpolated into the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}"
    )
    # Without this check an error body would be returned as if it were
    # valid user data, and callers could not detect the failure.
    response.raise_for_status()
    return response.json()

async def fetch_orders(user_id: int) -> list:
    """Fetch a user's orders from the Order service.

    Args:
        user_id: Identifier interpolated into the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}/orders"
    )
    # Without this check an error body would be returned as if it were
    # a valid order list, and callers could not detect the failure.
    response.raise_for_status()
    return response.json()

async def fetch_notifications(user_id: int) -> list:
    """Fetch a user's notifications from the Notification service.

    Args:
        user_id: Identifier interpolated into the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.RequestError: If the request itself fails.
    """
    response = await http_client.get(
        f"https://api.example.com/users/{user_id}/notifications"
    )
    # Without this check an error body would be returned as if it were
    # a valid notification list, and callers could not detect the failure.
    response.raise_for_status()
    return response.json()


@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    """
    Aggregate data from several services into one dashboard payload.

    Calls the User, Order and Notification fetchers concurrently and
    returns their results under the keys ``user``, ``orders`` and
    ``notifications``.

    Raises:
        HTTPException: With status 502 when a fetcher raises
            ``httpx.HTTPError``.
    """
    try:
        # Run all three lookups at the same time.
        user, orders, notifications = await asyncio.gather(
            fetch_user(user_id),
            fetch_orders(user_id),
            fetch_notifications(user_id),
        )
    except httpx.HTTPError as e:
        # Chain the cause so the upstream failure stays visible in tracebacks.
        raise HTTPException(
            status_code=502, detail=f"Service error: {e}"
        ) from e

    return {
        "user": user,
        "orders": orders,
        "notifications": notifications
    }

部分失敗處理

@app.get("/dashboard/{user_id}")
async def get_dashboard_safe(user_id: int):
    """
    Aggregate several services while tolerating partial failure.

    A fetcher that raises contributes a fallback value (None for the user,
    an empty list for orders and notifications) and its message is added
    to ``errors``.
    """
    # NOTE(review): this path is also used by get_dashboard; if both are
    # registered on the same app, confirm which handler is meant to serve it.
    outcomes = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        return_exceptions=True,  # one failure must not fail the whole call
    )

    def _value_or(outcome, fallback):
        # Replace a captured exception with the given fallback value.
        return fallback if isinstance(outcome, Exception) else outcome

    user, orders, notifications = outcomes
    failures = [str(item) for item in outcomes if isinstance(item, Exception)]

    return {
        "user": _value_or(user, None),
        "orders": _value_or(orders, []),
        "notifications": _value_or(notifications, []),
        "errors": failures,
    }

🔄 重試機制

簡單重試

import httpx
import asyncio

async def fetch_with_retry(
    client: httpx.AsyncClient,
    url: str,
    max_retries: int = 3,
    delay: float = 1.0
) -> dict:
    """GET ``url`` and return its JSON body, retrying on HTTP errors.

    Args:
        client: Client used to send the request.
        url: Address to fetch.
        max_retries: Total number of attempts; must be at least 1.
        delay: Base wait in seconds. The wait before retry ``n``
            (0-based) is ``delay * 2 ** n``.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        httpx.HTTPError: The error from the final attempt, once all
            attempts have failed.
    """
    if max_retries < 1:
        # With no attempts the loop never runs and there is no exception
        # to re-raise; fail with a clear message instead.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError:
            # HTTPError is the common base of RequestError and
            # HTTPStatusError, so one clause covers both.
            if attempt == max_retries - 1:
                raise
            # Exponential backoff before the next attempt.
            await asyncio.sleep(delay * (2 ** attempt))

    # Every iteration either returns or raises on the last attempt.
    raise AssertionError("unreachable")

使用 tenacity 庫

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import httpx

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(httpx.HTTPError)
)
async def fetch_with_tenacity(url: str) -> dict:
    """GET ``url`` and return its JSON body, with retries driven by tenacity.

    The decorator stops after 3 attempts, waits with
    ``wait_exponential(multiplier=1, min=1, max=10)`` between them, and
    retries only when ``httpx.HTTPError`` is raised. Each attempt opens
    its own client.
    """
    async with httpx.AsyncClient() as session:
        reply = await session.get(url)
        reply.raise_for_status()
        return reply.json()

📝 實戰範例:API 網關

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
import asyncio
from typing import Optional, Any
from dataclasses import dataclass
from enum import Enum

class ServiceStatus(Enum):
    """Outcome category of a call to a backend service."""
    OK = "ok"            # the call succeeded
    ERROR = "error"      # the call failed for a reason other than a timeout
    TIMEOUT = "timeout"  # the call did not finish in time

@dataclass
class ServiceResult:
    """Outcome of one call to a backend service."""

    # Name of the service that was called.
    service: str
    # Outcome category of the call.
    status: ServiceStatus
    # Response payload; APIGateway.call_service fills it on success.
    data: Optional[Any] = None
    # Error description; APIGateway.call_service fills it on failure.
    error: Optional[str] = None

class APIGateway:
    """API gateway that calls backend services and aggregates the results.

    Call ``startup()`` before issuing requests and ``shutdown()`` when done.
    """

    def __init__(self):
        """Register the known services; the HTTP client is created later."""
        self.client: Optional[httpx.AsyncClient] = None
        # Service name -> base URL.
        self.services = {
            "user": "https://user-service.internal",
            "order": "https://order-service.internal",
            "product": "https://product-service.internal",
            "inventory": "https://inventory-service.internal",
        }

    async def startup(self):
        """Create the shared HTTP client."""
        self.client = httpx.AsyncClient(timeout=10.0)

    async def shutdown(self):
        """Close the shared HTTP client if it was created."""
        if self.client:
            await self.client.aclose()

    async def call_service(
        self,
        service_name: str,
        path: str,
        timeout: float = 5.0
    ) -> ServiceResult:
        """Call one service and wrap the outcome in a ``ServiceResult``.

        This method reports failures through the result instead of raising.

        Args:
            service_name: Key into ``self.services``.
            path: Path appended to the service's base URL.
            timeout: Overall limit in seconds for the request.

        Returns:
            A result whose status is OK (with ``data``), TIMEOUT, or ERROR
            (with ``error``).
        """
        if service_name not in self.services:
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error="Unknown service"
            )

        if self.client is None:
            # Report a clear message rather than the AttributeError text
            # that calling .get on None would produce.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error="Gateway client not started"
            )

        url = f"{self.services[service_name]}{path}"

        try:
            response = await asyncio.wait_for(
                self.client.get(url),
                timeout=timeout
            )
            response.raise_for_status()
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.OK,
                data=response.json()
            )
        except (asyncio.TimeoutError, httpx.TimeoutException):
            # The client has its own timeout, which can fire before
            # wait_for does; both cases are timeouts, not generic errors.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.TIMEOUT,
                error="Request timeout"
            )
        except Exception as e:
            # Best effort by design: failures are reported, not raised.
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error=str(e)
            )

    async def aggregate(
        self,
        requests: dict[str, str]
    ) -> dict[str, ServiceResult]:
        """Call several services concurrently.

        Args:
            requests: Mapping of service name to request path.

        Returns:
            Mapping of service name to its ``ServiceResult``.
        """
        names = list(requests)
        results = await asyncio.gather(
            *(self.call_service(name, requests[name]) for name in names)
        )

        return dict(zip(names, results))


# FastAPI application and the gateway instance its handlers share.
app = FastAPI()
gateway = APIGateway()

@app.on_event("startup")
async def startup():
    """Start the gateway when the application starts."""
    await gateway.startup()

@app.on_event("shutdown")
async def shutdown():
    """Shut the gateway down when the application stops."""
    await gateway.shutdown()

@app.get("/api/product/{product_id}")
async def get_product_details(product_id: int):
    """Return product details aggregated from several services.

    The payload has the keys ``product``, ``stock`` and ``sales`` (None
    when the corresponding service did not succeed) plus ``errors``, a
    list of ``{"service", "error"}`` entries for the calls that failed.
    """
    # Service name -> key it fills in the response payload.
    field_for_service = {
        "product": "product",
        "inventory": "stock",
        "order": "sales",
    }

    results = await gateway.aggregate({
        "product": f"/products/{product_id}",
        "inventory": f"/products/{product_id}/stock",
        "order": f"/products/{product_id}/sales",
    })

    response = {"product": None, "stock": None, "sales": None, "errors": []}

    for name, result in results.items():
        if result.status != ServiceStatus.OK:
            response["errors"].append({
                "service": name,
                "error": result.error
            })
            continue
        field = field_for_service.get(name)
        if field is not None:
            response[field] = result.data

    return response

✅ 重點總結

並行方式

| 方式 | 使用場景 |
| --- | --- |
| asyncio.gather() | 等待所有完成 |
| asyncio.wait() | 第一個完成或失敗時即返回(搭配 return_when=FIRST_COMPLETED / FIRST_EXCEPTION) |
| asyncio.as_completed() | 按完成順序處理 |

最佳實踐

  1. 重用 Client:避免頻繁建立連線
  2. 限制並行:使用 Semaphore 防止過載
  3. 設定超時:避免無限等待
  4. 處理部分失敗:使用 return_exceptions=True
  5. 實作重試:指數退避策略

🎤 面試這樣答

Q: 如何在 FastAPI 中並行呼叫多個外部 API?

答案:

使用 asyncio.gather() 並行發送請求:

async def get_dashboard(user_id: int):
    """Fetch user, orders and notifications concurrently.

    A call that fails contributes a fallback value (None for the user, an
    empty list for orders and notifications) instead of an exception
    object.
    """
    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        return_exceptions=True  # a failed call yields its exception instead of raising
    )
    # Swap captured exceptions for fallbacks so the returned dict never
    # contains exception objects.
    user, orders, notifications = (
        fallback if isinstance(result, Exception) else result
        for result, fallback in zip(results, (None, [], []))
    )
    return {"user": user, "orders": orders, "notifications": notifications}

重點:

  1. 共享 httpx.AsyncClient 避免重複建立連線
  2. 使用 return_exceptions=True 處理部分失敗
  3. 設定適當的超時時間

上一篇: 05-3. 非同步上下文管理器 下一篇: 05-5. WebSocket 基礎


最後更新:2025-12-17

0%