# 05-5. WebSocket Basics

> ⏱️ **Reading time:** 18 minutes
> 🎯 **Difficulty:** ⭐⭐⭐ (Advanced)

---

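## 🤔 One-Sentence Explanation

**WebSocket is a bidirectional communication protocol that lets the server push data to the client proactively, which makes it a good fit for real-time applications.**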

---

## 🔄 HTTP vs WebSocket

```
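HTTP (request-response model):
┌────────┐              ┌────────┐
│ Client │ ──request──▶ │ Server │
│        │ ◀─response── │        │
│        │ ──request──▶ │        │
│        │ ◀─response── │        │
└────────┘              └────────┘
Every exchange requires a new request

WebSocket (bidirectional communication):
┌────────┐ ════════════ ┌────────┐
│ Client │◀════════════▶│ Server │
│        │  persistent  │        │
│        │  connection  │        │
└────────┘ ════════════ └────────┘
One handshake, then real-time two-way communication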
```
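
The "one handshake" in the diagram is an ordinary HTTP request that asks to upgrade the connection. As a small sketch of one step in that handshake (defined in RFC 6455), the server answers the client's `Sec-WebSocket-Key` header with a derived `Sec-WebSocket-Accept` value. The function name `compute_accept_key` is made up for this illustration; in practice the ASGI server does this for you.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def compute_accept_key(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept value from the client's key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key from the RFC 6455 handshake example
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```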

---

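## 🎯 Use Cases

| Scenario | Description |
|------|------|
| **Chat rooms** | Real-time message delivery |
| **Live notifications** | Pushing new messages as they arrive |
| **Stock quotes** | Real-time price updates |
| **Games** | Real-time state synchronization |
| **Collaborative editing** | Multiple people editing at the same time |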

---

## 📦 FastAPI WebSocket Basics

### The Simplest Example

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Accept the connection
    await websocket.accept()

    try:
        while True:
            # Receive a message
            data = await websocket.receive_text()

            # Send a reply
            await websocket.send_text(f"Received: {data}")
    except WebSocketDisconnect:
        # The client has already closed the connection,
        # so there is nothing left to close on our side
        pass
```

### Testing from a Client

```javascript
// Browser JavaScript
const ws = new WebSocket("ws://localhost:8000/ws");

ws.onopen = () => {
    console.log("Connected");
    ws.send("Hello!");
};

ws.onmessage = (event) => {
    console.log("Received:", event.data);
};

ws.onclose = () => {
    console.log("Connection closed");
};
```

---

## 🔧 Connection Management

### Connection Manager

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
    """WebSocket connection manager"""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept a new connection"""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove a connection (no-op if it was already removed)"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send to a single user"""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Broadcast to all users"""
        # Iterate over a copy: the list may change while we await a send
        for connection in list(self.active_connections):
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    await manager.broadcast(f"{client_id} joined the chat room")

    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"{client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"{client_id} left the chat room")
```
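
In the `broadcast` method above, a single failed `send_text` raises out of the loop, so the remaining clients never receive the message. One way to harden it is sketched below with a stand-in socket class so it runs without a server. `FakeSocket` and `safe_broadcast` are illustrative names, not FastAPI APIs.

```python
import asyncio

class FakeSocket:
    """Stand-in for a WebSocket; only implements send_text (illustration only)."""

    def __init__(self, name: str, broken: bool = False):
        self.name = name
        self.broken = broken
        self.sent = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError(f"{self.name} is gone")
        self.sent.append(message)

async def safe_broadcast(connections: list, message: str) -> list:
    """Send to every connection; remove and return the ones that fail."""
    dead = []
    # Iterate over a copy so removals do not disturb the loop
    for connection in list(connections):
        try:
            await connection.send_text(message)
        except Exception:
            dead.append(connection)
    for connection in dead:
        connections.remove(connection)
    return dead

async def demo() -> None:
    alice = FakeSocket("alice")
    bob = FakeSocket("bob", broken=True)
    carol = FakeSocket("carol")
    connections = [alice, bob, carol]
    dead = await safe_broadcast(connections, "hello")
    print([s.name for s in dead])         # ['bob']
    print([s.name for s in connections])  # ['alice', 'carol']
    print(carol.sent)                     # ['hello']

asyncio.run(demo())
```

The same loop could replace the body of `ConnectionManager.broadcast`, with `self.active_connections` passed in as `connections`.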

### Room Management

```python
import json
from collections import defaultdict
from typing import Dict, Set

from fastapi import WebSocket, WebSocketDisconnect

# Reuses `app` from the connection manager example

class RoomManager:
    """Chat room manager"""

    def __init__(self):
        self.rooms: Dict[str, Set[WebSocket]] = defaultdict(set)
        self.user_rooms: Dict[WebSocket, str] = {}

    async def join_room(
        self,
        websocket: WebSocket,
        room_id: str,
        username: str
    ):
        """Join a room"""
        await websocket.accept()
        self.rooms[room_id].add(websocket)
        self.user_rooms[websocket] = room_id

        await self.broadcast_to_room(
            room_id,
            {"type": "join", "user": username}
        )

    async def leave_room(self, websocket: WebSocket, username: str):
        """Leave a room"""
        room_id = self.user_rooms.get(websocket)
        if room_id:
            self.rooms[room_id].discard(websocket)
            del self.user_rooms[websocket]

            await self.broadcast_to_room(
                room_id,
                {"type": "leave", "user": username}
            )

    async def broadcast_to_room(self, room_id: str, message: dict):
        """Broadcast to everyone in a room"""
        payload = json.dumps(message)
        # Iterate over a copy: the set may change while we await a send
        for connection in list(self.rooms[room_id]):
            await connection.send_text(payload)

room_manager = RoomManager()

@app.websocket("/ws/room/{room_id}")
async def room_websocket(
    websocket: WebSocket,
    room_id: str,
    username: str  # read from the query string, e.g. ?username=alice
):
    await room_manager.join_room(websocket, room_id, username)

    try:
        while True:
            data = await websocket.receive_json()
            data["user"] = username
            await room_manager.broadcast_to_room(room_id, data)
    except WebSocketDisconnect:
        await room_manager.leave_room(websocket, username)
```
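
A detail worth knowing about `defaultdict`: merely reading `self.rooms[room_id]` creates an empty set, and `leave_room` never deletes a room, so the dictionary keeps every room ID it has ever seen. The sketch below shows one way to prune empty rooms. `RoomRegistry` is a hypothetical, I/O-free helper that tracks plain member names instead of WebSocket objects.

```python
from collections import defaultdict

class RoomRegistry:
    """Tracks which members are in which room (illustration only, no I/O)."""

    def __init__(self) -> None:
        self.rooms = defaultdict(set)

    def join(self, room_id: str, member) -> None:
        self.rooms[room_id].add(member)

    def leave(self, room_id: str, member) -> None:
        members = self.rooms.get(room_id)
        if members is None:
            return
        members.discard(member)
        if not members:
            # Delete empty rooms so the dict does not grow forever
            del self.rooms[room_id]

    def members_of(self, room_id: str) -> set:
        # .get() avoids creating an empty room as a side effect of reading
        return set(self.rooms.get(room_id, ()))

registry = RoomRegistry()
registry.join("lobby", "alice")
registry.join("lobby", "bob")
registry.join("dev", "carol")
registry.leave("dev", "carol")

print(sorted(registry.members_of("lobby")))  # ['alice', 'bob']
print(registry.members_of("dev"))            # set()
print(sorted(registry.rooms))                # ['lobby']
```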

---

## 🔐 Authentication

### Token Authentication

```python
from fastapi import Query, WebSocket, WebSocketDisconnect, WebSocketException
from jose import JWTError, jwt

# Placeholder values for illustration; load the real ones from configuration
SECRET_KEY = "change-me"
ALGORITHM = "HS256"

# Reuses `app` and `manager` from the connection manager example

async def get_current_user_ws(token: str = Query(...)) -> dict:
    """WebSocket authentication, usable as a dependency via Depends()"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return {"user_id": payload["sub"], "username": payload["name"]}
    except (JWTError, KeyError):
        # HTTPException does not apply to WebSocket routes;
        # WebSocketException tells FastAPI to close the socket instead
        raise WebSocketException(code=4001, reason="Invalid token")

@app.websocket("/ws/chat")
async def authenticated_websocket(
    websocket: WebSocket,
    token: str = Query(...)
):
    # Validate the token
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload["sub"]
    except (JWTError, KeyError):
        # Closing before accept() rejects the handshake (the client sees HTTP 403),
        # so the custom code and reason are not delivered to the client
        await websocket.close(code=4001, reason="Invalid token")
        return

    await manager.connect(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            # Handle the message...
    except WebSocketDisconnect:
        manager.disconnect(websocket)
```
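
To make it clearer what `jwt.decode` is checking, here is a standard-library sketch of HS256 signing and verification. The helper names `sign_hs256` and `verify_hs256` are invented for this illustration and only cover the signature and the `exp` claim; a real application should keep using a maintained library such as python-jose.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def _b64url_decode(text: str) -> bytes:
    # Restore the padding that compact tokens strip off
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()

def sign_hs256(payload: dict, secret: str) -> str:
    """Build a compact token: header.payload.signature"""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url_encode(_signature(signing_input, secret))}"

def verify_hs256(token: str, secret: str) -> Optional[dict]:
    """Return the payload if the signature and expiry are valid, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", secret)
        # compare_digest avoids leaking information through timing
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(payload, dict):
        return None
    exp = payload.get("exp")
    if isinstance(exp, (int, float)) and exp < time.time():
        return None
    return payload

token = sign_hs256({"sub": "42", "name": "alice"}, "demo-secret")
print(verify_hs256(token, "demo-secret"))   # {'sub': '42', 'name': 'alice'}
print(verify_hs256(token, "wrong-secret"))  # None
```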

### Cookie Authentication

```python
from fastapi import Cookie, WebSocket

# `verify_session` is an application-specific helper that you need to provide

@app.websocket("/ws/secure")
async def secure_websocket(
    websocket: WebSocket,
    session: str | None = Cookie(None)
):
    if not session or not verify_session(session):
        # Closed before accept(): the handshake is rejected
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await websocket.accept()
    # ...
```

---

## 📊 Message Format

### JSON Messages

```python
from datetime import datetime, timezone
from typing import Literal

from fastapi import WebSocket, WebSocketDisconnect
from pydantic import BaseModel, ValidationError

# Reuses `app` and `room_manager` from the room management example.
# This snippet focuses on validation: it accepts the socket directly,
# so call room_manager.join_room() instead if it should receive broadcasts too.

class WSMessage(BaseModel):
    type: Literal["message", "typing", "read"]
    content: str | None = None
    room_id: str | None = None

@app.websocket("/ws/chat/{room_id}")
async def chat_websocket(websocket: WebSocket, room_id: str):
    await websocket.accept()

    try:
        while True:
            # Receive JSON
            data = await websocket.receive_json()

            # Validate the message format
            try:
                message = WSMessage(**data)
            except (ValidationError, TypeError):
                await websocket.send_json({
                    "type": "error",
                    "content": "Invalid message format"
                })
                continue

            # Handle each message type
            if message.type == "message":
                await room_manager.broadcast_to_room(room_id, {
                    "type": "message",
                    "content": message.content,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
            elif message.type == "typing":
                await room_manager.broadcast_to_room(room_id, {
                    "type": "typing",
                    "user": "username"  # placeholder; use the authenticated user's name
                })

    except WebSocketDisconnect:
        pass
```
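
One gap in the endpoint above: `receive_json()` parses the payload before validation runs, so text that is not valid JSON raises an exception before the inner `try` block is reached. A possible alternative, sketched here with only the standard library, is to receive raw text and parse it yourself. `ParsedMessage` and `parse_message` are hypothetical names standing in for the Pydantic model; the same idea works with `WSMessage`.

```python
import json
from dataclasses import dataclass
from typing import Optional

ALLOWED_TYPES = {"message", "typing", "read"}

@dataclass
class ParsedMessage:
    type: str
    content: Optional[str] = None
    room_id: Optional[str] = None

def parse_message(raw: str) -> Optional[ParsedMessage]:
    """Return a ParsedMessage, or None when the payload is malformed."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    msg_type = data.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None
    content, room_id = data.get("content"), data.get("room_id")
    # Optional fields must be strings when present
    if not all(value is None or isinstance(value, str) for value in (content, room_id)):
        return None
    return ParsedMessage(type=msg_type, content=content, room_id=room_id)

print(parse_message('{"type": "message", "content": "hi"}'))
# ParsedMessage(type='message', content='hi', room_id=None)
print(parse_message('{"type": "shout"}'))  # None
print(parse_message("not json"))           # None
```

In the endpoint you would call `parse_message(await websocket.receive_text())` and send the error reply whenever it returns `None`.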

---

## 💓 Heartbeat Mechanism

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Dict

from fastapi import WebSocket, WebSocketDisconnect

# Reuses `app` from the earlier examples

class HeartbeatManager:
    """Heartbeat manager"""

    def __init__(self, timeout: int = 30):
        self.timeout = timeout
        self.last_heartbeat: Dict[WebSocket, datetime] = {}

    async def start_heartbeat(self, websocket: WebSocket):
        """Start heartbeat monitoring"""
        self.last_heartbeat[websocket] = datetime.now(timezone.utc)

        while True:
            await asyncio.sleep(self.timeout / 2)

            # Check for a timeout
            last = self.last_heartbeat.get(websocket)
            if not last:
                break

            if datetime.now(timezone.utc) - last > timedelta(seconds=self.timeout):
                # Timed out: close the connection
                await websocket.close(code=1000, reason="Heartbeat timeout")
                break

            # Send a ping
            try:
                await websocket.send_json({"type": "ping"})
            except Exception:
                # The connection is no longer usable; stop the heartbeat loop
                break

    def update_heartbeat(self, websocket: WebSocket):
        """Update the last heartbeat time"""
        self.last_heartbeat[websocket] = datetime.now(timezone.utc)

    def remove(self, websocket: WebSocket):
        """Remove a connection"""
        self.last_heartbeat.pop(websocket, None)

heartbeat_manager = HeartbeatManager()

@app.websocket("/ws/heartbeat")
async def websocket_with_heartbeat(websocket: WebSocket):
    await websocket.accept()

    # Start the heartbeat task
    heartbeat_task = asyncio.create_task(
        heartbeat_manager.start_heartbeat(websocket)
    )

    try:
        while True:
            data = await websocket.receive_json()

            if data.get("type") == "pong":
                heartbeat_manager.update_heartbeat(websocket)
            else:
                # Handle other messages
                pass
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat_task.cancel()
        heartbeat_manager.remove(websocket)
```
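
The timeout check is easier to test when it is separated from the I/O loop. The sketch below pulls it into a small class with an injectable clock and uses `time.monotonic` by default, which is unaffected by system clock adjustments. `HeartbeatTracker` and `FakeClock` are names made up for this example, not part of FastAPI.

```python
import time
from typing import Callable, Dict, Hashable

class HeartbeatTracker:
    """Records the last pong per connection and reports timeouts."""

    def __init__(self, timeout: float = 30.0, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen: Dict[Hashable, float] = {}

    def touch(self, connection: Hashable) -> None:
        """Call this whenever a pong arrives."""
        self.last_seen[connection] = self.clock()

    def is_timed_out(self, connection: Hashable) -> bool:
        last = self.last_seen.get(connection)
        # Unknown connections are treated as timed out
        return last is None or self.clock() - last > self.timeout

    def remove(self, connection: Hashable) -> None:
        self.last_seen.pop(connection, None)

class FakeClock:
    """Manually advanced clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

clock = FakeClock()
tracker = HeartbeatTracker(timeout=30.0, clock=clock)
tracker.touch("client-1")

clock.now = 20.0
print(tracker.is_timed_out("client-1"))  # False
clock.now = 31.0
print(tracker.is_timed_out("client-1"))  # True
```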

---

## 📝 Complete Chat Room Example

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import Dict
from datetime import datetime, timezone

app = FastAPI()

class ChatMessage(BaseModel):
    type: str
    content: str = ""
    username: str = ""
    timestamp: str = ""

class ChatRoom:
    def __init__(self):
        self.connections: Dict[str, WebSocket] = {}
        self.messages: list = []

    async def connect(self, websocket: WebSocket, username: str):
        await websocket.accept()
        self.connections[username] = websocket

        # Send the message history
        for msg in self.messages[-50:]:  # most recent 50
            await websocket.send_json(msg)

        # Broadcast the join notice
        await self.broadcast({
            "type": "system",
            "content": f"{username} joined the chat room",
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    async def disconnect(self, username: str):
        self.connections.pop(username, None)
        await self.broadcast({
            "type": "system",
            "content": f"{username} left the chat room",
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    async def broadcast(self, message: dict):
        self.messages.append(message)

        # Keep only the most recent 100 messages
        if len(self.messages) > 100:
            self.messages = self.messages[-100:]

        # Iterate over a copy: the dict may change while we await a send
        for connection in list(self.connections.values()):
            try:
                await connection.send_json(message)
            except Exception:
                # Skip connections that can no longer be written to
                pass

    async def send_to_user(self, username: str, message: dict):
        ws = self.connections.get(username)
        if ws:
            await ws.send_json(message)

chat_room = ChatRoom()

@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket, username: str):
    await chat_room.connect(websocket, username)

    try:
        while True:
            data = await websocket.receive_json()

            message = {
                "type": "message",
                "content": data.get("content", ""),
                "username": username,
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

            await chat_room.broadcast(message)

    except WebSocketDisconnect:
        await chat_room.disconnect(username)


# HTML front end
@app.get("/")
async def get_chat_page():
    return HTMLResponse("""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Chat Room</title>
        <style>
            #messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
            .system { color: gray; font-style: italic; }
            .message { margin: 5px 0; }
        </style>
    </head>
    <body>
        <h1>Chat Room</h1>
        <div id="messages"></div>
        <input type="text" id="messageInput" placeholder="Type a message...">
        <button onclick="sendMessage()">Send</button>

        <script>
            const username = prompt("Enter your name:");
            const ws = new WebSocket(`ws://localhost:8000/ws/chat?username=${encodeURIComponent(username)}`);

            ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                const messagesDiv = document.getElementById("messages");

                const msgDiv = document.createElement("div");
                msgDiv.className = data.type === "system" ? "system" : "message";

                if (data.type === "message") {
                    // Build the nodes with textContent so user input is never parsed as HTML
                    const nameEl = document.createElement("strong");
                    nameEl.textContent = data.username;
                    msgDiv.append(nameEl, `: ${data.content}`);
                } else {
                    msgDiv.textContent = data.content;
                }

                messagesDiv.appendChild(msgDiv);
                messagesDiv.scrollTop = messagesDiv.scrollHeight;
            };

            function sendMessage() {
                const input = document.getElementById("messageInput");
                if (input.value) {
                    ws.send(JSON.stringify({ content: input.value }));
                    input.value = "";
                }
            }

            document.getElementById("messageInput").addEventListener("keypress", (e) => {
                if (e.key === "Enter") sendMessage();
            });
        </script>
    </body>
    </html>
    """)
```
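
The "keep the most recent 100 messages" rule in `broadcast` can also be expressed with `collections.deque`, which drops the oldest entry automatically once `maxlen` is reached. A small sketch, independent of FastAPI:

```python
from collections import deque

# Oldest entries are discarded automatically once 100 items are stored
history = deque(maxlen=100)

for i in range(250):
    history.append({"type": "message", "content": f"msg {i}"})

print(len(history))           # 100
print(history[0]["content"])  # msg 150

# Last 50 entries, as would be replayed to a newly connected client
recent = list(history)[-50:]
print(recent[0]["content"])   # msg 200
```

With this, `self.messages` could become a `deque(maxlen=100)` and the manual slicing in `broadcast` would no longer be needed.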

---

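## ✅ Key Takeaways

### WebSocket Lifecycle

| Stage | Method |
|------|------|
| Connect | `await websocket.accept()` |
| Receive | `await websocket.receive_text()` / `receive_json()` |
| Send | `await websocket.send_text()` / `send_json()` |
| Close | `await websocket.close()` |

### Best Practices

1. Use a connection manager to track multiple connections
2. Implement a heartbeat to detect dead connections
3. Validate message formats
4. Handle the `WebSocketDisconnect` exception
5. Implement authentication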

---

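## 🎤 How to Answer in an Interview

### Q: What is the difference between WebSocket and HTTP?

**Answer:**

> **HTTP:**
> - Request-response: the client sends a request and the server replies; the server cannot push on its own
> - Every exchange needs a new request (the underlying TCP connection may be reused via keep-alive)
> - Stateless
>
> **WebSocket:**
> - Bidirectional: the server can push data proactively
> - One handshake, then a persistent connection
> - Stateful, well suited to real-time applications
>
> ```python
> @app.websocket("/ws")
> async def websocket_endpoint(websocket: WebSocket):
>     await websocket.accept()  # establish the connection
>     while True:
>         data = await websocket.receive_text()  # receive
>         await websocket.send_text(f"Echo: {data}")  # send
> ```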

---

**Previous:** [05-4. Handling Concurrent Requests](./05-4)
**Next:** [05-6. Server-Sent Events](./05-6)

---

Last updated: 2025-12-17


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/05-5/  

