03-8. 多資料庫支援

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)


🤔 一句話解釋

多資料庫支援讓你的應用可以同時連接多個資料庫,實現讀寫分離、分庫等架構。


🎯 使用場景

┌─────────────────────────────────────────────────────────┐
│                    多資料庫場景                          │
├─────────────────────────────────────────────────────────┤
│  1. 讀寫分離                                             │
│     ┌────────┐         ┌────────┐                      │
│     │ Master │ ──複製──▶│ Slave  │                     │
│     │  寫入  │         │  讀取  │                      │
│     └────────┘         └────────┘                      │
├─────────────────────────────────────────────────────────┤
│  2. 多租戶                                               │
│     ┌────────┐   ┌────────┐   ┌────────┐              │
│     │ DB_A   │   │ DB_B   │   │ DB_C   │              │
│     │租戶 A  │   │租戶 B  │   │租戶 C  │              │
│     └────────┘   └────────┘   └────────┘              │
├─────────────────────────────────────────────────────────┤
│  3. 微服務整合                                           │
│     ┌────────┐   ┌────────┐   ┌────────┐              │
│     │ Users  │   │ Orders │   │Products│              │
│     │   DB   │   │   DB   │   │   DB   │              │
│     └────────┘   └────────┘   └────────┘              │
└─────────────────────────────────────────────────────────┘

🔧 基本設定

多個 Engine

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# 主資料庫(寫入)
master_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@master-db:5432/mydb",
    pool_size=10,
    max_overflow=20
)

# 從資料庫(讀取)
slave_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@slave-db:5432/mydb",
    pool_size=20,
    max_overflow=40
)

