目錄
03-8. 多資料庫支援
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (進階)
🤔 一句話解釋
多資料庫支援讓你的應用可以同時連接多個資料庫,實現讀寫分離、分庫等架構。
🎯 使用場景
┌─────────────────────────────────────────────────────────┐
│ 多資料庫場景 │
├─────────────────────────────────────────────────────────┤
│ 1. 讀寫分離 │
│ ┌────────┐ ┌────────┐ │
│ │ Master │ ──複製──▶│ Slave │ │
│ │ 寫入 │ │ 讀取 │ │
│ └────────┘ └────────┘ │
├─────────────────────────────────────────────────────────┤
│ 2. 多租戶 │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ DB_A │ │ DB_B │ │ DB_C │ │
│ │租戶 A │ │租戶 B │ │租戶 C │ │
│ └────────┘ └────────┘ └────────┘ │
├─────────────────────────────────────────────────────────┤
│ 3. 微服務整合 │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Users │ │ Orders │ │Products│ │
│ │ DB │ │ DB │ │ DB │ │
│ └────────┘ └────────┘ └────────┘ │
└─────────────────────────────────────────────────────────┘
🔧 基本設定
多個 Engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# 主資料庫(寫入)
master_engine = create_async_engine(
"postgresql+asyncpg://user:pass@master-db:5432/mydb",
pool_size=10,
max_overflow=20
)
# 從資料庫(讀取)
slave_engine = create_async_engine(
"postgresql+asyncpg://user:pass@slave-db:5432/mydb",
pool_size=20,
max_overflow=40
)
# Session 工廠
MasterSession = sessionmaker(
master_engine,
class_=AsyncSession,
expire_on_commit=False
)
SlaveSession = sessionmaker(
slave_engine,
class_=AsyncSession,
expire_on_commit=False
)
設定管理
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict
class DatabaseSettings(BaseSettings):
    """Connection settings for every database the application uses.

    Values are loaded by ``BaseSettings``; the ``DB_`` prefix configured
    below is applied to the environment variable names (see the
    pydantic-settings documentation for the exact lookup rules).
    """

    # pydantic-settings is built on Pydantic v2, where ``model_config``
    # replaces the deprecated inner ``class Config``.
    model_config = SettingsConfigDict(env_prefix="DB_")

    # Primary (write) database.
    master_url: str
    master_pool_size: int = 10

    # Replica (read) database; optional.
    slave_url: Optional[str] = None
    slave_pool_size: int = 20

    # Other databases.
    analytics_url: Optional[str] = None
