Django 面試準備 09-4:緩存更新策略

深入理解緩存與數據庫一致性的各種更新策略

09-4. 緩存更新策略

📌 核心問題:緩存一致性

緩存與數據庫如何保持一致?

緩存:price = 100
數據庫:price = 100
           ↓ 用戶修改價格為 150
緩存:price = 100 ⚠️ 舊數據
數據庫:price = 150 ✅ 新數據

不一致的後果:

  • 用戶看到錯誤的數據
  • 訂單金額計算錯誤
  • 庫存顯示不準確

🎯 四大更新策略概覽

策略誰負責更新流程適用場景
Cache Aside應用程序先更新 DB,再刪除緩存✅ 最常用
Read Through緩存層緩存自動從 DB 加載讀多寫少
Write Through緩存層緩存自動更新 DB強一致性
Write Behind緩存層異步更新 DB高性能要求

🎯 策略 1:Cache Aside(旁路緩存)

最常用的策略

讀流程:

1. 查緩存 → 命中 → 返回
2. 查緩存 → 未命中 → 查數據庫 → 寫入緩存 → 返回

寫流程:

1. 更新數據庫
2. 刪除緩存(不是更新!)

Django 實現

from django.core.cache import cache

# === 讀操作 ===
def get_product(product_id):
    """讀取商品"""
    cache_key = f'product:{product_id}'

    # 1. 查緩存
    product = cache.get(cache_key)
    if product is not None:
        return product

    # 2. 查數據庫
    try:
        product = Product.objects.get(id=product_id)
        # 3. 寫入緩存
        cache.set(cache_key, product, timeout=300)
        return product
    except Product.DoesNotExist:
        return None

# === 寫操作 ===
def update_product(product_id, new_price):
    """更新商品價格"""
    cache_key = f'product:{product_id}'

    # 1. 更新數據庫
    Product.objects.filter(id=product_id).update(price=new_price)

    # 2. 刪除緩存(而不是更新緩存!)
    cache.delete(cache_key)

⚠️ 為什麼刪除而不是更新?

原因 1:避免併發問題

時刻    線程 A                  線程 B
T1      更新 DB: price=100
T2                              更新 DB: price=200
T3                              更新緩存: price=200
T4      更新緩存: price=100     ⚠️ 緩存變成舊值!

原因 2:減少無效更新

# 如果數據很少被讀取,更新緩存就是浪費
Product.objects.filter(id=999).update(stock=50)
cache.set('product:999', ...)  # 可能沒人會讀,浪費資源

# 刪除緩存,等真正需要時再加載
cache.delete('product:999')  # 更高效

🔍 完整範例

from django.core.cache import cache
from django.db import transaction

class ProductService:
    @staticmethod
    def get(product_id):
        """獲取商品"""
        cache_key = f'product:{product_id}'
        product = cache.get(cache_key)

        if product is None:
            product = Product.objects.filter(id=product_id).first()
            if product:
                cache.set(cache_key, product, timeout=300)

        return product

    @staticmethod
    @transaction.atomic
    def update_price(product_id, new_price):
        """更新價格"""
        # 1. 更新數據庫
        updated = Product.objects.filter(
            id=product_id
        ).update(price=new_price)

        if updated:
            # 2. 刪除緩存
            cache_key = f'product:{product_id}'
            cache.delete(cache_key)

            # 3. 刪除相關緩存
            cache.delete(f'product:list:all')
            return True

        return False

🎯 策略 2:Read Through(讀穿透)

特點

緩存層負責從數據庫加載數據,應用程序只與緩存交互。

流程

應用 → 緩存層 → 緩存命中?→ 返回
               ↓ 未命中
              數據庫 → 寫入緩存 → 返回

Django 實現

class CacheLoader:
    """緩存加載器"""

    @staticmethod
    def get_product(product_id):
        """自動加載的緩存"""
        cache_key = f'product:{product_id}'

        def load_from_db():
            """從數據庫加載"""
            return Product.objects.filter(id=product_id).first()

        # 獲取或加載
        return cache.get_or_set(
            cache_key,
            load_from_db,
            timeout=300
        )

