# 

# 05-4. 並行請求處理

&gt; ⏱️ **閱讀時間：** 18 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐ (進階)

---

## 🤔 一句話解釋

**並行請求處理讓你同時發送多個 HTTP 請求，大幅縮短等待外部 API 的時間。**

---

## 🔄 串行 vs 並行

```
串行請求（Sequential）：
API 1: ████████████░░░░░░░░░░░░  2s
API 2:             ████████████░░░░░░░░  2s
API 3:                         ████████████  2s
總時間: 6 秒

並行請求（Concurrent）：
API 1: ████████████  2s
API 2: ████████████  2s
API 3: ████████████  2s
總時間: 2 秒
```

---

## 📦 httpx 非同步客戶端

### 基本用法

```python
import httpx
import asyncio

async def fetch_single():
    &#34;&#34;&#34;單一請求&#34;&#34;&#34;
    async with httpx.AsyncClient() as client:
        response = await client.get(&#34;https://api.example.com/data&#34;)
        return response.json()

async def fetch_multiple():
    &#34;&#34;&#34;並行多個請求&#34;&#34;&#34;
    async with httpx.AsyncClient() as client:
        # 同時發送多個請求
        responses = await asyncio.gather(
            client.get(&#34;https://api.example.com/users&#34;),
            client.get(&#34;https://api.example.com/posts&#34;),
            client.get(&#34;https://api.example.com/comments&#34;),
        )

        return [r.json() for r in responses]
```

### 設定超時和重試

```python
import httpx
from httpx import Timeout, Limits

# 設定超時
timeout = Timeout(
    connect=5.0,    # 連線超時
    read=30.0,      # 讀取超時
    write=5.0,      # 寫入超時
    pool=5.0        # 連線池等待超時
)

# 設定連線限制
limits = Limits(
    max_connections=100,      # 最大連線數
    max_keepalive_connections=20  # 保持連線數
)

async def main():
    async with httpx.AsyncClient(
        timeout=timeout,
        limits=limits
    ) as client:
        response = await client.get(&#34;https://api.example.com/data&#34;)
```

---

## 🔧 批次並行請求

### 使用 asyncio.gather

```python
import httpx
import asyncio

async def fetch_url(
    client: httpx.AsyncClient,
    url: str
) -&gt; dict:
    &#34;&#34;&#34;取得單一 URL&#34;&#34;&#34;
    try:
        response = await client.get(url)
        response.raise_for_status()
        return {&#34;url&#34;: url, &#34;data&#34;: response.json(), &#34;error&#34;: None}
    except Exception as e:
        return {&#34;url&#34;: url, &#34;data&#34;: None, &#34;error&#34;: str(e)}

async def fetch_all(urls: list[str]) -&gt; list[dict]:
    &#34;&#34;&#34;並行取得多個 URL&#34;&#34;&#34;
    async with httpx.AsyncClient() as client:
        tasks = [fetch_url(client, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        &#34;https://jsonplaceholder.typicode.com/posts/1&#34;,
        &#34;https://jsonplaceholder.typicode.com/posts/2&#34;,
        &#34;https://jsonplaceholder.typicode.com/posts/3&#34;,
    ]

    results = await fetch_all(urls)
    for result in results:
        if result[&#34;error&#34;]:
            print(f&#34;❌ {result[&#39;url&#39;]}: {result[&#39;error&#39;]}&#34;)
        else:
            print(f&#34;✅ {result[&#39;url&#39;]}: {result[&#39;data&#39;][&#39;title&#39;]}&#34;)

asyncio.run(main())
```

### 限制並行數量

```python
import httpx
import asyncio

class RateLimitedClient:
    &#34;&#34;&#34;限制並行數量的 HTTP 客戶端&#34;&#34;&#34;

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = httpx.AsyncClient()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.client.aclose()

    async def get(self, url: str) -&gt; httpx.Response:
        async with self.semaphore:
            return await self.client.get(url)

    async def fetch_all(self, urls: list[str]) -&gt; list[dict]:
        tasks = [self._fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def _fetch_one(self, url: str) -&gt; dict:
        try:
            response = await self.get(url)
            return {&#34;url&#34;: url, &#34;status&#34;: response.status_code}
        except Exception as e:
            return {&#34;url&#34;: url, &#34;error&#34;: str(e)}

async def main():
    urls = [f&#34;https://httpbin.org/get?id={i}&#34; for i in range(100)]

    async with RateLimitedClient(max_concurrent=10) as client:
        results = await client.fetch_all(urls)
        print(f&#34;完成 {len(results)} 個請求&#34;)
```