settings = DatabaseSettings()
📖 讀寫分離
基本實作
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
class DatabaseManager:
    """Own the primary/replica engines and hand out sessions for each.

    Args:
        master_url: Connection URL of the primary (write) database.
        slave_url: Connection URL of the replica (read) database. When it is
            missing or empty, reads are served by the primary.
    """

    def __init__(self, master_url: str, slave_url: Optional[str] = None):
        self.master_engine = create_async_engine(master_url)
        self.master_session_factory = sessionmaker(
            self.master_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

        if slave_url:
            self.slave_engine = create_async_engine(slave_url)
            self.slave_session_factory = sessionmaker(
                self.slave_engine,
                class_=AsyncSession,
                expire_on_commit=False,
            )
        else:
            # No replica configured: reuse the primary engine instead of
            # opening a second connection pool against the same server.
            self.slave_engine = self.master_engine
            self.slave_session_factory = self.master_session_factory

    @asynccontextmanager
    async def master(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a session bound to the primary database (for writes)."""
        async with self.master_session_factory() as session:
            yield session

    @asynccontextmanager
    async def slave(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a session bound to the replica database (for reads)."""
        async with self.slave_session_factory() as session:
            yield session

    async def close(self):
        """Dispose every engine owned by this manager."""
        await self.master_engine.dispose()
        # The replica may be the very same engine object; dispose it once.
        if self.slave_engine is not self.master_engine:
            await self.slave_engine.dispose()
# 使用
db_manager = DatabaseManager(
master_url="postgresql+asyncpg://user:pass@master:5432/db",
slave_url="postgresql+asyncpg://user:pass@slave:5432/db"
)
async def create_user(data):
    """Build a ``User`` from ``data`` and commit it through the primary database."""
    async with db_manager.master() as session:
        new_user = User(**data)
        session.add(new_user)
        await session.commit()
    return new_user
async def get_users():
async with db_manager.slave() as db:
result = await db.execute(select(User))
return result.scalars().all()
FastAPI 整合
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the ``DatabaseManager`` on startup and close it on shutdown."""
    # Startup.
    app.state.db_manager = DatabaseManager(
        master_url=settings.master_url,
        slave_url=settings.slave_url,
    )
    try:
        yield
    finally:
        # Shutdown. ``finally`` keeps the engines from being left open when
        # the generator is closed or cancelled while suspended at ``yield``.
        await app.state.db_manager.close()
app = FastAPI(lifespan=lifespan)
# 依賴項
async def get_write_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency that yields a session on the primary database.

    This is an async generator, so the return annotation is
    ``AsyncGenerator`` rather than ``AsyncSession``.
    """
    async with app.state.db_manager.master() as session:
        yield session
async def get_read_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency that yields a session on the replica database.

    This is an async generator, so the return annotation is
    ``AsyncGenerator`` rather than ``AsyncSession``.
    """
    async with app.state.db_manager.slave() as session:
        yield session
# API 端點
@app.post("/users")
async def create_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_write_db),  # uses the primary database
):
    """Create a user from the request body and commit it on the primary."""
    # ``.dict()`` is deprecated in Pydantic v2 (required by pydantic_settings,
    # which this file imports); ``model_dump()`` is its replacement.
    user = User(**user_data.model_dump())
    db.add(user)
    await db.commit()
    return user
@app.get("/users")
async def list_users(
db: AsyncSession = Depends(get_read_db) # 使用從庫
):
result = await db.execute(select(User))
return result.scalars().all()
🏢 多租戶架構
Schema 分離
from sqlalchemy import event
from sqlalchemy.orm import Session
class TenantManager:
    """Schema-per-tenant manager: one shared engine, one schema per tenant."""

    def __init__(self, engine):
        self.engine = engine
        self.session_factory = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

    @staticmethod
    def _schema_for(tenant_id: str) -> str:
        """Return the schema name for ``tenant_id`` after validating it.

        The id ends up inside a ``SET search_path`` statement, which cannot
        take bound parameters, so only ASCII letters, digits and underscores
        are accepted.

        Raises:
            ValueError: If ``tenant_id`` is empty or contains any other
                character.
        """
        import re

        candidate = str(tenant_id)
        if not re.fullmatch(r"[A-Za-z0-9_]+", candidate):
            raise ValueError(f"Invalid tenant id: {tenant_id!r}")
        return f"tenant_{candidate}"

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> "AsyncGenerator[AsyncSession, None]":
        """Yield a session whose ``search_path`` points at the tenant's schema.

        Raises:
            ValueError: If ``tenant_id`` fails validation; no session is
                opened in that case.
        """
        schema = self._schema_for(tenant_id)
        async with self.session_factory() as session:
            # NOTE(review): a plain SET presumably outlives this session on
            # the pooled connection; confirm every user of this engine goes
            # through get_session so no stale search_path is inherited.
            await session.execute(
                text(f"SET search_path TO {schema}, public")
            )
            yield session
# 使用
tenant_manager = TenantManager(engine)
async def get_tenant_users(tenant_id: str):
async with tenant_manager.get_session(tenant_id) as db:
result = await db.execute(select(User))
return result.scalars().all()
資料庫分離
from typing import Dict
class MultiTenantManager:
    """Database-per-tenant manager: one engine and session factory per tenant."""

    def __init__(self):
        self.engines: Dict[str, AsyncEngine] = {}
        self.session_factories: Dict[str, sessionmaker] = {}

    def register_tenant(self, tenant_id: str, database_url: str):
        """Create an engine for ``database_url`` and store it under ``tenant_id``."""
        tenant_engine = create_async_engine(database_url)
        self.engines[tenant_id] = tenant_engine
        factory = sessionmaker(
            tenant_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )
        self.session_factories[tenant_id] = factory

    @asynccontextmanager
    async def get_session(self, tenant_id: str) -> AsyncGenerator[AsyncSession, None]:
        """Yield a session for ``tenant_id``; raise ``ValueError`` if it is not registered."""
        factory = self.session_factories.get(tenant_id)
        if factory is None:
            raise ValueError(f"Tenant {tenant_id} not found")
        async with factory() as session:
            yield session

    async def close_all(self):
        """Dispose every registered engine."""
        for tenant_engine in self.engines.values():
            await tenant_engine.dispose()
# FastAPI 整合
tenant_manager = MultiTenantManager()
# 註冊租戶
tenant_manager.register_tenant("tenant_a", "postgresql+asyncpg://...@db-a/db")
tenant_manager.register_tenant("tenant_b", "postgresql+asyncpg://...@db-b/db")
# 從請求取得租戶 ID
async def get_tenant_id(request: Request) -> str:
    """Return the ``X-Tenant-ID`` header value, or raise HTTP 400 when it is missing or empty."""
    # The id could equally come from a subdomain, the path, etc.
    header_value = request.headers.get("X-Tenant-ID")
    if header_value:
        return header_value
    raise HTTPException(status_code=400, detail="Tenant ID required")
async def get_tenant_db(
    tenant_id: str = Depends(get_tenant_id)
) -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency that yields a session for the requesting tenant.

    This is an async generator, so the return annotation is
    ``AsyncGenerator`` rather than ``AsyncSession``.
    """
    async with tenant_manager.get_session(tenant_id) as session:
        yield session
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_tenant_db)):
result = await db.execute(select(User))
return result.scalars().all()
🔄 跨資料庫查詢
基本方式
async def get_user_with_orders(user_id: int):
"""跨資料庫查詢"""
# 從使用者資料庫取得使用者
async with user_db_manager.session() as user_db:
user = await user_db.get(User, user_id)
# 從訂單資料庫取得訂單
async with order_db_manager.session() as order_db:
result = await order_db.execute(
select(Order).where(Order.user_id == user_id)
)
orders = result.scalars().all()
return {
"user": user,
"orders": orders
}
使用 Service 層封裝
class UserOrderService:
"""跨資料庫服務"""
def __init__(
    self,
    user_db: AsyncSession,
    order_db: AsyncSession
):
    """Store one session per database for the query methods to use."""
    self.user_db, self.order_db = user_db, order_db
