Django 數據庫連接洩漏完全排查指南

從發現問題到根治方案,掌握連接洩漏的診斷與預防技巧

「FATAL: sorry, too many clients already」— 這個錯誤通常意味著發生了連接洩漏。什麼是連接洩漏?如何快速定位洩漏源頭?如何預防?

本文將提供完整的連接洩漏排查流程,從發現症狀到根治問題,並分享生產環境的預防策略。

🚨 什麼是連接洩漏?

定義

連接洩漏(Connection Leak):數據庫連接被創建後沒有正確釋放,導致連接持續佔用,最終耗盡數據庫連接池。

# ❌ 連接洩漏示例
def bad_view(request):
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM users')
    users = cursor.fetchall()

    # ❌ 忘記關閉 cursor
    # ❌ 如果拋出異常,連接永遠不會釋放

    return JsonResponse({'users': users})

# 結果:
# 1. 每次請求創建連接,但不釋放
# 2. 連接數持續增長
# 3. 達到 max_connections 上限
# 4. 所有後續請求失敗

症狀

立即症狀:
  - 請求失敗:too many connections
  - 響應緩慢:等待可用連接
  - 服務不穩定:間歇性錯誤

長期症狀:
  - 數據庫連接數持續增長
  - idle 狀態連接大量累積
  - 應用重啟後才恢復
  - 定期需要手動清理連接

🔍 快速診斷連接洩漏

步驟 1:確認是否發生洩漏

-- PostgreSQL:查看當前連接數
SELECT count(*) FROM pg_stat_activity;

-- 查看各狀態連接數
SELECT state, count(*)
FROM pg_stat_activity
GROUP BY state;

-- 範例輸出(正常):
  state  | count
---------+-------
 active  |     3
 idle    |     5

-- 範例輸出(洩漏):
  state  | count
---------+-------
 active  |     2
 idle    |    95   大量 idle 連接!

-- MySQL:查看連接數
SHOW PROCESSLIST;
SHOW STATUS LIKE 'Threads_connected';

判斷標準:

正常情況:
  - idle 連接數 ≈ Worker 數量
  - 連接數穩定,不持續增長
  - 重啟應用後,idle 連接清零

洩漏情況:
  - idle 連接數 >> Worker 數量
  - 連接數持續增長
  - idle 連接存在時間長(> 10 分鐘)
  - 重啟應用後才減少

步驟 2:找出洩漏的連接

-- PostgreSQL:找出長時間 idle 的連接
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    backend_start,
    state_change,
    NOW() - state_change AS idle_duration,
    state,
    query
FROM pg_stat_activity
WHERE state = 'idle'
  AND NOW() - state_change > interval '10 minutes'
ORDER BY idle_duration DESC;

-- 範例輸出:
 pid  | usename | application_name | idle_duration |  query
------+---------+------------------+---------------+----------
23456 | django  | Django           | 00:35:12      | SELECT...
23457 | django  | Django           | 00:32:45      | SELECT...
23458 | django  | Django           | 00:28:30      | SELECT...
-- ← 洩漏!這些連接應該被釋放了

步驟 3:分析連接來源

-- PostgreSQL:按應用分組統計
SELECT
    application_name,
    state,
    count(*) as connection_count
FROM pg_stat_activity
WHERE usename = 'django'
GROUP BY application_name, state
ORDER BY connection_count DESC;

-- 按客戶端 IP 分組
SELECT
    client_addr,
    count(*) as connection_count
FROM pg_stat_activity
WHERE usename = 'django'
GROUP BY client_addr
ORDER BY connection_count DESC;

-- 範例輸出:
 client_addr  | connection_count
--------------+------------------
 10.0.1.10    |              45   某台服務器洩漏嚴重
 10.0.1.11    |               8
 10.0.1.12    |               7

🐛 常見洩漏模式與修復

模式 1:忘記關閉 cursor

# ❌ 錯誤:沒有關閉 cursor
def bad_view(request):
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM products')
    products = cursor.fetchall()
    return JsonResponse({'products': products})

