# 03-8. Multi-Database Support

> ⏱️ **Reading time:** 15 minutes
> 🎯 **Difficulty:** ⭐⭐⭐ (Advanced)

---

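## 🤔 In One Sentence

**Multi-database support lets your application connect to several databases at once, enabling architectures such as read/write splitting and database partitioning.**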

---

## 🎯 Use Cases

```
┌─────────────────────────────────────────────────────────┐
│                Multi-database scenarios                 │
├─────────────────────────────────────────────────────────┤
│  1. Read/write splitting                                │
│     ┌────────┐             ┌────────┐                   │
│     │ Master │ ─replicate─▶│ Slave  │                   │
│     │ writes │             │ reads  │                   │
│     └────────┘             └────────┘                   │
├─────────────────────────────────────────────────────────┤
│  2. Multi-tenancy                                       │
│     ┌────────┐   ┌────────┐   ┌────────┐                │
│     │  DB_A  │   │  DB_B  │   │  DB_C  │                │
│     │Tenant A│   │Tenant B│   │Tenant C│                │
│     └────────┘   └────────┘   └────────┘                │
├─────────────────────────────────────────────────────────┤
│  3. Microservice integration                            │
│     ┌────────┐   ┌────────┐   ┌────────┐                │
│     │ Users  │   │ Orders │   │Products│                │
│     │   DB   │   │   DB   │   │   DB   │                │
│     └────────┘   └────────┘   └────────┘                │
└─────────────────────────────────────────────────────────┘
```

---

## 🔧 Basic Setup

### Multiple Engines

```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Master database (writes)
master_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@master-db:5432/mydb",
    pool_size=10,
    max_overflow=20
)

# Slave database (reads)
slave_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@slave-db:5432/mydb",
    pool_size=20,
    max_overflow=40
)

# Session factories
MasterSession = sessionmaker(
    master_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

SlaveSession = sessionmaker(
    slave_engine,
    class_=AsyncSession,
    expire_on_commit=False
)
```
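
When more than one read replica is available, something has to decide which engine serves each read. A minimal sketch, assuming a hypothetical `ReplicaPicker` helper (not part of SQLAlchemy) that rotates through whatever objects it is given. Strings stand in for engines so the example runs without a database:

```python
from itertools import cycle
from typing import Generic, Sequence, TypeVar

T = TypeVar("T")


class ReplicaPicker(Generic[T]):
    """Hypothetical helper: hand out replicas in round-robin order."""

    def __init__(self, replicas: Sequence[T]):
        if not replicas:
            raise ValueError("at least one replica is required")
        self._cycle = cycle(replicas)

    def pick(self) -> T:
        # Each call returns the next replica, wrapping around at the end.
        return next(self._cycle)


# Strings stand in for the slave engines here.
picker = ReplicaPicker(["slave-1", "slave-2", "slave-3"])
first_four = [picker.pick() for _ in range(4)]
```

In a real setup the list would hold engines or session factories, and a smarter policy (for example skipping unhealthy replicas) could replace the plain rotation.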

### Configuration Management

```python
from pydantic_settings import BaseSettings
from typing import Optional

class DatabaseSettings(BaseSettings):
    # Master database
    master_url: str
    master_pool_size: int = 10

    # Slave database (optional)
    slave_url: Optional[str] = None
    slave_pool_size: int = 20

    # Other databases
    analytics_url: Optional[str] = None

    class Config:
        env_prefix = "DB_"

settings = DatabaseSettings()
```
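
With `env_prefix = "DB_"`, each field is read from an environment variable named after the field with the prefix prepended, so `master_url` comes from `DB_MASTER_URL`. The stdlib-only sketch below imitates that naming rule to make it concrete. `PlainDatabaseSettings` and `load_db_settings` are illustrative names, not pydantic-settings API, and the sketch skips the validation pydantic would perform:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class PlainDatabaseSettings:
    master_url: str
    master_pool_size: int = 10
    slave_url: Optional[str] = None
    slave_pool_size: int = 20
    analytics_url: Optional[str] = None


def load_db_settings(
    env: Mapping[str, str] = os.environ, prefix: str = "DB_"
) -> PlainDatabaseSettings:
    """Build settings from variables such as DB_MASTER_URL (illustrative helper)."""

    def get(name: str) -> Optional[str]:
        # Field name -> PREFIX + upper-cased field name
        return env.get(f"{prefix}{name.upper()}")

    master_url = get("master_url")
    if master_url is None:
        raise KeyError(f"{prefix}MASTER_URL is required")
    return PlainDatabaseSettings(
        master_url=master_url,
        master_pool_size=int(get("master_pool_size") or 10),
        slave_url=get("slave_url"),
        slave_pool_size=int(get("slave_pool_size") or 20),
        analytics_url=get("analytics_url"),
    )


# A plain dict stands in for the process environment.
example = load_db_settings({
    "DB_MASTER_URL": "postgresql+asyncpg://user:pass@master-db:5432/mydb",
    "DB_SLAVE_POOL_SIZE": "30",
})
```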

---

## 📖 Read/Write Splitting

### Basic Implementation

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# User is the ORM model, assumed to be defined elsewhere in the project.


class DatabaseManager:
    """Database manager."""

    def __init__(self, master_url: str, slave_url: Optional[str] = None):
        self.master_engine = create_async_engine(master_url)
        # Fall back to the master when no slave URL is given
        self.slave_engine = create_async_engine(slave_url or master_url)

        self.master_session_factory = sessionmaker(
            self.master_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

        self.slave_session_factory = sessionmaker(
            self.slave_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def master(self) -> AsyncGenerator[AsyncSession, None]:
        """Get a master session (for writes)."""
        async with self.master_session_factory() as session:
            yield session

    @asynccontextmanager
    async def slave(self) -> AsyncGenerator[AsyncSession, None]:
        """Get a slave session (for reads)."""
        async with self.slave_session_factory() as session:
            yield session

    async def close(self):
        """Close all connections."""
        await self.master_engine.dispose()
        await self.slave_engine.dispose()


# Usage
db_manager = DatabaseManager(
    master_url="postgresql+asyncpg://user:pass@master:5432/db",
    slave_url="postgresql+asyncpg://user:pass@slave:5432/db"
)

async def create_user(data):
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        return user

async def get_users():
    async with db_manager.slave() as db:
        result = await db.execute(select(User))
        return result.scalars().all()
```
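
The functions above pick the master or the slave by hand. An alternative some projects use is to route on the statement itself. The sketch below is one hedged way to do that, with `pick_target` as a hypothetical helper: it treats only plain `SELECT` statements as reads and sends everything else, including `SELECT ... FOR UPDATE`, to the master.

```python
import re

# A statement counts as a read only if it starts with SELECT ...
_READ_ONLY = re.compile(r"^\s*select\b", re.IGNORECASE)
# ... and does not take row locks.
_LOCKING = re.compile(r"\bfor\s+(update|share)\b", re.IGNORECASE)


def pick_target(sql: str) -> str:
    """Return "slave" for plain reads and "master" for everything else."""
    if _READ_ONLY.match(sql) and not _LOCKING.search(sql):
        return "slave"
    return "master"


targets = [
    pick_target("SELECT * FROM users"),
    pick_target("  select id from users where id = 1 FOR UPDATE"),
    pick_target("INSERT INTO users (name) VALUES ('a')"),
    pick_target("WITH recent AS (SELECT 1) SELECT * FROM recent"),
]
```

String matching like this is deliberately conservative: anything it does not recognise as a plain read, such as the `WITH ... SELECT` query, goes to the master.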

### FastAPI Integration

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

@asynccontextmanager
async def lifespan(app: FastAPI):
    # On startup
    app.state.db_manager = DatabaseManager(
        master_url=settings.master_url,
        slave_url=settings.slave_url
    )
    yield
    # On shutdown
    await app.state.db_manager.close()

app = FastAPI(lifespan=lifespan)

# Dependencies
async def get_write_db() -> AsyncGenerator[AsyncSession, None]:
    async with app.state.db_manager.master() as session:
        yield session

async def get_read_db() -> AsyncGenerator[AsyncSession, None]:
    async with app.state.db_manager.slave() as session:
        yield session

# API endpoints
@app.post("/users")
async def create_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_write_db)  # uses the master
):
    # model_dump() is the Pydantic v2 replacement for the deprecated dict()
    user = User(**user_data.model_dump())
    db.add(user)
    await db.commit()
    return user

@app.get("/users")
async def list_users(
    db: AsyncSession = Depends(get_read_db)  # uses the slave
):
    result = await db.execute(select(User))
    return result.scalars().all()
```
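
A dependency written with `yield` keeps the session open while the endpoint runs and resumes after the `yield` for cleanup. The stdlib-only sketch below imitates that lifecycle with a fake session so the ordering is visible. `FakeSession`, `get_db`, and `handle_request` are stand-ins, and the real FastAPI machinery is more involved:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

events: List[str] = []


class FakeSession:
    """Stand-in for AsyncSession that only records what happens to it."""

    async def __aenter__(self) -> "FakeSession":
        events.append("session opened")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append("session closed")


async def get_db() -> AsyncGenerator[FakeSession, None]:
    # Same shape as a read or write session dependency.
    async with FakeSession() as session:
        yield session


async def handle_request() -> str:
    # Roughly what the framework does: enter the dependency, call the
    # endpoint with the yielded value, then let the generator finish.
    async with asynccontextmanager(get_db)() as db:
        events.append("endpoint ran")
        return "response"


result = asyncio.run(handle_request())
```

After the run, `events` shows the session being opened before the endpoint body and closed only after it returns.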

---

## 🏢 Multi-Tenant Architecture

### Schema Separation

```python
import re
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

class TenantManager:
    """Multi-tenant manager."""

    def __init__(self, engine):
        self.engine = engine
        self.session_factory = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> AsyncGenerator[AsyncSession, None]:
        """Get a session bound to the given tenant's schema."""
        # tenant_id is interpolated into SQL, so reject anything unexpected
        if not re.fullmatch(r"[A-Za-z0-9_]+", tenant_id):
            raise ValueError(f"Invalid tenant ID: {tenant_id!r}")
        async with self.session_factory() as session:
            # Point search_path at the tenant's schema
            await session.execute(
                text(f"SET search_path TO tenant_{tenant_id}, public")
            )
            yield session


# Usage
tenant_manager = TenantManager(engine)

async def get_tenant_users(tenant_id: str):
    async with tenant_manager.get_session(tenant_id) as db:
        result = await db.execute(select(User))
        return result.scalars().all()
```
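
Because the schema name ends up inside the `SET search_path` statement, it can help to resolve tenant IDs through a registry of known tenants instead of trusting request input. A minimal sketch, assuming a hypothetical in-memory `TENANT_SCHEMAS` mapping (a real system would more likely load it from a catalog table):

```python
from typing import Dict

# Hypothetical registry of provisioned tenants and their schema names.
TENANT_SCHEMAS: Dict[str, str] = {
    "acme": "tenant_acme",
    "globex": "tenant_globex",
}


class UnknownTenantError(LookupError):
    """Raised when a tenant ID is not in the registry."""


def search_path_sql(tenant_id: str) -> str:
    """Return the SET search_path statement for a known tenant."""
    try:
        schema = TENANT_SCHEMAS[tenant_id]
    except KeyError:
        raise UnknownTenantError(tenant_id) from None
    # Only registry values reach the SQL text, never raw request input.
    return f'SET search_path TO "{schema}", public'


statement = search_path_sql("acme")
```

An unknown or malicious ID such as `"x; DROP SCHEMA public"` never reaches the database: the lookup fails first.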

### Database Separation

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Dict

from fastapi import Depends, HTTPException, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

class MultiTenantManager:
    """Tenant manager with one database per tenant."""

    def __init__(self):
        self.engines: Dict[str, AsyncEngine] = {}
        self.session_factories: Dict[str, sessionmaker] = {}

    def register_tenant(self, tenant_id: str, database_url: str):
        """Register a tenant's database."""
        engine = create_async_engine(database_url)
        self.engines[tenant_id] = engine
        self.session_factories[tenant_id] = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> AsyncGenerator[AsyncSession, None]:
        """Get a session for the tenant."""
        if tenant_id not in self.session_factories:
            raise ValueError(f"Tenant {tenant_id} not found")

        async with self.session_factories[tenant_id]() as session:
            yield session

    async def close_all(self):
        """Close all connections."""
        for engine in self.engines.values():
            await engine.dispose()


# FastAPI integration
tenant_manager = MultiTenantManager()

# Register tenants
tenant_manager.register_tenant("tenant_a", "postgresql+asyncpg://...@db-a/db")
tenant_manager.register_tenant("tenant_b", "postgresql+asyncpg://...@db-b/db")

# Get the tenant ID from the request
async def get_tenant_id(request: Request) -> str:
    # Could come from a header, the subdomain, the path, etc.
    tenant_id = request.headers.get("X-Tenant-ID")
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Tenant ID required")
    return tenant_id

async def get_tenant_db(
    tenant_id: str = Depends(get_tenant_id)
) -> AsyncGenerator[AsyncSession, None]:
    async with tenant_manager.get_session(tenant_id) as session:
        yield session

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_tenant_db)):
    result = await db.execute(select(User))
    return result.scalars().all()
```
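
The comment in `get_tenant_id` notes that the tenant can also come from the subdomain. One way that could look, assuming hosts of the form `<tenant>.example.com` (the base domain and the `tenant_from_host` helper are illustrative, not part of FastAPI):

```python
from typing import Optional


def tenant_from_host(host: str, base_domain: str = "example.com") -> Optional[str]:
    """Extract the tenant label from a Host header value, or return None."""
    # Drop an optional port and normalise case and trailing dot.
    hostname = host.split(":", 1)[0].lower().rstrip(".")
    suffix = "." + base_domain
    if not hostname.endswith(suffix):
        return None
    label = hostname[: -len(suffix)]
    # Accept exactly one non-empty label in front of the base domain.
    if not label or "." in label:
        return None
    return label


tenant = tenant_from_host("tenant-a.example.com:8000")
```

The returned label would still need to be checked against the registered tenants before a session is opened for it.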

---

## 🔄 Cross-Database Queries

### Basic Approach

```python
async def get_user_with_orders(user_id: int):
    """Query across two databases."""
    # Fetch the user from the user database
    async with user_db_manager.session() as user_db:
        user = await user_db.get(User, user_id)

    # Fetch the orders from the order database
    async with order_db_manager.session() as order_db:
        result = await order_db.execute(
            select(Order).where(Order.user_id == user_id)
        )
        orders = result.scalars().all()

    return {
        "user": user,
        "orders": orders
    }
```
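
Because the two tables sit behind separate connections, an ordinary SQL `JOIN` is not available and the rows are combined in application code. The self-contained sketch below shows the same pattern with two in-memory SQLite databases standing in for the user and order databases (table and column names are illustrative):

```python
import sqlite3

# Two independent in-memory databases stand in for the separate stores.
user_db = sqlite3.connect(":memory:")
order_db = sqlite3.connect(":memory:")

user_db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
user_db.execute("INSERT INTO users VALUES (1, 'alice')")
order_db.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL)"
)
order_db.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(10, 1, 25.0), (11, 1, 40.0), (12, 2, 99.0)],
)


def get_user_with_orders(user_id: int):
    """Fetch from each database separately, then merge in Python."""
    user = user_db.execute(
        "SELECT id, name FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if user is None:
        return None
    orders = order_db.execute(
        "SELECT id, total FROM orders WHERE user_id = ? ORDER BY id", (user_id,)
    ).fetchall()
    return {"user": {"id": user[0], "name": user[1]}, "orders": orders}


combined = get_user_with_orders(1)
```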

### Wrapping It in a Service Layer

```python
import asyncio

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

class UserOrderService:
    """Cross-database service."""

    def __init__(
        self,
        user_db: AsyncSession,
        order_db: AsyncSession
    ):
        self.user_db = user_db
        self.order_db = order_db

    async def get_user_dashboard(self, user_id: int):
        # Query the two databases concurrently. A single AsyncSession must
        # not run overlapping operations, so the two order_db queries are
        # kept sequential inside _get_order_data.
        user, (orders, stats) = await asyncio.gather(
            self._get_user(user_id),
            self._get_order_data(user_id)
        )

        return {
            "user": user,
            "orders": orders,
            "stats": stats
        }

    async def _get_user(self, user_id: int):
        return await self.user_db.get(User, user_id)

    async def _get_order_data(self, user_id: int):
        orders = await self._get_user_orders(user_id)
        stats = await self._get_user_stats(user_id)
        return orders, stats

    async def _get_user_orders(self, user_id: int):
        # The ten most recent orders
        result = await self.order_db.execute(
            select(Order)
            .where(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
            .limit(10)
        )
        return result.scalars().all()

    async def _get_user_stats(self, user_id: int):
        # Aggregate order count and total amount
        result = await self.order_db.execute(
            select(
                func.count(Order.id).label("total_orders"),
                func.sum(Order.total).label("total_amount")
            )
            .where(Order.user_id == user_id)
        )
        row = result.one()
        return {
            "total_orders": row.total_orders,
            "total_amount": row.total_amount
        }
```
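
`asyncio.gather` only saves time when the awaited calls can actually overlap, and an `AsyncSession` should carry one in-flight operation at a time. The sketch below uses fake sessions (no database needed) that count overlapping calls, to show two sessions working in parallel while each one stays at a single operation. `FakeSession` and `dashboard` are illustrative names:

```python
import asyncio


class FakeSession:
    """Stand-in session that tracks how many calls overlap on it."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.max_in_flight = 0

    async def query(self, value):
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        await asyncio.sleep(0.01)  # pretend to wait on the network
        self.in_flight -= 1
        return value


async def dashboard(user_db: FakeSession, order_db: FakeSession):
    async def order_side():
        # Same session, so these two run one after the other.
        orders = await order_db.query(["order-1"])
        stats = await order_db.query({"total_orders": 1})
        return orders, stats

    # Different sessions, so the two sides may overlap.
    user, (orders, stats) = await asyncio.gather(
        user_db.query("alice"), order_side()
    )
    return {"user": user, "orders": orders, "stats": stats}


user_session, order_session = FakeSession(), FakeSession()
result = asyncio.run(dashboard(user_session, order_session))
```

If full parallelism on the order side were needed, one option would be to give each concurrent query its own session.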

---

## ⚠️ Caveats

### 1. Replication Lag

```python
import asyncio

async def create_and_get_user(data):
    """Work around replication lag."""
    # Write to the master
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        user_id = user.id

    # Read from the master as well (avoids replication lag)
    async with db_manager.master() as db:  # note: not slave
        return await db.get(User, user_id)

# Alternatively, wait briefly. This is only a rough workaround:
# replication lag is not bounded, so the row may still be missing.
async def create_and_get_user_with_delay(data):
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        user_id = user.id

    # Wait for replication
    await asyncio.sleep(0.1)

    async with db_manager.slave() as db:
        return await db.get(User, user_id)
```
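
A fixed sleep only guesses at the replication delay. Another common approach is to pin a user's reads to the master for a short window after that user writes. A minimal sketch, assuming a hypothetical `StickyRouter` and a 2-second window chosen purely for illustration:

```python
import time
from typing import Callable, Dict


class StickyRouter:
    """Route a user's reads to the master shortly after that user writes."""

    def __init__(
        self,
        window_seconds: float = 2.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window_seconds
        self._clock = clock
        self._last_write: Dict[int, float] = {}

    def record_write(self, user_id: int) -> None:
        self._last_write[user_id] = self._clock()

    def read_target(self, user_id: int) -> str:
        last = self._last_write.get(user_id)
        if last is not None and self._clock() - last < self._window:
            return "master"  # the slave may not have the write yet
        return "slave"


# A fake clock keeps the example deterministic.
now = [100.0]
router = StickyRouter(window_seconds=2.0, clock=lambda: now[0])

before_write = router.read_target(1)
router.record_write(1)
right_after = router.read_target(1)
other_user = router.read_target(2)
now[0] += 5.0  # five seconds pass
later = router.read_target(1)
```

The window would need to be tuned to the replication lag actually observed, and a multi-process deployment would need shared storage for the timestamps.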

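### 2. Connection Pool Management

```python
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool, NullPool

# Development: no connection pooling
dev_engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool
)

# Production: pooled connections.
# Async engines need AsyncAdaptedQueuePool (their default);
# the synchronous QueuePool is rejected by create_async_engine.
prod_engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True
)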
```
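
Each engine can open up to `pool_size + max_overflow` connections, and every worker process has its own pools, so the totals add up quickly. A back-of-the-envelope check, using the pool numbers from the master and slave engines in the basic setup and an assumed four worker processes:

```python
def max_connections(pool_size: int, max_overflow: int, workers: int = 1) -> int:
    """Upper bound on connections one engine configuration can open."""
    return (pool_size + max_overflow) * workers


WORKERS = 4  # assumed number of application worker processes

master_peak = max_connections(pool_size=10, max_overflow=20, workers=WORKERS)
slave_peak = max_connections(pool_size=20, max_overflow=40, workers=WORKERS)
```

With these numbers the master may see up to 120 connections and the slave up to 240. Both exceed PostgreSQL's default `max_connections` of 100, so either the pools or the server limit would need adjusting.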

### 3. Health Checks

```python
from sqlalchemy import text

async def check_database_health():
    """Check the health of each database."""
    results = {}

    # Check the master
    try:
        async with db_manager.master() as db:
            await db.execute(text("SELECT 1"))
        results["master"] = "healthy"
    except Exception as e:
        results["master"] = f"unhealthy: {str(e)}"

    # Check the slave
    try:
        async with db_manager.slave() as db:
            await db.execute(text("SELECT 1"))
        results["slave"] = "healthy"
    except Exception as e:
        results["slave"] = f"unhealthy: {str(e)}"

    return results


@app.get("/health/db")
async def database_health():
    return await check_database_health()
```
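
A database that hangs is as unhealthy as one that refuses connections, so it can help to bound each probe with a timeout and run the probes concurrently. The sketch below uses fake probes instead of real sessions. `check_all` and the 0.05-second timeout are illustrative choices:

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def check_all(
    probes: Dict[str, Callable[[], Awaitable[None]]],
    timeout: float = 0.05,
) -> Dict[str, str]:
    """Run every probe concurrently and report a status per database."""

    async def run(probe: Callable[[], Awaitable[None]]) -> str:
        try:
            await asyncio.wait_for(probe(), timeout=timeout)
            return "healthy"
        except asyncio.TimeoutError:
            return "unhealthy: timed out"
        except Exception as exc:
            return f"unhealthy: {exc}"

    statuses = await asyncio.gather(*(run(p) for p in probes.values()))
    return dict(zip(probes.keys(), statuses))


# Fake probes: one succeeds, one hangs, one fails immediately.
async def ok() -> None:
    return None


async def hangs() -> None:
    await asyncio.sleep(10)


async def refuses() -> None:
    raise ConnectionError("connection refused")


report = asyncio.run(
    check_all({"master": ok, "slave": hangs, "analytics": refuses})
)
```

In the real check, each probe would open a session and run `SELECT 1`.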

---

## 📝 Complete Example

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

class DatabaseCluster:
    """Database cluster manager."""

    def __init__(self, config: DatabaseSettings):
        # Master
        self.master_engine = create_async_engine(
            config.master_url,
            pool_size=config.master_pool_size,
            pool_pre_ping=True
        )
        self.master_factory = sessionmaker(
            self.master_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

        # Slave (optional)
        if config.slave_url:
            self.slave_engine = create_async_engine(
                config.slave_url,
                pool_size=config.slave_pool_size,
                pool_pre_ping=True
            )
            self.slave_factory = sessionmaker(
                self.slave_engine,
                class_=AsyncSession,
                expire_on_commit=False
            )
        else:
            # No slave configured: reads share the master
            self.slave_engine = self.master_engine
            self.slave_factory = self.master_factory

    @asynccontextmanager
    async def write_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Session for writes."""
        async with self.master_factory() as session:
            try:
                yield session
            except Exception:
                await session.rollback()
                raise

    @asynccontextmanager
    async def read_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Session for reads."""
        async with self.slave_factory() as session:
            yield session

    @asynccontextmanager
    async def session(self, read_only: bool = False) -> AsyncGenerator[AsyncSession, None]:
        """General-purpose session."""
        if read_only:
            async with self.read_session() as session:
                yield session
        else:
            async with self.write_session() as session:
                yield session

    async def dispose(self):
        """Close all connections."""
        await self.master_engine.dispose()
        if self.slave_engine is not self.master_engine:
            await self.slave_engine.dispose()


# FastAPI integration
db_cluster: Optional[DatabaseCluster] = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_cluster
    # settings is the DatabaseSettings instance created earlier
    db_cluster = DatabaseCluster(settings)
    yield
    await db_cluster.dispose()

app = FastAPI(lifespan=lifespan)

# Dependencies
async def get_write_db():
    async with db_cluster.write_session() as session:
        yield session

async def get_read_db():
    async with db_cluster.read_session() as session:
        yield session

# API endpoints
@app.post("/users", response_model=UserResponse)
async def create_user(
    data: UserCreate,
    db: AsyncSession = Depends(get_write_db)
):
    user = User(**data.model_dump())
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

@app.get("/users", response_model=list[UserResponse])
async def list_users(db: AsyncSession = Depends(get_read_db)):
    result = await db.execute(select(User))
    return result.scalars().all()
```
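
`write_session` rolls back when the code using it raises, while the endpoint commits explicitly on success. The same commit-or-rollback shape can be tried out without PostgreSQL. The sketch below uses the stdlib `sqlite3` module, with `write_tx` as an illustrative stand-in for `write_session`:

```python
import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT NOT NULL)")
conn.commit()


@contextmanager
def write_tx(connection: sqlite3.Connection):
    """Yield the connection; roll back if the block raises."""
    try:
        yield connection
    except Exception:
        connection.rollback()
        raise


# Success path: the block commits explicitly.
with write_tx(conn) as db:
    db.execute("INSERT INTO users VALUES ('alice')")
    db.commit()

# Failure path: the error triggers a rollback before it propagates.
try:
    with write_tx(conn) as db:
        db.execute("INSERT INTO users VALUES ('bob')")
        raise RuntimeError("something failed before commit")
except RuntimeError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users")]
```

Only the committed row survives; the insert from the failed block is discarded.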

---

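## ✅ Key Takeaways

### Multi-Database Scenarios

| Scenario | Approach |
|------|------|
| Read/write splitting | Master/slave architecture |
| Multi-tenancy | Separate schemas or separate databases |
| Microservices | One database per service |

### Caveats

1. **Replication lag**: read from the master right after a write
2. **Connection pools**: set `pool_size` sensibly
3. **Health checks**: monitor the status of every database
4. **Error handling**: handle failures when switching between databases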

---

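## 🎤 How to Answer in an Interview

### Q: How do you implement read/write splitting?

**Answer:**

> Read/write splitting is implemented in three steps:
>
> 1. **Create multiple engines**: one connected to the master, one to the slave
> 2. **Create a session factory for each engine**
> 3. **Choose the session according to the type of operation**
>
> ```python
> class DatabaseManager:
>     @asynccontextmanager
>     async def master(self):  # for writes
>         async with self.master_factory() as session:
>             yield session
>
>     @asynccontextmanager
>     async def slave(self):   # for reads
>         async with self.slave_factory() as session:
>             yield session
> ```
>
> **Things to watch for:**
> - Replication lag: read from the master immediately after a write
> - Health checks: monitor the slave's status
> - Load balancing: with multiple slaves, a distribution strategy is needed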

---

**Previous:** [03-7. Query Optimization](./03-7)
**Next:** [03-9. Testing Database Operations](./03-9)

---

Last updated: 2025-12-17


---

> Author: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/03-8/  

