Django 面試準備 11-3:競態條件處理

深入理解並發環境下的競態條件問題與解決方案

11-3. 競態條件處理(Race Condition Handling)

📌 什麼是競態條件?

簡單說: 多個線程同時操作共享數據,結果取決於執行順序

定義: 當兩個或多個線程並發訪問共享資源,且至少有一個線程進行寫操作時,如果沒有適當的同步機制,程序的行為將變得不可預測。


🔍 經典的競態條件示例

銀行轉帳問題

# ❌ 有競態條件的代碼

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        # 步驟 1: 檢查餘額
        if self.balance >= amount:
            # 步驟 2: 計算新餘額
            new_balance = self.balance - amount
            # 步驟 3: 更新餘額
            self.balance = new_balance
            return True
        return False

account = BankAccount(balance=100)

競態條件發生:

時間    Thread 1 (提款 80)           Thread 2 (提款 80)          balance
T1      檢查: 100 >= 80 ✓
T2                                      檢查: 100 >= 80 ✓          100
T3      計算: 100 - 80 = 20
T4                                      計算: 100 - 80 = 20        100
T5      寫入: balance = 20
T6                                      寫入: balance = 20          20

結果:兩次提款共 160 元,但餘額只減少 80 元!💥
正確應該是:餘額不足,第二次提款失敗

🎯 Django 中的常見競態條件

場景 1:庫存扣減

# models.py
class Product(models.Model):
    name = models.CharField(max_length=200)
    stock = models.IntegerField(default=0)

# ❌ 有競態條件的代碼
def purchase(request, product_id, quantity):
    product = Product.objects.get(id=product_id)

    # 檢查庫存
    if product.stock >= quantity:
        # 扣減庫存
        product.stock -= quantity
        product.save()

        return JsonResponse({'status': 'success'})

    return JsonResponse({'status': 'insufficient_stock'})

問題:

初始庫存:10

時間    Thread 1 (購買 6)            Thread 2 (購買 8)           stock
T1      讀取: stock = 10
T2                                      讀取: stock = 10            10
T3      檢查: 10 >= 6 ✓
T4                                      檢查: 10 >= 8 ✓             10
T5      計算: 10 - 6 = 4
T6                                      計算: 10 - 8 = 2            10
T7      保存: stock = 4
T8                                      保存: stock = 2              2

結果:賣出 14 件商品,但庫存只有 10 件!超賣了!💥

場景 2:點讚計數

# models.py
class Post(models.Model):
    title = models.CharField(max_length=200)
    likes = models.IntegerField(default=0)

# ❌ 有競態條件的代碼
def like_post(request, post_id):
    post = Post.objects.get(id=post_id)

    # 增加讚數
    post.likes += 1
    post.save()

    return JsonResponse({'likes': post.likes})

問題:

100 個用戶同時點讚:

理想情況:likes: 0 → 100
實際情況:likes: 0 → 67(丟失了 33 個讚)💥

原因:likes += 1 不是原子操作

場景 3:唯一性檢查

# models.py
class User(models.Model):
    username = models.CharField(max_length=50)  # 沒有 unique=True

# ❌ 有競態條件的代碼
def register(request):
    username = request.POST.get('username')

    # 檢查用戶名是否存在
    if User.objects.filter(username=username).exists():
        return JsonResponse({'error': 'Username exists'})

    # 創建用戶
    user = User.objects.create(username=username)

    return JsonResponse({'user_id': user.id})

問題:

時間    Thread 1                    Thread 2                    數據庫
T1      檢查 "alice" 存在?
        → No
T2                                  檢查 "alice" 存在?
                                    → No
T3      創建 User("alice")                                      ✓
T4                                  創建 User("alice")           ✓

結果:創建了兩個相同用戶名的用戶!💥

✅ 解決方案 1:數據庫級原子操作

F() 表達式

from django.db.models import F

# ✅ 使用 F() 表達式(原子操作)
def purchase_v1(request, product_id, quantity):
    # 一條 SQL 完成:UPDATE ... SET stock = stock - ? WHERE id = ? AND stock >= ?
    affected = Product.objects.filter(
        id=product_id,
        stock__gte=quantity
    ).update(stock=F('stock') - quantity)

    if affected > 0:
        return JsonResponse({'status': 'success'})

    return JsonResponse({'status': 'insufficient_stock'})

生成的 SQL:

UPDATE product
SET stock = stock - 6
WHERE id = 123 AND stock >= 6;

為什麼安全? 這是單條原子 SQL,數據庫保證其完整執行


點讚計數優化