# ✅ 修復 1:手動關閉
def good_view_1(request):
    cursor = connection.cursor()
    try:
        cursor.execute('SELECT * FROM products')
        products = cursor.fetchall()
        return JsonResponse({'products': products})
    finally:
        cursor.close()

# ✅ 修復 2:使用 context manager
def good_view_2(request):
    with connection.cursor() as cursor:
        cursor.execute('SELECT * FROM products')
        products = cursor.fetchall()
    return JsonResponse({'products': products})

# ✅ 修復 3:使用 ORM(最佳)
def good_view_3(request):
    products = Product.objects.all().values()
    return JsonResponse({'products': list(products)})

模式 2:異常時未釋放連接

# ❌ 錯誤:異常時連接未釋放
def bad_view(request):
    connection = psycopg2.connect(...)
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM users')
    users = cursor.fetchall()

    # ❌ 如果這裡拋出異常,連接永遠不會關閉
    process_users(users)

    cursor.close()
    connection.close()

# ✅ 修復:使用 try-finally
def good_view(request):
    connection = psycopg2.connect(...)
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('SELECT * FROM users')
            users = cursor.fetchall()
            process_users(users)
        finally:
            cursor.close()
    finally:
        connection.close()

# ✅ 更好:使用 context manager
def better_view(request):
    with psycopg2.connect(...) as conn:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM users')
            users = cursor.fetchall()
            process_users(users)

模式 3:多數據庫未正確切換

# ❌ 錯誤:切換數據庫後未切回
def bad_view(request):
    # 使用默認數據庫
    User.objects.using('default').all()

    # 切換到 analytics 數據庫
    from django.db import connections
    connections['analytics'].cursor().execute('SELECT ...')

    # ❌ 忘記切回 default,可能導致連接混亂

# ✅ 修復:明確管理連接
def good_view(request):
    # 使用 default
    User.objects.all()

    # 使用 analytics
    with connections['analytics'].cursor() as cursor:
        cursor.execute('SELECT ...')
        result = cursor.fetchall()

    # 自動恢復

模式 4:Celery 任務中的洩漏

# ❌ 錯誤:Celery 任務未關閉連接
from celery import shared_task

@shared_task
def bad_task():
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM logs')
    logs = cursor.fetchall()
    process_logs(logs)
    # ❌ 沒有關閉連接

# ✅ 修復:確保連接關閉
@shared_task
def good_task():
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT * FROM logs')
            logs = cursor.fetchall()
            process_logs(logs)
    finally:
        # Celery 任務結束時關閉連接
        from django.db import connections
        connections.close_all()

模式 5:中間件/信號處理器洩漏

# ❌ 錯誤:信號處理器未釋放連接
from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=Order)
def bad_signal_handler(sender, instance, **kwargs):
    cursor = connection.cursor()
    cursor.execute('INSERT INTO audit_log ...')
    # ❌ 未關閉

# ✅ 修復
@receiver(post_save, sender=Order)
def good_signal_handler(sender, instance, **kwargs):
    with connection.cursor() as cursor:
        cursor.execute('INSERT INTO audit_log ...')

🔧 診斷工具與腳本

Django Management Command

創建診斷命令:

# management/commands/check_connections.py
from django.core.management.base import BaseCommand
from django.db import connections

class Command(BaseCommand):
    help = '檢查數據庫連接狀態'

    def handle(self, *args, **options):
        for alias in connections:
            conn = connections[alias]

            self.stdout.write(f'\n=== 數據庫: {alias} ===')
            self.stdout.write(f'連接對象: {conn.connection}')

            if conn.connection:
                self.stdout.write(self.style.WARNING('連接已打開'))

                # PostgreSQL 特定
                if 'postgresql' in conn.settings_dict['ENGINE']:
                    with conn.cursor() as cursor:
                        # 查看當前連接
                        cursor.execute("""
                            SELECT count(*) FROM pg_stat_activity
                            WHERE usename = %s
                        """, [conn.settings_dict['USER']])
                        count = cursor.fetchone()[0]
                        self.stdout.write(f'總連接數: {count}')

                        # idle 連接
                        cursor.execute("""
                            SELECT count(*) FROM pg_stat_activity
                            WHERE usename = %s AND state = 'idle'
                        """, [conn.settings_dict['USER']])
                        idle_count = cursor.fetchone()[0]
                        self.stdout.write(f'idle 連接數: {idle_count}')

                        if idle_count > 10:
                            self.stdout.write(
                                self.style.ERROR('⚠️  idle 連接過多,可能洩漏!')
                            )
            else:
                self.stdout.write(self.style.SUCCESS('連接未打開'))