# Session 工廠
MasterSession = sessionmaker(
    master_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

SlaveSession = sessionmaker(
    slave_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

設定管理

from pydantic_settings import BaseSettings
from typing import Optional

class DatabaseSettings(BaseSettings):
    # 主資料庫
    master_url: str
    master_pool_size: int = 10

    # 從資料庫(可選)
    slave_url: Optional[str] = None
    slave_pool_size: int = 20

    # 其他資料庫
    analytics_url: Optional[str] = None

    class Config:
        env_prefix = "DB_"

settings = DatabaseSettings()

📖 讀寫分離

基本實作

from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession

class DatabaseManager:
    """資料庫管理器"""

    def __init__(self, master_url: str, slave_url: str = None):
        self.master_engine = create_async_engine(master_url)
        self.slave_engine = create_async_engine(slave_url or master_url)

        self.master_session_factory = sessionmaker(
            self.master_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

        self.slave_session_factory = sessionmaker(
            self.slave_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def master(self) -> AsyncGenerator[AsyncSession, None]:
        """取得主資料庫 Session(用於寫入)"""
        async with self.master_session_factory() as session:
            yield session

    @asynccontextmanager
    async def slave(self) -> AsyncGenerator[AsyncSession, None]:
        """取得從資料庫 Session(用於讀取)"""
        async with self.slave_session_factory() as session:
            yield session

    async def close(self):
        """關閉所有連線"""
        await self.master_engine.dispose()
        await self.slave_engine.dispose()


# 使用
db_manager = DatabaseManager(
    master_url="postgresql+asyncpg://user:pass@master:5432/db",
    slave_url="postgresql+asyncpg://user:pass@slave:5432/db"
)

async def create_user(data):
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        return user

async def get_users():
    async with db_manager.slave() as db:
        result = await db.execute(select(User))
        return result.scalars().all()

FastAPI 整合

from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 啟動時
    app.state.db_manager = DatabaseManager(
        master_url=settings.master_url,
        slave_url=settings.slave_url
    )
    yield
    # 關閉時
    await app.state.db_manager.close()

app = FastAPI(lifespan=lifespan)

# 依賴項
async def get_write_db() -> AsyncSession:
    async with app.state.db_manager.master() as session:
        yield session

async def get_read_db() -> AsyncSession:
    async with app.state.db_manager.slave() as session:
        yield session

# API 端點
@app.post("/users")
async def create_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_write_db)  # 使用主庫
):
    user = User(**user_data.dict())
    db.add(user)
    await db.commit()
    return user

@app.get("/users")
async def list_users(
    db: AsyncSession = Depends(get_read_db)  # 使用從庫
):
    result = await db.execute(select(User))
    return result.scalars().all()

🏢 多租戶架構

Schema 分離

from sqlalchemy import event
from sqlalchemy.orm import Session

class TenantManager:
    """多租戶管理器"""

    def __init__(self, engine):
        self.engine = engine
        self.session_factory = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> AsyncGenerator[AsyncSession, None]:
        """取得指定租戶的 Session"""
        async with self.session_factory() as session:
            # 設定 search_path 到租戶的 schema
            await session.execute(
                text(f"SET search_path TO tenant_{tenant_id}, public")
            )
            yield session


# 使用
tenant_manager = TenantManager(engine)

async def get_tenant_users(tenant_id: str):
    async with tenant_manager.get_session(tenant_id) as db:
        result = await db.execute(select(User))
        return result.scalars().all()

資料庫分離

from typing import Dict

class MultiTenantManager:
    """多資料庫租戶管理器"""

    def __init__(self):
        self.engines: Dict[str, AsyncEngine] = {}
        self.session_factories: Dict[str, sessionmaker] = {}

    def register_tenant(self, tenant_id: str, database_url: str):
        """註冊租戶資料庫"""
        engine = create_async_engine(database_url)
        self.engines[tenant_id] = engine
        self.session_factories[tenant_id] = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> AsyncGenerator[AsyncSession, None]:
        """取得租戶的 Session"""
        if tenant_id not in self.session_factories:
            raise ValueError(f"Tenant {tenant_id} not found")

        async with self.session_factories[tenant_id]() as session:
            yield session

    async def close_all(self):
        """關閉所有連線"""
        for engine in self.engines.values():
            await engine.dispose()


# FastAPI 整合
tenant_manager = MultiTenantManager()

# 註冊租戶
tenant_manager.register_tenant("tenant_a", "postgresql+asyncpg://...@db-a/db")
tenant_manager.register_tenant("tenant_b", "postgresql+asyncpg://...@db-b/db")

# 從請求取得租戶 ID
async def get_tenant_id(request: Request) -> str:
    # 可以從 header、subdomain、路徑等取得
    tenant_id = request.headers.get("X-Tenant-ID")
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Tenant ID required")
    return tenant_id

async def get_tenant_db(
    tenant_id: str = Depends(get_tenant_id)
) -> AsyncSession:
    async with tenant_manager.get_session(tenant_id) as session:
        yield session

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_tenant_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

🔄 跨資料庫查詢

基本方式

async def get_user_with_orders(user_id: int):
    """跨資料庫查詢"""
    # 從使用者資料庫取得使用者
    async with user_db_manager.session() as user_db:
        user = await user_db.get(User, user_id)

    # 從訂單資料庫取得訂單
    async with order_db_manager.session() as order_db:
        result = await order_db.execute(
            select(Order).where(Order.user_id == user_id)
        )
        orders = result.scalars().all()

    return {
        "user": user,
        "orders": orders
    }

使用 Service 層封裝

class UserOrderService:
    """跨資料庫服務"""

    def __init__(
        self,
        user_db: AsyncSession,
        order_db: AsyncSession
    ):
        self.user_db = user_db
        self.order_db = order_db

    async def get_user_dashboard(self, user_id: int):
        # 並行查詢
        import asyncio

        user_task = self._get_user(user_id)
        orders_task = self._get_user_orders(user_id)
        stats_task = self._get_user_stats(user_id)

        user, orders, stats = await asyncio.gather(
            user_task, orders_task, stats_task
        )

        return {
            "user": user,
            "orders": orders,
            "stats": stats
        }

    async def _get_user(self, user_id: int):
        return await self.user_db.get(User, user_id)

    async def _get_user_orders(self, user_id: int):
        result = await self.order_db.execute(
            select(Order)
            .where(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
            .limit(10)
        )
        return result.scalars().all()

    async def _get_user_stats(self, user_id: int):
        result = await self.order_db.execute(
            select(
                func.count(Order.id).label("total_orders"),
                func.sum(Order.total).label("total_amount")
            )
            .where(Order.user_id == user_id)
        )
        row = result.one()
        return {
            "total_orders": row.total_orders,
            "total_amount": row.total_amount
        }

⚠️ 注意事項

1. 複製延遲

async def create_and_get_user(data):
    """處理複製延遲"""
    # 寫入主庫
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        user_id = user.id

    # 讀取時使用主庫(避免複製延遲)
    async with db_manager.master() as db:  # 注意:不是 slave
        return await db.get(User, user_id)

# 或者等待一小段時間
async def create_and_get_user_with_delay(data):
    async with db_manager.master() as db:
        user = User(**data)
        db.add(user)
        await db.commit()
        user_id = user.id

    # 等待複製
    await asyncio.sleep(0.1)

    async with db_manager.slave() as db:
        return await db.get(User, user_id)

2. 連線池管理

from sqlalchemy.pool import QueuePool, NullPool

# 開發環境:不使用連線池
dev_engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool
)

# 生產環境:使用連線池
prod_engine = create_async_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True
)

3. 健康檢查

async def check_database_health():
    """檢查資料庫健康狀態"""
    results = {}

    # 檢查主庫
    try:
        async with db_manager.master() as db:
            await db.execute(text("SELECT 1"))
        results["master"] = "healthy"
    except Exception as e:
        results["master"] = f"unhealthy: {str(e)}"

    # 檢查從庫
    try:
        async with db_manager.slave() as db:
            await db.execute(text("SELECT 1"))
        results["slave"] = "healthy"
    except Exception as e:
        results["slave"] = f"unhealthy: {str(e)}"

    return results


@app.get("/health/db")
async def database_health():
    return await check_database_health()

📝 完整範例

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class DatabaseCluster:
    """資料庫叢集管理"""

    def __init__(self, config: DatabaseSettings):
        # 主庫
        self.master_engine = create_async_engine(
            config.master_url,
            pool_size=config.master_pool_size,
            pool_pre_ping=True
        )
        self.master_factory = sessionmaker(
            self.master_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

        # 從庫(可選)
        if config.slave_url:
            self.slave_engine = create_async_engine(
                config.slave_url,
                pool_size=config.slave_pool_size,
                pool_pre_ping=True
            )
            self.slave_factory = sessionmaker(
                self.slave_engine,
                class_=AsyncSession,
                expire_on_commit=False
            )
        else:
            self.slave_engine = self.master_engine
            self.slave_factory = self.master_factory

    @asynccontextmanager
    async def write_session(self) -> AsyncGenerator[AsyncSession, None]:
        """寫入用 Session"""
        async with self.master_factory() as session:
            try:
                yield session
            except Exception:
                await session.rollback()
                raise

    @asynccontextmanager
    async def read_session(self) -> AsyncGenerator[AsyncSession, None]:
        """讀取用 Session"""
        async with self.slave_factory() as session:
            yield session

    @asynccontextmanager
    async def session(self, read_only: bool = False) -> AsyncGenerator[AsyncSession, None]:
        """通用 Session"""
        if read_only:
            async with self.read_session() as session:
                yield session
        else:
            async with self.write_session() as session:
                yield session

    async def dispose(self):
        """關閉所有連線"""
        await self.master_engine.dispose()
        if self.slave_engine != self.master_engine:
            await self.slave_engine.dispose()


# FastAPI 整合
db_cluster: DatabaseCluster = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_cluster
    db_cluster = DatabaseCluster(settings.database)
    yield
    await db_cluster.dispose()

app = FastAPI(lifespan=lifespan)

# 依賴項
async def get_write_db():
    async with db_cluster.write_session() as session:
        yield session

async def get_read_db():
    async with db_cluster.read_session() as session:
        yield session

# API 端點
@app.post("/users", response_model=UserResponse)
async def create_user(
    data: UserCreate,
    db: AsyncSession = Depends(get_write_db)
):
    user = User(**data.dict())
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

@app.get("/users", response_model=list[UserResponse])
async def list_users(db: AsyncSession = Depends(get_read_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

✅ 重點總結

多資料庫場景

場景方案
讀寫分離Master/Slave 架構
多租戶Schema 或資料庫分離
微服務每服務獨立資料庫

注意事項

  1. 複製延遲:寫入後立即讀取要用主庫
  2. 連線池:合理設定 pool_size
  3. 健康檢查:監控所有資料庫狀態
  4. 錯誤處理:處理資料庫切換失敗

🎤 面試這樣答

Q: 如何實現讀寫分離?

答案:

讀寫分離的實作方式:

  1. 建立多個 Engine:分別連接主庫和從庫
  2. 建立對應的 Session 工廠
  3. 根據操作類型選擇 Session
class DatabaseManager:
    async def master(self):  # 寫入用
        async with self.master_factory() as session:
            yield session

    async def slave(self):   # 讀取用
        async with self.slave_factory() as session:
            yield session

注意事項:

  • 複製延遲:寫入後立即讀取要用主庫
  • 健康檢查:監控從庫狀態
  • 負載均衡:多個從庫時需要分配策略

上一篇: 03-7. 查詢優化 下一篇: 03-9. 測試資料庫操作


最後更新:2025-12-17

0%