# 使用
product = CacheLoader.get_product(123)  # 不關心數據來源

進階:自動刷新

import time
from threading import Thread
from django.core.cache import cache

class AutoRefreshCache:
    """自動刷新緩存"""

    def __init__(self, key, loader_func, timeout=300, refresh_interval=60):
        self.key = key
        self.loader_func = loader_func
        self.timeout = timeout
        self.refresh_interval = refresh_interval

    def get(self):
        """獲取數據"""
        data = cache.get(self.key)

        if data is None:
            # 首次加載
            data = self._load_and_cache()
            # 啟動後台刷新
            self._start_refresh_thread()

        return data

    def _load_and_cache(self):
        """加載並緩存"""
        data = self.loader_func()
        cache.set(self.key, data, timeout=self.timeout)
        return data

    def _start_refresh_thread(self):
        """啟動後台刷新線程"""
        def refresh():
            while True:
                time.sleep(self.refresh_interval)
                self._load_and_cache()

        thread = Thread(target=refresh, daemon=True)
        thread.start()

# 使用
def load_hot_products():
    return Product.objects.filter(is_hot=True)[:10]

hot_products_cache = AutoRefreshCache(
    key='hot_products',
    loader_func=load_hot_products,
    timeout=300,
    refresh_interval=60
)

products = hot_products_cache.get()

🎯 策略 3:Write Through(寫穿透)

特點

寫操作同時更新緩存和數據庫。

流程

應用 → 緩存層 → 同時 → 更新緩存
               ↓
              更新數據庫

Django 實現

from django.db import transaction
from django.core.cache import cache

class WriteThoughCache:
    """寫穿透緩存"""

    @staticmethod
    def update_product(product_id, **updates):
        """同時更新緩存和數據庫"""
        cache_key = f'product:{product_id}'

        with transaction.atomic():
            # 1. 更新數據庫
            Product.objects.filter(id=product_id).update(**updates)

            # 2. 獲取最新數據
            product = Product.objects.get(id=product_id)

            # 3. 更新緩存
            cache.set(cache_key, product, timeout=300)

            return product

# 使用
product = WriteThoughCache.update_product(123, price=150, stock=100)

⚖️ 優缺點

優點:

  • ✅ 緩存始終是最新的
  • ✅ 讀取永遠命中

缺點:

  • ⚠️ 寫入變慢(雙寫)
  • ⚠️ 浪費資源(不常讀的數據也寫緩存)

🎯 策略 4:Write Behind(寫回)

特點

先更新緩存,異步批量更新數據庫。

流程

應用 → 緩存 → 立即返回
        ↓
      (異步)
        ↓
      數據庫

Django + Celery 實現

from django.core.cache import cache
from celery import shared_task

# === 異步任務 ===
@shared_task
def sync_product_to_db(product_id, updates):
    """異步同步到數據庫"""
    Product.objects.filter(id=product_id).update(**updates)

# === 寫入操作 ===
def update_product_write_behind(product_id, **updates):
    """寫回策略"""
    cache_key = f'product:{product_id}'

    # 1. 更新緩存(快速返回)
    product = cache.get(cache_key)
    if product is None:
        product = Product.objects.get(id=product_id)

    # 更新內存中的對象
    for key, value in updates.items():
        setattr(product, key, value)

    cache.set(cache_key, product, timeout=300)

    # 2. 異步更新數據庫
    sync_product_to_db.delay(product_id, updates)

    return product

進階:批量合併更新

from collections import defaultdict
from celery import shared_task
import time

class WriteBehindBuffer:
    """寫回緩衝區"""
    _buffer = defaultdict(dict)

    @classmethod
    def add(cls, model, pk, updates):
        """添加待更新數據"""
        key = f"{model}:{pk}"
        cls._buffer[key].update(updates)

    @classmethod
    def flush(cls):
        """批量刷新到數據庫"""
        for key, updates in cls._buffer.items():
            model, pk = key.split(':')
            if model == 'Product':
                Product.objects.filter(id=pk).update(**updates)

        cls._buffer.clear()