使用:

python manage.py check_connections

PostgreSQL 監控腳本

#!/bin/bash
# check_pg_connections.sh

echo "=== PostgreSQL 連接狀態 ==="

psql -U postgres -d mydb -c "
SELECT
    count(*) as total,
    count(*) FILTER (WHERE state = 'active') as active,
    count(*) FILTER (WHERE state = 'idle') as idle,
    count(*) FILTER (WHERE state = 'idle in transaction') as idle_in_transaction
FROM pg_stat_activity
WHERE usename = 'django';
"

echo ""
echo "=== 長時間 idle 連接(> 5 分鐘)==="

psql -U postgres -d mydb -c "
SELECT
    pid,
    NOW() - state_change AS duration,
    query
FROM pg_stat_activity
WHERE state = 'idle'
  AND usename = 'django'
  AND NOW() - state_change > interval '5 minutes'
ORDER BY duration DESC
LIMIT 10;
"

自動化監控

# 使用 Prometheus + Django Prometheus
# requirements.txt
django-prometheus

# settings.py
INSTALLED_APPS = [
    'django_prometheus',
    ...
]

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',
    ...
    'django_prometheus.middleware.PrometheusAfterMiddleware',
]

# urls.py
urlpatterns = [
    path('', include('django_prometheus.urls')),
]

# 訪問 /metrics 查看指標
# 包含數據庫連接池狀態

🛡️ 預防措施

1. 使用 ORM 而非原生 SQL

# ❌ 容易洩漏
cursor = connection.cursor()
cursor.execute('SELECT * FROM users')
users = cursor.fetchall()
cursor.close()  # 容易忘記

# ✅ ORM 自動管理
users = User.objects.all()  # Django 自動管理連接

2. 配置連接超時

# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'OPTIONS': {
            # 連接超時
            'connect_timeout': 10,

            # 語句超時(PostgreSQL)
            'options': '-c statement_timeout=30000',  # 30 秒

            # 空閒連接超時(需要數據庫支持)
            'idle_in_transaction_session_timeout': 60000,  # 60 秒
        },
    }
}

3. 使用連接池

# django-db-connection-pool
DATABASES = {
    'default': {
        'ENGINE': 'dj_db_conn_pool.backends.postgresql',
        'POOL_OPTIONS': {
            'POOL_SIZE': 10,
            'MAX_OVERFLOW': 5,

            # ✅ 關鍵:定期回收連接
            'POOL_RECYCLE': 3600,  # 1 小時回收

            # ✅ 關鍵:借用前測試
            'POOL_PRE_PING': True,  # ping 測試連接
        }
    }
}

4. 添加連接監控

# middleware/connection_monitor.py
from django.db import connections
import logging

logger = logging.getLogger(__name__)

class ConnectionMonitorMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # 請求前:記錄連接狀態
        before = {}
        for alias in connections:
            before[alias] = connections[alias].connection is not None

        response = self.get_response(request)

        # 請求後:檢查連接是否洩漏
        for alias in connections:
            after = connections[alias].connection is not None

            if not before[alias] and after:
                # 新建了連接但未關閉
                logger.warning(
                    f'連接可能洩漏: {request.path} 使用數據庫 {alias}'
                )

        return response

# settings.py
MIDDLEWARE = [
    'middleware.connection_monitor.ConnectionMonitorMiddleware',
    ...
]