---

## 📊 FastAPI 中的並行請求

### 聚合多個 API

```python
from fastapi import FastAPI, HTTPException
import httpx
import asyncio

app = FastAPI()

# 共享的 HTTP 客戶端
http_client: httpx.AsyncClient = None

@app.on_event(&#34;startup&#34;)
async def startup():
    global http_client
    http_client = httpx.AsyncClient()

@app.on_event(&#34;shutdown&#34;)
async def shutdown():
    await http_client.aclose()


async def fetch_user(user_id: int) -&gt; dict:
    &#34;&#34;&#34;從 User 服務取得使用者&#34;&#34;&#34;
    response = await http_client.get(
        f&#34;https://api.example.com/users/{user_id}&#34;
    )
    return response.json()

async def fetch_orders(user_id: int) -&gt; list:
    &#34;&#34;&#34;從 Order 服務取得訂單&#34;&#34;&#34;
    response = await http_client.get(
        f&#34;https://api.example.com/users/{user_id}/orders&#34;
    )
    return response.json()

async def fetch_notifications(user_id: int) -&gt; list:
    &#34;&#34;&#34;從 Notification 服務取得通知&#34;&#34;&#34;
    response = await http_client.get(
        f&#34;https://api.example.com/users/{user_id}/notifications&#34;
    )
    return response.json()


@app.get(&#34;/dashboard/{user_id}&#34;)
async def get_dashboard(user_id: int):
    &#34;&#34;&#34;
    聚合多個服務的資料

    並行呼叫 User、Order、Notification 服務
    &#34;&#34;&#34;
    try:
        # 並行取得所有資料
        user, orders, notifications = await asyncio.gather(
            fetch_user(user_id),
            fetch_orders(user_id),
            fetch_notifications(user_id),
        )

        return {
            &#34;user&#34;: user,
            &#34;orders&#34;: orders,
            &#34;notifications&#34;: notifications
        }
    except httpx.HTTPError as e:
        raise HTTPException(status_code=502, detail=f&#34;Service error: {e}&#34;)
```

### 部分失敗處理

```python
@app.get(&#34;/dashboard/{user_id}&#34;)
async def get_dashboard_safe(user_id: int):
    &#34;&#34;&#34;
    聚合多個服務，允許部分失敗
    &#34;&#34;&#34;
    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        return_exceptions=True  # 不會因為一個失敗而全部失敗
    )

    user, orders, notifications = results

    return {
        &#34;user&#34;: user if not isinstance(user, Exception) else None,
        &#34;orders&#34;: orders if not isinstance(orders, Exception) else [],
        &#34;notifications&#34;: (
            notifications
            if not isinstance(notifications, Exception)
            else []
        ),
        &#34;errors&#34;: [
            str(r) for r in results if isinstance(r, Exception)
        ]
    }
```

---

## 🔄 重試機制

### 簡單重試

```python
import httpx
import asyncio

async def fetch_with_retry(
    client: httpx.AsyncClient,
    url: str,
    max_retries: int = 3,
    delay: float = 1.0
) -&gt; dict:
    &#34;&#34;&#34;帶重試的請求&#34;&#34;&#34;
    last_exception = None

    for attempt in range(max_retries):
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPError, httpx.RequestError) as e:
            last_exception = e
            if attempt &lt; max_retries - 1:
                # 指數退避
                wait_time = delay * (2 ** attempt)
                await asyncio.sleep(wait_time)

    raise last_exception
```

### 使用 tenacity 庫

```python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import httpx

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(httpx.HTTPError)
)
async def fetch_with_tenacity(url: str) -&gt; dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()
```

---

## 📝 實戰範例：API 網關

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
import asyncio
from typing import Optional, Any
from dataclasses import dataclass
from enum import Enum

class ServiceStatus(Enum):
    OK = &#34;ok&#34;
    ERROR = &#34;error&#34;
    TIMEOUT = &#34;timeout&#34;

@dataclass
class ServiceResult:
    service: str
    status: ServiceStatus
    data: Optional[Any] = None
    error: Optional[str] = None