# Celery 定時任務:每 5 秒批量寫入
@shared_task
def flush_write_behind_buffer():
    WriteBehindBuffer.flush()

# 使用
def update_product(product_id, **updates):
    # 更新緩存
    cache_key = f'product:{product_id}'
    product = cache.get(cache_key) or Product.objects.get(id=product_id)

    for key, value in updates.items():
        setattr(product, key, value)

    cache.set(cache_key, product, timeout=300)

    # 添加到緩衝區(異步批量寫入)
    WriteBehindBuffer.add('Product', product_id, updates)

⚠️ 風險

數據丟失風險:

  • 緩存宕機 → 未同步的數據丟失
  • 需要持久化隊列(Celery + RabbitMQ/Redis)

📊 策略選擇指南

決策樹

需要強一致性?
    ├─ 是 → Write Through(同步雙寫)
    └─ 否 ↓

允許數據丟失風險?
    ├─ 否 → Cache Aside(最常用)
    └─ 是 ↓

追求極致性能?
    └─ 是 → Write Behind(異步寫入)

緩存層自動管理?
    └─ 是 → Read Through(自動加載)

對比表

策略一致性性能複雜度適用場景
Cache Aside最終一致⭐⭐⭐⭐⭐⭐通用場景
Read Through最終一致⭐⭐⭐⭐⭐⭐⭐讀多寫少
Write Through強一致⭐⭐⭐⭐⭐⭐金融系統
Write Behind弱一致⭐⭐⭐⭐⭐⭐⭐⭐⭐日誌、統計

🎯 實戰案例

案例 1:電商商品緩存

from django.core.cache import cache
from django.db import transaction
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver

class ProductCache:
    """商品緩存管理"""

    TIMEOUT = 300

    @staticmethod
    def get_cache_key(product_id):
        return f'product:{product_id}'

    @classmethod
    def get(cls, product_id):
        """獲取商品(Cache Aside)"""
        cache_key = cls.get_cache_key(product_id)
        product = cache.get(cache_key)

        if product is None:
            product = Product.objects.select_related('category').filter(
                id=product_id
            ).first()

            if product:
                cache.set(cache_key, product, timeout=cls.TIMEOUT)

        return product

    @classmethod
    def invalidate(cls, product_id):
        """失效緩存"""
        cache_key = cls.get_cache_key(product_id)
        cache.delete(cache_key)

        # 同時失效列表緩存
        cache.delete('product:list:all')
        cache.delete('product:list:hot')

# 信號處理:自動失效緩存
@receiver([post_save, post_delete], sender=Product)
def invalidate_product_cache(sender, instance, **kwargs):
    ProductCache.invalidate(instance.id)

# 使用
product = ProductCache.get(123)

案例 2:用戶會話緩存

from django.core.cache import cache

class SessionCache:
    """用戶會話緩存(Write Through)"""

    @staticmethod
    def get_session(session_id):
        """獲取會話"""
        cache_key = f'session:{session_id}'
        return cache.get(cache_key)

    @staticmethod
    def update_session(session_id, data, db_sync=True):
        """更新會話(同步到 DB)"""
        cache_key = f'session:{session_id}'

        # 1. 更新緩存
        cache.set(cache_key, data, timeout=1800)

        # 2. 同步更新數據庫(可選)
        if db_sync:
            Session.objects.filter(session_id=session_id).update(
                data=data
            )

# 使用
SessionCache.update_session('abc123', {'user_id': 456, 'cart': [...]})

案例 3:統計數據緩存

from django.core.cache import cache
from celery import shared_task

class StatisticsCache:
    """統計數據緩存(Write Behind)"""

    @staticmethod
    def increment_view_count(product_id):
        """增加瀏覽量(異步寫入)"""
        cache_key = f'stats:views:{product_id}'

        # 1. 緩存中增加
        new_count = cache.incr(cache_key, delta=1)

        # 2. 每 100 次同步一次數據庫
        if new_count % 100 == 0:
            sync_view_count.delay(product_id, new_count)

        return new_count