5. Celery 任務後關閉連接

# celery.py
from celery import Celery
from celery.signals import task_postrun

app = Celery('myproject')

@task_postrun.connect
def close_db_connections(**kwargs):
    """任務完成後關閉所有數據庫連接"""
    from django.db import connections
    connections.close_all()

6. 定期清理連接(臨時方案)

-- PostgreSQL:手動終止長時間 idle 連接
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'idle'
  AND NOW() - state_change > interval '30 minutes'
  AND usename = 'django';

-- MySQL:終止連接
KILL [CONNECTION] thread_id;

自動化(cron):

# /etc/cron.d/cleanup_idle_connections
*/10 * * * * postgres psql -d mydb -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE state = 'idle' AND NOW() - state_change > interval '30 minutes' AND usename = 'django';"

📊 案例研究

案例 1:Celery 任務導致洩漏

症狀:

現象:運行 Celery 任務後,數據庫 idle 連接持續增長
連接數:從 10 增長到 100+
復現:每執行 10 個任務,增加 10 個 idle 連接

診斷:

# 發現問題代碼
@shared_task
def export_report(user_id):
    user = User.objects.get(id=user_id)
    data = generate_report(user)

    # ❌ 連接未關閉
    # Celery worker 不會自動關閉連接

修復:

from celery.signals import task_postrun

@task_postrun.connect
def close_connections(**kwargs):
    from django.db import connections
    connections.close_all()

案例 2:中間件異常處理

症狀:

現象:偶爾出現連接洩漏
頻率:每天 2-3 次
特點:與請求失敗相關

診斷:

# 問題中間件
class AnalyticsMiddleware:
    def __call__(self, request):
        cursor = connection.cursor()
        cursor.execute('INSERT INTO page_views ...')

        response = self.get_response(request)

        cursor.close()  # ❌ 如果 get_response 拋異常,不會執行
        return response

修復:

class AnalyticsMiddleware:
    def __call__(self, request):
        try:
            with connection.cursor() as cursor:
                cursor.execute('INSERT INTO page_views ...')
        except Exception as e:
            logger.error(f'Analytics 失敗: {e}')

        return self.get_response(request)

🎯 排查流程總結

1. 確認洩漏:
   - [ ] 查看數據庫連接數
   - [ ] 統計 idle 連接數量
   - [ ] 檢查連接持續時間

2. 定位來源:
   - [ ] 按應用分組統計
   - [ ] 按 IP 分組統計
   - [ ] 查看最後執行的 SQL

3. 代碼審查:
   - [ ] 搜索 connection.cursor()
   - [ ] 檢查是否使用 try-finally
   - [ ] 審查 Celery 任務
   - [ ] 檢查中間件/信號處理器

4. 應用修復:
   - [ ] 使用 context manager
   - [ ] 添加 finally 塊
   - [ ] 配置連接回收
   - [ ] 添加監控告警

5. 預防措施:
   - [ ] 優先使用 ORM
   - [ ] 配置 POOL_RECYCLE
   - [ ] 啟用 POOL_PRE_PING
   - [ ] 添加連接監控
   - [ ] 定期審查代碼

📌 最佳實踐

✅ 推薦做法:
1. 優先使用 Django ORM,避免原生 SQL
2. 原生 SQL 必須使用 context manager
3. 配置連接池回收(POOL_RECYCLE = 3600)
4. 啟用連接健康檢查(POOL_PRE_PING = True)
5. Celery 任務後關閉連接
6. 添加連接數監控和告警
7. 定期審查代碼,搜索 cursor()

❌ 避免做法:
1. 手動管理連接而不使用 try-finally
2. 忘記關閉 cursor
3. 異常處理不當導致連接未釋放
4. 沒有連接數監控
5. CONN_MAX_AGE = None 且無連接池

掌握連接洩漏的排查與預防,是保障 Django 應用穩定性的關鍵技能。記住:預防永遠比治療容易

至此,Django 連接池系列文章完結!

0%