class APIGateway:
    &#34;&#34;&#34;API 網關&#34;&#34;&#34;

    def __init__(self):
        self.client: Optional[httpx.AsyncClient] = None
        self.services = {
            &#34;user&#34;: &#34;https://user-service.internal&#34;,
            &#34;order&#34;: &#34;https://order-service.internal&#34;,
            &#34;product&#34;: &#34;https://product-service.internal&#34;,
            &#34;inventory&#34;: &#34;https://inventory-service.internal&#34;,
        }

    async def startup(self):
        self.client = httpx.AsyncClient(timeout=10.0)

    async def shutdown(self):
        if self.client:
            await self.client.aclose()

    async def call_service(
        self,
        service_name: str,
        path: str,
        timeout: float = 5.0
    ) -&gt; ServiceResult:
        &#34;&#34;&#34;呼叫單一服務&#34;&#34;&#34;
        if service_name not in self.services:
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error=&#34;Unknown service&#34;
            )

        url = f&#34;{self.services[service_name]}{path}&#34;

        try:
            response = await asyncio.wait_for(
                self.client.get(url),
                timeout=timeout
            )
            response.raise_for_status()
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.OK,
                data=response.json()
            )
        except asyncio.TimeoutError:
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.TIMEOUT,
                error=&#34;Request timeout&#34;
            )
        except Exception as e:
            return ServiceResult(
                service=service_name,
                status=ServiceStatus.ERROR,
                error=str(e)
            )

    async def aggregate(
        self,
        requests: dict[str, str]
    ) -&gt; dict[str, ServiceResult]:
        &#34;&#34;&#34;聚合多個服務請求&#34;&#34;&#34;
        tasks = {
            name: self.call_service(name, path)
            for name, path in requests.items()
        }

        results = await asyncio.gather(*tasks.values())

        return dict(zip(tasks.keys(), results))


# FastAPI 應用
app = FastAPI()
gateway = APIGateway()

@app.on_event(&#34;startup&#34;)
async def startup():
    await gateway.startup()

@app.on_event(&#34;shutdown&#34;)
async def shutdown():
    await gateway.shutdown()

@app.get(&#34;/api/product/{product_id}&#34;)
async def get_product_details(product_id: int):
    &#34;&#34;&#34;取得產品詳情（聚合多個服務）&#34;&#34;&#34;
    results = await gateway.aggregate({
        &#34;product&#34;: f&#34;/products/{product_id}&#34;,
        &#34;inventory&#34;: f&#34;/products/{product_id}/stock&#34;,
        &#34;order&#34;: f&#34;/products/{product_id}/sales&#34;,
    })

    # 組合結果
    response = {
        &#34;product&#34;: None,
        &#34;stock&#34;: None,
        &#34;sales&#34;: None,
        &#34;errors&#34;: []
    }

    for name, result in results.items():
        if result.status == ServiceStatus.OK:
            if name == &#34;product&#34;:
                response[&#34;product&#34;] = result.data
            elif name == &#34;inventory&#34;:
                response[&#34;stock&#34;] = result.data
            elif name == &#34;order&#34;:
                response[&#34;sales&#34;] = result.data
        else:
            response[&#34;errors&#34;].append({
                &#34;service&#34;: name,
                &#34;error&#34;: result.error
            })

    return response
```

---

## ✅ 重點總結

### 並行方式

| 方式 | 使用場景 |
|------|----------|
| `asyncio.gather()` | 等待所有完成 |
| `asyncio.wait()` | 第一個完成或失敗 |
| `asyncio.as_completed()` | 按完成順序處理 |

### 最佳實踐

1. **重用 Client**：避免頻繁建立連線
2. **限制並行**：使用 Semaphore 防止過載
3. **設定超時**：避免無限等待
4. **處理部分失敗**：`return_exceptions=True`
5. **實作重試**：指數退避策略

---

## 🎤 面試這樣答

### Q: 如何在 FastAPI 中並行呼叫多個外部 API？

**答案：**

&gt; 使用 `asyncio.gather()` 並行發送請求：
&gt;
&gt; ```python
&gt; async def get_dashboard(user_id: int):
&gt;     user, orders, notifications = await asyncio.gather(
&gt;         fetch_user(user_id),
&gt;         fetch_orders(user_id),
&gt;         fetch_notifications(user_id),
&gt;         return_exceptions=True  # 部分失敗處理
&gt;     )
&gt;     return {&#34;user&#34;: user, &#34;orders&#34;: orders}
&gt; ```
&gt;
&gt; 重點：
&gt; 1. 共享 httpx.AsyncClient 避免重複建立連線
&gt; 2. 使用 `return_exceptions=True` 處理部分失敗
&gt; 3. 設定適當的超時時間

---

**上一篇：** [05-3. 非同步上下文管理器](./05-3)
**下一篇：** [05-5. WebSocket 基礎](./05-5)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/05-4/  

