# 05-6. Server-Sent Events (SSE)

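> ⏱️ **Reading time:** 15 minutes
> 🎯 **Difficulty:** ⭐⭐⭐ (Advanced)

---

## 🤔 One-Sentence Explanation

**Server-Sent Events (SSE) let a server push data to the client over a one-way channel. SSE is simpler than WebSocket and well suited to real-time notifications.**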

---

## 🔄 WebSocket vs SSE

```
WebSocket (bidirectional):
┌────────┐ ◀═══════▶ ┌────────┐
│ Client │  two-way  │ Server │
└────────┘           └────────┘
Both the client and the server can send at any time

SSE (one-way push):
┌────────┐ ◀──────── ┌────────┐
│ Client │  one-way  │ Server │
└────────┘           └────────┘
Only the server can push
```

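### Comparison Table

| Feature | WebSocket | SSE |
|------|-----------|-----|
| **Direction** | Bidirectional | One-way (server → client) |
| **Protocol** | `ws://` / `wss://` | Plain HTTP (`http://` / `https://`) |
| **Complexity** | Higher | Lower |
| **Auto-reconnect** | Must be implemented manually | Built into the browser (`EventSource`) |
| **Binary data** | Supported | Not supported (UTF-8 text only) |
| **Use cases** | Chat, games | Notifications, live updates |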

---

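## 🎯 Use Cases

| Scenario | Description |
|------|------|
| **Real-time notifications** | New messages, system alerts |
| **Stock quotes** | Live price updates |
| **Progress updates** | File uploads, task progress |
| **Social feeds** | New posts, new comments |
| **Log streaming** | Live log output |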

---

## 📦 FastAPI SSE Basics

### Basic Example

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_generator():
    """Generate SSE events."""
    count = 0
    while True:
        count += 1
        # SSE format: "data: <payload>" followed by a blank line
        yield f"data: Message {count}\n\n"
        await asyncio.sleep(1)

@app.get("/events")
async def sse_endpoint():
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
```

### Receiving on the Client

```javascript
// Browser JavaScript
const eventSource = new EventSource("/events");

eventSource.onmessage = (event) => {
    console.log("Received:", event.data);
};

eventSource.onerror = (error) => {
    console.error("Connection error:", error);
};

// Close the connection
// eventSource.close();
```

---

## 🔧 SSE Message Format

### Standard Format

```python
async def event_generator():
    # Basic message
    yield "data: Hello\n\n"

    # Multi-line message (the client joins the lines with "\n")
    yield "data: First line\n"
    yield "data: Second line\n\n"

    # Named event type
    yield "event: notification\n"
    yield "data: New notification\n\n"

    # Event ID (used when reconnecting)
    yield "id: 123\n"
    yield "data: Message body\n\n"

    # Reconnection delay (milliseconds)
    yield "retry: 5000\n"
    yield "data: Message\n\n"
```
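
Writing the field lines by hand gets error-prone once payloads span several lines or carry JSON. The sketch below wraps the format in one function. `format_sse` and its parameter names are hypothetical helpers for illustration, not part of FastAPI or Starlette.

```python
import json
from typing import Any, Optional


def format_sse(
    data: Any,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Build one SSE frame (hypothetical helper, not a FastAPI API)."""
    # Non-string payloads are serialized as JSON.
    payload = data if isinstance(data, str) else json.dumps(data)
    # Treat CRLF and bare CR as line breaks too, then split on LF.
    payload = payload.replace("\r\n", "\n").replace("\r", "\n")

    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Every line of the payload needs its own "data:" prefix.
    for line in payload.split("\n"):
        lines.append(f"data: {line}")

    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"
```

A generator could then `yield format_sse({"n": 1}, event="notification", event_id="123")` instead of emitting each field line separately.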

### Handling Different Events on the Client

```javascript
const eventSource = new EventSource("/events");

// Default messages (no event field)
eventSource.onmessage = (event) => {
    console.log("Default:", event.data);
};

// Custom event types
eventSource.addEventListener("notification", (event) => {
    console.log("Notification:", event.data);
});

eventSource.addEventListener("update", (event) => {
    console.log("Update:", event.data);
});
```

---

## 📊 Practical Examples

### Pushing JSON Data

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from datetime import datetime, timezone
import asyncio
import random

app = FastAPI()

class StockPrice(BaseModel):
    symbol: str
    price: float
    timestamp: str

async def stock_generator(symbol: str):
    """Simulate a stream of stock prices."""
    base_price = 100.0

    while True:
        # Simulate price fluctuation
        change = random.uniform(-1, 1)
        base_price += change

        stock = StockPrice(
            symbol=symbol,
            price=round(base_price, 2),
            # datetime.utcnow() is deprecated since Python 3.12
            timestamp=datetime.now(timezone.utc).isoformat()
        )

        # model_dump_json() is the Pydantic v2 API
        yield f"data: {stock.model_dump_json()}\n\n"
        await asyncio.sleep(1)

@app.get("/stocks/{symbol}")
async def stock_stream(symbol: str):
    return StreamingResponse(
        stock_generator(symbol),
        media_type="text/event-stream"
    )
```

### Task Progress Updates

```python
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
import json  # needed by progress_generator
from typing import Dict
import uuid

app = FastAPI()

# Task state (in memory, so only valid within a single process)
task_progress: Dict[str, dict] = {}

async def long_running_task(task_id: str):
    """Simulate a long-running task."""
    for i in range(1, 101):
        task_progress[task_id] = {
            "progress": i,
            "status": "processing" if i < 100 else "completed"
        }
        await asyncio.sleep(0.1)

@app.post("/tasks")
async def create_task(background_tasks: BackgroundTasks):
    """Create a new task."""
    task_id = str(uuid.uuid4())
    task_progress[task_id] = {"progress": 0, "status": "pending"}

    background_tasks.add_task(long_running_task, task_id)

    return {"task_id": task_id}

async def progress_generator(task_id: str):
    """Generate progress events."""
    last_progress = -1

    while True:
        if task_id not in task_progress:
            yield f"data: {json.dumps({'error': 'Task not found'})}\n\n"
            break

        current = task_progress[task_id]

        # Push only when the progress value has changed
        if current["progress"] != last_progress:
            yield f"data: {json.dumps(current)}\n\n"
            last_progress = current["progress"]

        if current["status"] == "completed":
            break

        await asyncio.sleep(0.1)

@app.get("/tasks/{task_id}/progress")
async def task_progress_stream(task_id: str):
    return StreamingResponse(
        progress_generator(task_id),
        media_type="text/event-stream"
    )
```

### Live Log Streaming

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from collections import deque
import asyncio
import json
from datetime import datetime, timezone

app = FastAPI()

# Log buffer (keeps the most recent 1000 entries)
log_buffer: deque = deque(maxlen=1000)
log_subscribers: list = []

class LogEntry:
    def __init__(self, level: str, message: str):
        self.level = level
        self.message = message
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self):
        return {
            "level": self.level,
            "message": self.message,
            "timestamp": self.timestamp
        }

def add_log(level: str, message: str):
    """Add a log entry."""
    entry = LogEntry(level, message)
    log_buffer.append(entry)

    # Notify every subscriber
    for queue in log_subscribers:
        queue.put_nowait(entry)

async def log_generator():
    """Generate log events."""
    queue = asyncio.Queue()
    log_subscribers.append(queue)

    try:
        # Send the buffered history first
        for entry in list(log_buffer):
            yield f"data: {json.dumps(entry.to_dict())}\n\n"

        # Then wait for new entries
        while True:
            entry = await queue.get()
            yield f"data: {json.dumps(entry.to_dict())}\n\n"
    finally:
        # Runs when the client disconnects and the generator is closed
        log_subscribers.remove(queue)

@app.get("/logs")
async def log_stream():
    return StreamingResponse(
        log_generator(),
        media_type="text/event-stream"
    )

@app.post("/logs")
async def create_log(level: str, message: str):
    add_log(level, message)
    return {"status": "ok"}
```
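
Each subscriber in the log example gets an unbounded `asyncio.Queue()`, so a slow client can make its queue grow without limit. One possible mitigation is a bounded queue that drops the oldest entry when full. The sketch below shows that policy in isolation; `publish` and `demo` are hypothetical names, and dropping old entries is an assumption that fits logs but may not suit other data.

```python
import asyncio


def publish(queue: asyncio.Queue, item) -> None:
    """Put `item` on a bounded queue, dropping the oldest entry when full."""
    if queue.full():
        queue.get_nowait()  # discard the oldest entry to make room
    queue.put_nowait(item)


async def demo() -> list:
    # A deliberately tiny queue so the drop policy is visible.
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)
    for i in range(5):
        publish(queue, i)
    # Drain whatever survived.
    return [queue.get_nowait() for _ in range(queue.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With five items published into a queue of size three, only the three most recent remain.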

---

## 🔐 Authentication

### Token Authentication

```python
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import StreamingResponse
import asyncio  # needed by authenticated_generator

app = FastAPI()

def verify_token(token: str) -> bool:
    """Verify the token."""
    # A real application should validate a JWT or a session here
    return token == "valid_token"

async def authenticated_generator(user_id: str):
    """Event generator used after authentication succeeds."""
    count = 0
    while True:
        count += 1
        yield f"data: Message {count} for user {user_id}\n\n"
        await asyncio.sleep(1)

@app.get("/secure-events")
async def secure_sse(token: str = Query(...)):
    # Reject before the stream starts, so the client receives a normal 401
    if not verify_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")

    return StreamingResponse(
        authenticated_generator("user123"),
        media_type="text/event-stream"
    )
```
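
If the token arrives in an `Authorization: Bearer ...` header rather than the query string, the server has to parse that header before calling `verify_token`. The sketch below shows only the parsing step, with no framework dependency. `extract_bearer_token` is a hypothetical helper; in FastAPI the raw header value could be obtained with a `Header()` parameter.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    # Split into the scheme and the rest at the first space.
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # The scheme name is case-insensitive.
    if scheme.lower() != "bearer" or not token:
        return None
    return token
```

A `None` result would map to the same 401 response used for an invalid query-parameter token.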

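### Sending the Token from the Client

```javascript
// Option 1: query parameter (EventSource cannot set custom headers)
const eventSource = new EventSource("/secure-events?token=valid_token");

// Option 2: fetch + ReadableStream, which allows custom headers.
// The server must then read the Authorization header instead of the
// query parameter, and automatic reconnection is no longer provided.
async function connectSSE() {
    const response = await fetch("/secure-events", {
        headers: {
            "Authorization": "Bearer your_token"
        }
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps multi-byte characters intact across chunks
        const text = decoder.decode(value, { stream: true });
        console.log("Received:", text);
    }
}
```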
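
With the fetch approach the client receives raw text such as `data: Hello\n\n` and has to split it into events itself, which `EventSource` otherwise does automatically. The sketch below shows that parsing logic in Python for illustration. `parse_sse` is a hypothetical, simplified parser: it assumes LF line endings and the complete text in one string, and it ignores the `retry` field.

```python
from typing import Dict, Iterator, List, Optional


def parse_sse(text: str) -> Iterator[Dict[str, str]]:
    """Yield one dict per complete event found in `text` (simplified)."""
    event_type = "message"          # default type when no "event:" field is set
    event_id: Optional[str] = None  # the last seen id persists across events
    data_lines: List[str] = []

    for line in text.split("\n"):
        if line == "":
            # A blank line dispatches the event collected so far.
            if data_lines:
                event = {"event": event_type, "data": "\n".join(data_lines)}
                if event_id is not None:
                    event["id"] = event_id
                yield event
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, for example a heartbeat
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            event_id = value
    # Anything left without a terminating blank line is an incomplete event
    # and is intentionally not yielded.
```

A real client reading chunks would need to buffer text until a blank line arrives, since a chunk boundary can fall in the middle of an event.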

---

## 💓 Heartbeat

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import random

app = FastAPI()

async def heartbeat_generator():
    """Event generator that sends heartbeats while idle."""
    heartbeat_interval = 15  # seconds
    loop = asyncio.get_running_loop()
    last_event_time = loop.time()

    while True:
        current_time = loop.time()

        # Send a heartbeat if nothing was sent during the last interval
        if current_time - last_event_time >= heartbeat_interval:
            # Lines starting with ":" are comments; EventSource ignores them
            yield ": heartbeat\n\n"
            last_event_time = current_time

        # Check for a real event
        event = await check_for_events()
        if event:
            yield f"data: {event}\n\n"
            last_event_time = current_time

        await asyncio.sleep(1)

async def check_for_events():
    """Check whether a new event is available."""
    # A real application would check Redis or a database here
    if random.random() < 0.1:
        return "New event"
    return None

@app.get("/events")
async def sse_with_heartbeat():
    return StreamingResponse(
        heartbeat_generator(),
        media_type="text/event-stream"
    )
```
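
The generator above polls once per second. When events already arrive through an `asyncio.Queue`, an alternative is to wait on the queue with a timeout and emit a heartbeat only when the wait expires. The sketch below illustrates that variant. `events_with_heartbeat`, `demo`, and the `None` sentinel that ends the stream are assumptions of this example.

```python
import asyncio
from typing import AsyncIterator


async def events_with_heartbeat(
    queue: asyncio.Queue, heartbeat_interval: float = 15.0
) -> AsyncIterator[str]:
    """Yield queued events as SSE frames, with a comment heartbeat when idle."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
        except asyncio.TimeoutError:
            # Nothing arrived within the interval: keep the connection alive.
            yield ": heartbeat\n\n"
            continue
        if event is None:  # sentinel (an assumption of this sketch) ends the stream
            return
        yield f"data: {event}\n\n"


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("first")
    stream = events_with_heartbeat(queue, heartbeat_interval=0.05)
    frames = [
        await stream.__anext__(),  # the queued event
        await stream.__anext__(),  # queue is empty, so a heartbeat follows
    ]
    queue.put_nowait(None)  # ask the generator to finish
    async for frame in stream:
        frames.append(frame)
    return frames


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This avoids waking up every second and delivers events as soon as they are queued, at the cost of requiring a queue-based event source.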

---

## 📝 Complete Notification System

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
from pydantic import BaseModel
from typing import Dict
import asyncio
from datetime import datetime, timezone
import uuid

app = FastAPI()

class Notification(BaseModel):
    id: str
    type: str
    title: str
    message: str
    timestamp: str

class NotificationManager:
    def __init__(self):
        # One queue per user_id; a new connection replaces the previous one
        self.subscribers: Dict[str, asyncio.Queue] = {}

    async def subscribe(self, user_id: str):
        """Subscribe to notifications."""
        queue = asyncio.Queue()
        self.subscribers[user_id] = queue
        return queue

    def unsubscribe(self, user_id: str, queue: asyncio.Queue):
        """Unsubscribe, unless a newer connection already replaced this queue."""
        if self.subscribers.get(user_id) is queue:
            del self.subscribers[user_id]

    async def notify(self, user_id: str, notification: Notification):
        """Send a notification to one user."""
        if user_id in self.subscribers:
            await self.subscribers[user_id].put(notification)

    async def broadcast(self, notification: Notification):
        """Broadcast a notification to every user."""
        for queue in self.subscribers.values():
            await queue.put(notification)

manager = NotificationManager()

async def notification_generator(user_id: str):
    """Generate notification events."""
    queue = await manager.subscribe(user_id)

    try:
        while True:
            notification = await queue.get()
            yield "event: notification\n"
            yield f"id: {notification.id}\n"
            yield f"data: {notification.model_dump_json()}\n\n"
    finally:
        manager.unsubscribe(user_id, queue)

@app.get("/notifications/{user_id}")
async def notification_stream(user_id: str):
    return StreamingResponse(
        notification_generator(user_id),
        media_type="text/event-stream"
    )

# Declared before the {user_id} route: FastAPI matches routes in order,
# so otherwise "broadcast" would be captured as a user_id.
@app.post("/notifications/broadcast")
async def broadcast_notification(title: str, message: str):
    """Broadcast a notification."""
    notification = Notification(
        id=str(uuid.uuid4()),
        type="broadcast",
        title=title,
        message=message,
        timestamp=datetime.now(timezone.utc).isoformat()
    )
    await manager.broadcast(notification)
    return {"status": "broadcasted"}

@app.post("/notifications/{user_id}")
async def send_notification(user_id: str, title: str, message: str):
    """Send a notification to one user."""
    notification = Notification(
        id=str(uuid.uuid4()),
        type="info",
        title=title,
        message=message,
        timestamp=datetime.now(timezone.utc).isoformat()
    )
    await manager.notify(user_id, notification)
    return {"status": "sent"}

# HTML front end
@app.get("/")
async def get_page():
    return HTMLResponse("""
    <!DOCTYPE html>
    <html>
    <head>
        <title>SSE Notifications</title>
        <style>
            .notification {
                padding: 10px;
                margin: 5px;
                border: 1px solid #ccc;
                border-radius: 5px;
            }
            .info { background-color: #e3f2fd; }
            .broadcast { background-color: #fff3e0; }
        </style>
    </head>
    <body>
        <h1>SSE Notification System</h1>
        <div id="notifications"></div>

        <script>
            const userId = "user_" + Math.random().toString(36).slice(2, 11);
            const eventSource = new EventSource(`/notifications/${userId}`);

            eventSource.addEventListener("notification", (event) => {
                const data = JSON.parse(event.data);
                const div = document.createElement("div");
                div.className = `notification ${data.type}`;
                // textContent (not innerHTML) so titles and messages cannot inject markup
                const parts = [["strong", data.title], ["p", data.message], ["small", data.timestamp]];
                for (const [tag, text] of parts) {
                    const el = document.createElement(tag);
                    el.textContent = text;
                    div.appendChild(el);
                }
                document.getElementById("notifications").prepend(div);
            });

            eventSource.onerror = () => {
                console.log("Connection lost, trying to reconnect...");
            };
        </script>
    </body>
    </html>
    """)
```
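
The manager above keeps a single queue per `user_id`, so a second browser tab with the same ID replaces the first one. A possible extension is to keep a set of queues per user. The sketch below shows that idea on its own; `MultiConnectionManager` and `demo` are hypothetical names and the class is not wired into the FastAPI routes.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Set


class MultiConnectionManager:
    """Track several open connections (queues) for each user."""

    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, Set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, user_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers[user_id].add(queue)
        return queue

    def unsubscribe(self, user_id: str, queue: asyncio.Queue) -> None:
        queues = self.subscribers.get(user_id)
        if queues is None:
            return
        queues.discard(queue)
        if not queues:
            # Last connection for this user is gone: drop the empty set.
            del self.subscribers[user_id]

    async def notify(self, user_id: str, notification) -> None:
        # Copy to a list so the set can change while we are awaiting.
        for queue in list(self.subscribers.get(user_id, ())):
            await queue.put(notification)


async def demo() -> tuple:
    manager = MultiConnectionManager()
    tab_a = manager.subscribe("alice")
    tab_b = manager.subscribe("alice")
    await manager.notify("alice", "hello")   # reaches both tabs
    manager.unsubscribe("alice", tab_a)
    await manager.notify("alice", "again")   # reaches only tab_b
    return tab_a.qsize(), tab_b.qsize(), "alice" in manager.subscribers


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each generator would pass its own queue to `unsubscribe` in its `finally` block, so closing one tab leaves the others subscribed.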

---

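## ✅ Key Takeaways

### SSE Format

| Field | Description |
|------|------|
| `data:` | Message payload |
| `event:` | Event type |
| `id:` | Message ID |
| `retry:` | Reconnection delay (milliseconds) |

### Best Practices

1. **Set appropriate headers**
   - `Cache-Control: no-cache`
   - `Connection: keep-alive`
2. **Implement a heartbeat**: keeps idle connections from timing out
3. **Use IDs**: lets clients resume after a reconnect
4. **Handle errors**: handle `onerror` on the client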
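
To make the ID practice concrete: when `EventSource` reconnects, the browser sends the last ID it received in a `Last-Event-ID` request header, and the server can use it to replay what the client missed. The sketch below shows only the replay selection. `events_to_replay`, the `Event` tuple, and the use of increasing integer IDs are assumptions of this example.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple

Event = Tuple[int, str]  # (id, payload); integer IDs are an assumption


def events_to_replay(history: Deque[Event], last_event_id: Optional[str]) -> List[Event]:
    """Return buffered events that are newer than the client's last seen ID."""
    if last_event_id is None:
        return []  # first connection: nothing to replay
    try:
        last_seen = int(last_event_id)
    except ValueError:
        return []  # unknown ID format: fall back to live events only
    return [event for event in history if event[0] > last_seen]


# A bounded buffer of recent events, oldest first.
history: Deque[Event] = deque([(1, "a"), (2, "b"), (3, "c")], maxlen=100)
missed = events_to_replay(history, "1")
```

A generator would yield the replayed events first, each with its `id:` line, and then continue with live events.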

---

## 🎤 How to Answer in an Interview

### Q: What is the difference between SSE and WebSocket? When would you choose SSE?

**Answer:**

> **Differences:**
> - SSE is one-way (server → client)
> - WebSocket is bidirectional
>
> **Choose SSE when:**
> 1. Only server push is needed (notifications, updates)
> 2. The client does not need to send many messages
> 3. A simpler implementation is preferred (SSE is simpler)
> 4. Automatic reconnection is needed (built into the browser)
>
> ```python
> @app.get("/events")
> async def sse_endpoint():
>     async def generator():
>         while True:
>             yield f"data: {datetime.now()}\n\n"
>             await asyncio.sleep(1)
>
>     return StreamingResponse(
>         generator(),
>         media_type="text/event-stream"
>     )
> ```

---

**Previous:** [05-5. WebSocket Basics](./05-5)
**Next:** [05-7. Background Tasks](./05-7)

---

Last updated: 2025-12-18


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/05-6/  