@shared_task
def sync_view_count(product_id, count):
    """異步同步瀏覽量"""
    Product.objects.filter(id=product_id).update(view_count=count)

# 使用
StatisticsCache.increment_view_count(123)  # 快速返回

🔍 一致性問題深入

問題 1:先刪緩存還是先更新 DB?

方案 A:先刪緩存,再更新 DB

cache.delete(cache_key)
Product.objects.filter(id=product_id).update(price=150)

問題:

時刻    線程 A                  線程 B
T1      刪除緩存
T2                              讀緩存 → 未命中
T3                              讀 DB → price=100(舊值)
T4                              寫緩存 → price=100
T5      更新 DB → price=150
結果:緩存是舊值 100,DB 是新值 150 ⚠️

方案 B:先更新 DB,再刪緩存(推薦)

Product.objects.filter(id=product_id).update(price=150)
cache.delete(cache_key)

更安全,但仍有極小概率不一致。


問題 2:延遲雙刪

from django.db import transaction

def update_product_with_double_delete(product_id, new_price):
    """延遲雙刪策略"""
    cache_key = f'product:{product_id}'

    with transaction.atomic():
        # 1. 刪除緩存(第一次)
        cache.delete(cache_key)

        # 2. 更新數據庫
        Product.objects.filter(id=product_id).update(price=new_price)

        # 3. 延遲刪除緩存(第二次)
        delete_cache_delayed.apply_async(
            args=[cache_key],
            countdown=1  # 延遲 1 秒
        )

@shared_task
def delete_cache_delayed(cache_key):
    """延遲刪除緩存"""
    cache.delete(cache_key)

💡 最佳實踐

1. 設置合理的過期時間

# 不同數據不同策略
CACHE_TIMEOUTS = {
    'user_profile': 1800,      # 用戶資料:30 分鐘
    'product_detail': 300,     # 商品詳情:5 分鐘
    'hot_products': 60,        # 熱門商品:1 分鐘
    'site_config': 86400,      # 站點配置:1 天
}

2. 監控緩存命中率

from django.core.cache import cache

def get_with_metrics(key, loader_func):
    """帶監控的緩存獲取"""
    value = cache.get(key)

    if value is not None:
        # 命中
        cache.incr('metrics:cache_hits')
        return value
    else:
        # 未命中
        cache.incr('metrics:cache_misses')
        value = loader_func()
        cache.set(key, value)
        return value

3. 使用版本控制

# 統一修改緩存版本,使舊緩存失效
CACHE_VERSION = 2

cache.set('product:123', product, version=CACHE_VERSION)
cache.get('product:123', version=CACHE_VERSION)

💡 面試要點

Q1: Cache Aside 為什麼先更新 DB 再刪緩存?

答:

  • 先刪緩存:高並發下可能寫入舊數據
  • 先更新 DB:即使緩存刪除失敗,下次也會讀到新數據(過期後)
  • 可用「延遲雙刪」進一步保證

Q2: Write Through 和 Write Behind 的區別?

答:

  • Write Through:同步雙寫,強一致性,性能較低
  • Write Behind:異步寫入,高性能,可能丟數據

Q3: 如何保證緩存與數據庫的強一致性?

答:

  1. 分布式鎖:寫操作加鎖
  2. Write Through:同步更新
  3. 訂閱 binlog:監聽 DB 變化自動刪緩存
  4. 降低緩存時間:減少不一致窗口

Q4: 緩存更新失敗怎麼辦?

答:

  1. 重試機制:Celery 重試
  2. 消息隊列:持久化更新任務
  3. 監控告警:及時發現問題
  4. 定時校驗:定期對比緩存與 DB

🎓 總結

選擇建議:

  1. 通用場景 → Cache Aside
  2. 讀多寫少 → Read Through
  3. 強一致性 → Write Through
  4. 高性能統計 → Write Behind

一致性保證:

  • 先更新 DB,再刪緩存
  • 使用延遲雙刪
  • 設置合理的過期時間
  • 監控 + 告警

恭喜!你已完成第 09 章「緩存策略實戰」的所有內容!

系列文章:

下一章: 10. SQL 查詢優化

0%