# ✅ 使用 F() 表達式
def like_post(request, post_id):
    # 原子操作
    Post.objects.filter(id=post_id).update(likes=F('likes') + 1)

    # 獲取更新後的值
    post = Post.objects.get(id=post_id)

    return JsonResponse({'likes': post.likes})

✅ 解決方案 2:數據庫事務 + 鎖

SELECT FOR UPDATE(悲觀鎖)

from django.db import transaction

# ✅ 使用 select_for_update(悲觀鎖)
@transaction.atomic
def purchase_v2(request, product_id, quantity):
    # 鎖定這一行,其他事務必須等待
    product = Product.objects.select_for_update().get(id=product_id)

    if product.stock >= quantity:
        product.stock -= quantity
        product.save()
        return JsonResponse({'status': 'success'})

    return JsonResponse({'status': 'insufficient_stock'})

執行流程:

Thread 1                            Thread 2
開始事務
SELECT ... FOR UPDATE
(鎖定 product)
                                    開始事務
                                    SELECT ... FOR UPDATE
                                    (等待 Thread 1 釋放鎖)⏰
檢查庫存: 10 >= 6 ✓
扣減庫存: 10 - 6 = 4
保存
提交事務(釋放鎖)✅
                                    (獲得鎖)✓
                                    檢查庫存: 4 >= 8 ❌
                                    返回庫存不足

結果:正確處理!✅

SQL:

-- PostgreSQL / MySQL
BEGIN;
SELECT * FROM product WHERE id = 123 FOR UPDATE;
UPDATE product SET stock = 4 WHERE id = 123;
COMMIT;

nowait 參數

@transaction.atomic
def purchase_v3(request, product_id, quantity):
    try:
        # nowait=True:如果無法獲取鎖,立即拋出異常
        product = Product.objects.select_for_update(nowait=True).get(id=product_id)

        if product.stock >= quantity:
            product.stock -= quantity
            product.save()
            return JsonResponse({'status': 'success'})

        return JsonResponse({'status': 'insufficient_stock'})

    except DatabaseError:
        # 無法獲取鎖,說明其他線程正在處理
        return JsonResponse({'status': 'busy', 'message': 'Try again later'})

✅ 解決方案 3:樂觀鎖(版本號)

基本概念

悲觀鎖: 假設一定會衝突,提前加鎖 樂觀鎖: 假設不會衝突,更新時檢查版本

# models.py
class Product(models.Model):
    name = models.CharField(max_length=200)
    stock = models.IntegerField(default=0)
    version = models.IntegerField(default=0)  # 版本號

# ✅ 使用樂觀鎖
def purchase_v4(request, product_id, quantity):
    max_retries = 3

    for attempt in range(max_retries):
        product = Product.objects.get(id=product_id)

        if product.stock < quantity:
            return JsonResponse({'status': 'insufficient_stock'})

        old_version = product.version

        # 嘗試更新(檢查版本號)
        affected = Product.objects.filter(
            id=product_id,
            version=old_version,
            stock__gte=quantity
        ).update(
            stock=F('stock') - quantity,
            version=F('version') + 1
        )

        if affected > 0:
            # 更新成功
            return JsonResponse({'status': 'success'})

        # 版本不匹配,重試
        continue

    return JsonResponse({'status': 'retry_exceeded'})

執行流程:

Thread 1                            Thread 2
讀取: stock=10, version=1
                                    讀取: stock=10, version=1
更新 WHERE version=1 ✓
version -> 2
                                    更新 WHERE version=1 ❌
                                    (version 已經是 2)
                                    重試...
                                    讀取: stock=4, version=2
                                    檢查庫存不足 ✓

✅ 解決方案 4:分布式鎖(Redis)

適用場景

  • 多個服務器實例
  • 需要跨進程同步
  • 高並發場景
from django.core.cache import cache
import time

# ✅ 使用 Redis 分布式鎖
def purchase_v5(request, product_id, quantity):
    lock_key = f'lock:product:{product_id}'

    # 嘗試獲取鎖(10 秒過期)
    acquired = cache.add(lock_key, 'locked', timeout=10)

    if not acquired:
        return JsonResponse({
            'status': 'busy',
            'message': 'Another user is purchasing, please wait'
        })

    try:
        product = Product.objects.get(id=product_id)

        if product.stock >= quantity:
            product.stock -= quantity
            product.save()
            return JsonResponse({'status': 'success'})

        return JsonResponse({'status': 'insufficient_stock'})

    finally:
        # 釋放鎖
        cache.delete(lock_key)

更安全的實現(防止誤刪)