async def get_user_dashboard(self, user_id: int):
    """Collect the user, their recent orders and their order stats.

    The user lookup runs concurrently with the order-database work. The two
    order-database queries run one after the other, because both go through
    ``self.order_db`` and an ``AsyncSession`` must not be used by several
    tasks at once.

    Returns:
        A dict with the keys ``"user"``, ``"orders"`` and ``"stats"``.
    """
    import asyncio

    async def _order_db_queries():
        # Same session for both queries: keep them strictly sequential.
        recent_orders = await self._get_user_orders(user_id)
        order_stats = await self._get_user_stats(user_id)
        return recent_orders, order_stats

    # Parallelism is only across the two different sessions.
    user, (orders, stats) = await asyncio.gather(
        self._get_user(user_id),
        _order_db_queries(),
    )
    return {
        "user": user,
        "orders": orders,
        "stats": stats
    }
async def _get_user(self, user_id: int):
    """Fetch ``User`` for ``user_id`` through ``self.user_db.get``."""
    user = await self.user_db.get(User, user_id)
    return user
async def _get_user_orders(self, user_id: int):
    """Return up to 10 of the user's orders, sorted by ``created_at`` descending."""
    recent_orders_query = (
        select(Order)
        .where(Order.user_id == user_id)
        .order_by(Order.created_at.desc())
        .limit(10)
    )
    rows = await self.order_db.execute(recent_orders_query)
    return rows.scalars().all()
async def _get_user_stats(self, user_id: int):
result = await self.order_db.execute(
select(
func.count(Order.id).label("total_orders"),
func.sum(Order.total).label("total_amount")
)
.where(Order.user_id == user_id)
)
row = result.one()
return {
"total_orders": row.total_orders,
"total_amount": row.total_amount
}
⚠️ 注意事項
1. 複製延遲
async def create_and_get_user(data):
    """Insert a user, then read it back from the primary to sidestep replication lag."""
    # Write on the primary.
    async with db_manager.master() as write_db:
        created = User(**data)
        write_db.add(created)
        await write_db.commit()
        new_id = created.id

    # Read from the primary as well (deliberately not the replica), so the
    # row is visible even if replication has not caught up yet.
    async with db_manager.master() as read_db:
        fetched = await read_db.get(User, new_id)
        return fetched
# 或者等待一小段時間
async def create_and_get_user_with_delay(data):
async with db_manager.master() as db:
user = User(**data)
db.add(user)
await db.commit()
user_id = user.id
# 等待複製
await asyncio.sleep(0.1)
async with db_manager.slave() as db:
return await db.get(User, user_id)
2. 連線池管理
from sqlalchemy.pool import QueuePool, NullPool
# 開發環境:不使用連線池
dev_engine = create_async_engine(
DATABASE_URL,
poolclass=NullPool
)
# 生產環境:使用連線池
prod_engine = create_async_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True
)3. 健康檢查
async def check_database_health():
    """Run ``SELECT 1`` on the primary and the replica and report each outcome.

    Returns:
        A dict with the keys ``"master"`` and ``"slave"``; each value is
        ``"healthy"`` or ``"unhealthy: <error text>"``.
    """
    report = {}
    # Primary first, then replica; each probe has its own try block so one
    # failure does not hide the other database's state.
    for role in ("master", "slave"):
        try:
            open_session = getattr(db_manager, role)
            async with open_session() as db:
                await db.execute(text("SELECT 1"))
                report[role] = "healthy"
        except Exception as exc:
            report[role] = f"unhealthy: {str(exc)}"
    return report