import uuid
from django_redis import get_redis_connection

def purchase_v6(request, product_id, quantity):
    redis_conn = get_redis_connection('default')
    lock_key = f'lock:product:{product_id}'
    lock_value = str(uuid.uuid4())  # 唯一標識

    # 嘗試獲取鎖
    if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
        try:
            product = Product.objects.get(id=product_id)

            if product.stock >= quantity:
                product.stock -= quantity
                product.save()
                return JsonResponse({'status': 'success'})

            return JsonResponse({'status': 'insufficient_stock'})

        finally:
            # 安全釋放鎖(只刪除自己的鎖)
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            redis_conn.eval(lua_script, 1, lock_key, lock_value)

    return JsonResponse({'status': 'busy'})

✅ 解決方案 5:唯一約束

數據庫級約束

# models.py
class User(models.Model):
    # ✅ 使用 unique 約束
    username = models.CharField(max_length=50, unique=True)

def register(request):
    username = request.POST.get('username')

    try:
        # 即使並發,數據庫也會保證唯一性
        user = User.objects.create(username=username)
        return JsonResponse({'user_id': user.id})

    except IntegrityError:
        # 用戶名已存在
        return JsonResponse({'error': 'Username exists'})

複合唯一約束

# models.py
class Vote(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    post = models.ForeignKey(Post, on_delete=models.CASCADE)
    vote_type = models.CharField(max_length=10)  # up/down

    class Meta:
        # ✅ 複合唯一約束:一個用戶對一篇文章只能投一票
        unique_together = ['user', 'post']

def vote(request, post_id):
    try:
        # 並發情況下,只有一個會成功
        vote = Vote.objects.create(
            user=request.user,
            post_id=post_id,
            vote_type='up'
        )
        return JsonResponse({'status': 'success'})

    except IntegrityError:
        # 已經投過票
        return JsonResponse({'error': 'Already voted'})

📊 方案對比

方案性能複雜度適用場景數據庫支持
F() 表達式⭐⭐⭐⭐⭐簡單字段更新所有
SELECT FOR UPDATE⭐⭐⭐⭐⭐複雜業務邏輯PostgreSQL, MySQL
樂觀鎖⭐⭐⭐⭐⭐⭐⭐衝突較少所有
分布式鎖⭐⭐⭐⭐⭐⭐⭐跨服務器Redis
唯一約束⭐⭐⭐⭐⭐唯一性檢查所有

🎯 實戰案例

案例 1:秒殺系統

from django.db import transaction
from django.db.models import F

def flash_sale(request, product_id):
    """秒殺搶購"""

    # 方案 1:F() 表達式(推薦)
    affected = Product.objects.filter(
        id=product_id,
        stock__gt=0
    ).update(stock=F('stock') - 1)

    if affected == 0:
        return JsonResponse({'status': 'sold_out'})

    # 創建訂單
    order = Order.objects.create(
        user=request.user,
        product_id=product_id,
        quantity=1
    )

    return JsonResponse({'status': 'success', 'order_id': order.id})

案例 2:座位預訂

from django.db import transaction

# models.py
class Seat(models.Model):
    row = models.IntegerField()
    number = models.IntegerField()
    is_booked = models.BooleanField(default=False)
    booked_by = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
    version = models.IntegerField(default=0)

    class Meta:
        unique_together = ['row', 'number']

# views.py
@transaction.atomic
def book_seat(request, seat_id):
    """預訂座位(樂觀鎖)"""

    max_retries = 3

    for attempt in range(max_retries):
        seat = Seat.objects.get(id=seat_id)

        if seat.is_booked:
            return JsonResponse({'error': 'Seat already booked'})

        old_version = seat.version

        # 嘗試預訂
        affected = Seat.objects.filter(
            id=seat_id,
            version=old_version,
            is_booked=False
        ).update(
            is_booked=True,
            booked_by=request.user,
            version=F('version') + 1
        )

        if affected > 0:
            return JsonResponse({'status': 'success'})

        # 重試
        time.sleep(0.01)

    return JsonResponse({'error': 'Booking failed, please retry'})

案例 3:帳戶餘額轉帳

from django.db import transaction

@transaction.atomic
def transfer_money(from_user_id, to_user_id, amount):
    """轉帳(使用悲觀鎖)"""

    # 鎖定兩個帳戶(按 ID 順序,防止死鎖)
    user_ids = sorted([from_user_id, to_user_id])

    users = User.objects.select_for_update().filter(
        id__in=user_ids
    ).order_by('id')

    from_user = users.get(id=from_user_id)
    to_user = users.get(id=to_user_id)

    # 檢查餘額
    if from_user.balance < amount:
        raise ValueError("Insufficient balance")

    # 轉帳
    from_user.balance -= amount
    to_user.balance += amount

    from_user.save()
    to_user.save()

    return True

🔍 如何檢測競態條件?

1. 並發測試

import threading
from django.test import TestCase

class RaceConditionTest(TestCase):
    def test_concurrent_purchase(self):
        """測試並發購買"""

        # 創建商品,庫存 10
        product = Product.objects.create(name='Test', stock=10)

        results = []

        def purchase():
            # 嘗試購買 6 件
            response = self.client.post(f'/purchase/{product.id}/', {'quantity': 6})
            results.append(response.json()['status'])

        # 啟動 3 個線程同時購買
        threads = [threading.Thread(target=purchase) for _ in range(3)]

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        # 檢查結果
        successes = results.count('success')
        self.assertEqual(successes, 1, "應該只有一個購買成功")

        # 檢查庫存
        product.refresh_from_db()
        self.assertEqual(product.stock, 4, "庫存應該是 4")

2. 壓力測試

# 使用 locust 測試
pip install locust

# locustfile.py
from locust import HttpUser, task

class StressTest(HttpUser):
    @task
    def purchase(self):
        self.client.post('/purchase/123/', json={'quantity': 1})

# 運行
locust -f locustfile.py --users 100 --spawn-rate 10

3. 代碼審查

# 檢查這些模式:

# ❌ 讀-改-寫操作
obj = Model.objects.get(id=1)
obj.field += 1
obj.save()

# ❌ 檢查-然後-操作
if Model.objects.filter(...).exists():
    Model.objects.create(...)

# ❌ 沒有事務保護的多步操作
user.balance -= 100
user.save()
merchant.balance += 100
merchant.save()

💡 最佳實踐

1. 優先使用數據庫原子操作

# ✅ 優先級 1:F() 表達式
Product.objects.filter(id=1).update(stock=F('stock') - 1)

# ✅ 優先級 2:唯一約束
User.objects.create(username='alice')  # unique=True

# ✅ 優先級 3:SELECT FOR UPDATE
with transaction.atomic():
    obj = Model.objects.select_for_update().get(id=1)
    obj.field += 1
    obj.save()

2. 避免長時間持有鎖

# ❌ 不好:鎖持有太久
@transaction.atomic
def process(request):
    obj = Model.objects.select_for_update().get(id=1)
    # 執行耗時操作
    time.sleep(5)  # 糟糕!
    obj.save()

# ✅ 好:縮小鎖範圍
def process(request):
    # 耗時操作放在外面
    result = expensive_computation()

    # 只在必要時加鎖
    with transaction.atomic():
        obj = Model.objects.select_for_update().get(id=1)
        obj.field = result
        obj.save()

3. 注意死鎖

# ❌ 可能死鎖
def func1():
    with transaction.atomic():
        a = A.objects.select_for_update().get(id=1)
        b = B.objects.select_for_update().get(id=2)

def func2():
    with transaction.atomic():
        b = B.objects.select_for_update().get(id=2)  # 順序相反!
        a = A.objects.select_for_update().get(id=1)

# ✅ 避免死鎖:固定鎖順序
def lock_in_order(*ids):
    return Model.objects.select_for_update().filter(
        id__in=ids
    ).order_by('id')

💡 面試要點

Q1: 什麼是競態條件?如何避免?

答:

  • 多個線程並發訪問共享資源,結果不可預測
  • 避免方法:原子操作、鎖、事務、版本控制

Q2: 樂觀鎖和悲觀鎖的區別?

答:

  • 悲觀鎖:SELECT FOR UPDATE,假設一定衝突,提前加鎖
  • 樂觀鎖:版本號檢查,假設不衝突,更新時檢測
  • 悲觀鎖性能低但安全,樂觀鎖性能高但需重試

Q3: F() 表達式為什麼是原子的?

答:

  • 生成單條 SQL:UPDATE ... SET field = field + 1
  • 在數據庫層執行,不經過 Python
  • 數據庫保證 UPDATE 的原子性

Q4: 如何選擇並發控制方案?

答:

  1. 簡單字段更新 → F() 表達式
  2. 複雜業務邏輯 → SELECT FOR UPDATE
  3. 衝突較少 → 樂觀鎖
  4. 跨服務器 → 分布式鎖

🔗 下一篇

在下一篇文章中,我們將深入學習 threading.local 原理,了解 Django 如何使用線程本地存儲實現請求隔離。

閱讀時間:10 分鐘

0%