@app.get("/health/db")
async def database_health():
return await check_database_health()
📝 完整範例
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class DatabaseCluster:
    """Primary/replica cluster wrapper built from a ``DatabaseSettings`` object."""

    def __init__(self, config: DatabaseSettings):
        # Primary.
        primary_engine = create_async_engine(
            config.master_url,
            pool_size=config.master_pool_size,
            pool_pre_ping=True,
        )
        self.master_engine = primary_engine
        self.master_factory = sessionmaker(
            primary_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

        # Replica (optional): without a URL, reads fall back to the primary.
        if not config.slave_url:
            self.slave_engine = self.master_engine
            self.slave_factory = self.master_factory
            return

        replica_engine = create_async_engine(
            config.slave_url,
            pool_size=config.slave_pool_size,
            pool_pre_ping=True,
        )
        self.slave_engine = replica_engine
        self.slave_factory = sessionmaker(
            replica_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

    @asynccontextmanager
    async def write_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a primary session; roll back and re-raise if the caller's block fails."""
        async with self.master_factory() as db:
            try:
                yield db
            except Exception:
                await db.rollback()
                raise

    @asynccontextmanager
    async def read_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a session created by the replica factory."""
        async with self.slave_factory() as db:
            yield db

    @asynccontextmanager
    async def session(self, read_only: bool = False) -> AsyncGenerator[AsyncSession, None]:
        """Yield a read session when ``read_only`` is true, otherwise a write session."""
        opener = self.read_session if read_only else self.write_session
        async with opener() as db:
            yield db

    async def dispose(self):
        """Dispose the primary engine, and the replica engine when it is a separate one."""
        await self.master_engine.dispose()
        if self.slave_engine != self.master_engine:
            await self.slave_engine.dispose()
# FastAPI 整合
db_cluster: DatabaseCluster = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the module-level ``db_cluster`` on startup and dispose it on shutdown."""
    global db_cluster
    # DatabaseCluster takes the DatabaseSettings object itself; that class
    # defines no ``database`` attribute, so ``settings.database`` would fail.
    db_cluster = DatabaseCluster(settings)
    try:
        yield
    finally:
        # ``finally`` keeps the engines from being left open when the
        # generator is closed or cancelled while suspended at ``yield``.
        await db_cluster.dispose()
app = FastAPI(lifespan=lifespan)
# 依賴項
async def get_write_db():
    """FastAPI dependency that yields a write session from ``db_cluster``."""
    async with db_cluster.write_session() as db_session:
        yield db_session
async def get_read_db():
    """FastAPI dependency that yields a read session from ``db_cluster``."""
    async with db_cluster.read_session() as db_session:
        yield db_session
# API 端點
@app.post("/users", response_model=UserResponse)
async def create_user(
    data: UserCreate,
    db: AsyncSession = Depends(get_write_db)
):
    """Create a user on the primary, then refresh it before returning."""
    # ``.dict()`` is deprecated in Pydantic v2 (required by pydantic_settings,
    # which this file imports); ``model_dump()`` is its replacement.
    user = User(**data.model_dump())
    db.add(user)
    await db.commit()
    # Reload the row so the response carries its committed state.
    await db.refresh(user)
    return user
@app.get("/users", response_model=list[UserResponse])
async def list_users(db: AsyncSession = Depends(get_read_db)):
result = await db.execute(select(User))
return result.scalars().all()
✅ 重點總結
多資料庫場景
| 場景 | 方案 |
|---|---|
| 讀寫分離 | Master/Slave 架構 |
| 多租戶 | Schema 或資料庫分離 |
| 微服務 | 每服務獨立資料庫 |
注意事項
- 複製延遲:寫入後立即讀取要用主庫
- 連線池:合理設定 pool_size
- 健康檢查:監控所有資料庫狀態
- 錯誤處理:處理資料庫切換失敗
🎤 面試這樣答
Q: 如何實現讀寫分離?
答案:
讀寫分離的實作方式:
- 建立多個 Engine:分別連接主庫和從庫
- 建立對應的 Session 工廠
- 根據操作類型選擇 Session
class DatabaseManager:
    async def master(self):  # 寫入用
        async with self.master_factory() as session:
            yield session

    async def slave(self):  # 讀取用
        async with self.slave_factory() as session:
            yield session
注意事項:
- 複製延遲:寫入後立即讀取要用主庫
- 健康檢查:監控從庫狀態
- 負載均衡:多個從庫時需要分配策略
上一篇: 03-7. 查詢優化 下一篇: 03-9. 測試資料庫操作
最後更新:2025-12-17