10-2. MQTT 協定詳解

深入理解 MQTT 的發布訂閱模式、QoS 機制與實戰應用

🚀 MQTT 協定詳解

MQTT 在網路模型中的位置

┌──────────────────────────────────────────────────────────┐
│            OSI 七層模型          TCP/IP 四層模型          │
├──────────────────────────────────────────────────────────┤
│  7. 應用層 (Application)                                 │
│     ├─ MQTT ───────────────┐    應用層 (Application)     │
│                             │    (MQTT, HTTP, STOMP...)   │
├─────────────────────────────┤                             │
│  6. 表現層 (Presentation)   │                             │
├─────────────────────────────┤                             │
│  5. 會話層 (Session)        │                             │
├─────────────────────────────┼─────────────────────────────┤
│  4. 傳輸層 (Transport)      │    傳輸層 (Transport)       │
│     └─ TCP ─────────────────┘    (TCP)                    │
├─────────────────────────────┼─────────────────────────────┤
│  3. 網路層 (Network)        │    網際網路層 (Internet)    │
│     └─ IP                   │    (IP, ICMP, ARP)          │
├─────────────────────────────┼─────────────────────────────┤
│  2. 資料連結層 (Data Link)  │    網路存取層               │
│  1. 實體層 (Physical)       │    (Network Access)         │
└─────────────────────────────┴─────────────────────────────┘

📍 位置:OSI Layer 7(應用層)/ TCP/IP Layer 4(應用層)
🔌 Port:1883(未加密)/ 8883(MQTT over SSL/TLS)
🚛 傳輸協定:TCP

為什麼 MQTT 用 TCP?

原因說明
可靠傳輸 🎯IoT 裝置的指令不能遺失(開燈指令不能漏掉)
連線保持 🔗MQTT 使用長連接(Persistent Connection),減少重複握手
QoS 機制需要 TCP 的順序保證和確認機制來實現 QoS 1/2
低頻寬友善 💾TCP 雖有額外開銷,但 MQTT Header 只有 2 bytes(超輕量)

💡 特殊情況:MQTT-SN (MQTT for Sensor Networks) 可用 UDP,專為更低功耗場景設計


🎯 什麼是 MQTT?

💡 比喻:訂閱制雜誌服務

你訂閱了「科技雜誌」,出版社(Broker)會自動把新刊物送到你家
你不需要每天打電話問:「有新的雜誌嗎?」
其他訂閱同樣雜誌的人,也會同時收到

Publisher(出版社)→ Broker(郵局)→ Subscriber(訂閱者)

MQTT(Message Queuing Telemetry Transport) 是一種輕量級的發布/訂閱式訊息傳輸協定,專為低頻寬、高延遲、不可靠網路環境設計。

為什麼需要 MQTT?

問題場景: 假設你有 1000 個溫度感測器,每分鐘回報一次數據…

HTTP 輪詢(Polling)

# 每個感測器每分鐘發送 HTTP 請求
while True:
    response = requests.post('http://server.com/temperature', data={'temp': 25.5})
    time.sleep(60)

# 問題:
# 1. 每次都要建立 TCP 連線(三次握手)
# 2. HTTP Header 很大(幾百 bytes)
# 3. 浪費頻寬和電力
# 4. 伺服器需要同時處理 1000 個連線

MQTT

# 感測器連線一次,之後只傳輸數據
client.connect("mqtt.server.com", 1883)
while True:
    client.publish("sensor/temp", "25.5")  # 只傳輸 7 bytes!
    time.sleep(60)

# 優點:
# 1. 保持長連線(TCP keep-alive)
# 2. 極小的封包(header 只需 2 bytes)
# 3. 省電省流量(適合電池供電裝置)
# 4. Broker 統一管理

🏗️ MQTT 架構

核心組件

┌─────────────┐
│  Publisher  │ ──publish──> ┌────────┐ <──subscribe── ┌────────────┐
│ (發布者)     │              │ Broker │                │ Subscriber │
└─────────────┘              │(中介者)│                │  (訂閱者)   │
                             └────────┘                └────────────┘
                                  ↓
                          儲存 & 轉發訊息

1️⃣ Broker(訊息中介者)

💡 比喻:郵局總部
所有的信都先送到郵局,郵局再根據地址分發給訂戶

功能:

  • 接收發布者的訊息
  • 管理訂閱者的訂閱清單
  • 轉發訊息給訂閱者
  • 儲存離線訊息(Retained Messages)
  • 處理客戶端斷線(Last Will)

常見 Broker 軟體:

  • Mosquitto:開源、輕量、易於安裝
  • EMQX:支援高併發(百萬級連線)
  • HiveMQ:商業版,提供叢集和管理介面
  • AWS IoT Core:雲端託管服務

安裝 Mosquitto:

# Ubuntu/Debian
sudo apt-get install mosquitto mosquitto-clients

# macOS
brew install mosquitto

# 啟動 Broker
mosquitto -v  # -v 顯示詳細日誌

2️⃣ Publisher(發布者)

import paho.mqtt.client as mqtt

# 建立客戶端
client = mqtt.Client(client_id="temperature_sensor_001")

# 連線到 Broker
client.connect("mqtt.example.com", 1883, 60)

# 發布訊息
client.publish(
    topic="home/living-room/temperature",
    payload="25.5",
    qos=1,
    retain=True
)

client.disconnect()

3️⃣ Subscriber(訂閱者)

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print(f"連線結果:{rc}")
    # 訂閱主題
    client.subscribe("home/+/temperature", qos=1)  # + 是萬用字元

def on_message(client, userdata, msg):
    print(f"收到訊息:{msg.topic} = {msg.payload.decode()}")

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("mqtt.example.com", 1883, 60)
client.loop_forever()  # 持續監聽

📂 Topic(主題)與 Wildcards(萬用字元)

Topic 命名規則

💡 比喻:檔案系統路徑
home/living-room/temperature  就像  /home/user/documents/file.txt
使用 / 來分層,方便組織和訂閱

範例:

home/living-room/temperature
home/living-room/humidity
home/bedroom/temperature
home/bedroom/light/status

Wildcards(萬用字元)

1. +(單層萬用字元)

# 訂閱所有房間的溫度
client.subscribe("home/+/temperature")

# 會收到:
# home/living-room/temperature ✅
# home/bedroom/temperature ✅
# home/kitchen/temperature ✅

# 不會收到:
# home/bedroom/light/status ❌(超過一層)

2. #(多層萬用字元)

# 訂閱 home 下的所有訊息
client.subscribe("home/#")

# 會收到:
# home/living-room/temperature ✅
# home/bedroom/light/status ✅
# home/kitchen/fridge/door/open ✅(多層也可以)

# 注意:# 必須是最後一個字元
client.subscribe("home/#/temperature")  # ❌ 錯誤!

實際應用範例

# 智慧家居系統

# 訂閱所有感測器數據
client.subscribe("home/+/sensor/#")

# 訂閱所有告警
client.subscribe("home/+/alert")

# 訂閱特定房間的所有裝置
client.subscribe("home/living-room/#")

🎚️ QoS(服務品質等級)

💡 比喻:不同的快遞服務

QoS 0:平信(寄了就算,可能遺失)
QoS 1:掛號信(保證送達,但可能重複收到)
QoS 2:保價快遞(保證送達且只有一份,最可靠但最慢)

QoS 0:At most once(最多一次)

Publisher → Broker → Subscriber

發送 ──────────────> 收到(可能遺失)

特性:

  • ❌ 不確認、不重傳
  • ⚡ 最快、最省頻寬
  • 📉 可能遺失訊息

適用場景:

# 溫度監控(遺失一筆沒關係,下次會再傳)
client.publish("sensor/temp", "25.5", qos=0)

# 即時位置(GPS 座標每秒更新,舊資料無意義)
client.publish("gps/location", "25.033,121.565", qos=0)

QoS 1:At least once(至少一次)

Publisher → Broker → Subscriber
    ↓         ↓
  PUBLISH   PUBACK(確認)
    ↓
  重傳(如果沒收到 PUBACK)

特性:

  • ✅ 保證送達
  • ⚠️ 可能重複收到
  • 🔄 需要確認機制(PUBACK)

適用場景:

# 告警通知(不能遺失,重複可以去重)
client.publish("alert/fire", "Kitchen fire detected!", qos=1)

# 使用者操作(開燈、關門)
client.publish("home/light/living-room", "ON", qos=1)

處理重複訊息:

received_messages = set()  # 用於去重

def on_message(client, userdata, msg):
    message_id = msg.mid  # Message ID
    if message_id in received_messages:
        print("重複訊息,忽略")
        return

    received_messages.add(message_id)
    print(f"處理訊息:{msg.payload.decode()}")

QoS 2:Exactly once(恰好一次)

Publisher → Broker → Subscriber

PUBLISH ────────>
        <──── PUBREC(收到)
PUBREL ────────>(釋放)
        <──── PUBCOMP(完成)

特性:

  • ✅✅ 保證送達且不重複
  • 🐢 最慢(四次握手)
  • 💾 需要儲存狀態

適用場景:

# 金流交易(絕對不能重複扣款)
client.publish("payment/debit", json.dumps({
    'user_id': 'user_001',
    'amount': 1000
}), qos=2)

# 庫存扣減(不能重複扣庫存)
client.publish("inventory/reduce", json.dumps({
    'product_id': 'P001',
    'quantity': 1
}), qos=2)

QoS 等級比較

QoS訊息保證速度頻寬適用場景
0可能遺失最快 ⚡最省感測器數據、GPS
1至少一次中等中等告警、控制指令
2恰好一次最慢 🐢最多金流、庫存

記憶口訣:「零忘一重二恰好」


🔖 Retained Message(保留訊息)

💡 比喻:布告欄上的公告
新同學加入群組時,可以直接看到布告欄上的最新公告
不用等到下次有人發言

功能: Broker 會儲存每個主題的最後一則 Retained 訊息,新訂閱者連線時會立即收到。

範例:

# 發布者:發布裝置狀態(保留訊息)
client.publish("device/status", "online", retain=True)

# 當裝置離線時
client.publish("device/status", "offline", retain=True)
# 訂閱者:剛連線就能知道裝置狀態
def on_connect(client, userdata, flags, rc):
    client.subscribe("device/status")

def on_message(client, userdata, msg):
    print(f"裝置狀態:{msg.payload.decode()}")  # 立即收到 "offline"

應用場景:

  • 裝置狀態(online/offline)
  • 最新配置(溫度設定值)
  • 儀表板初始值(當前溫度、濕度)

清除 Retained 訊息:

# 發布空訊息來清除
client.publish("device/status", None, retain=True)

⚰️ Last Will and Testament(遺囑訊息)

💡 比喻:遺囑
如果我意外死亡(斷線),請幫我告訴家人(訂閱者)
「我不在了」這個消息

功能: 當客戶端異常斷線時(沒有正常發送 DISCONNECT),Broker 會自動發送預先設定的遺囑訊息。

設定遺囑:

import paho.mqtt.client as mqtt

client = mqtt.Client()

# 設定遺囑訊息(連線前設定)
client.will_set(
    topic="device/sensor_001/status",
    payload="offline",
    qos=1,
    retain=True
)

client.connect("mqtt.example.com", 1883, 60)

# 正常運行...
client.publish("sensor/temp", "25.5")

# 如果異常斷線(網路中斷、程式崩潰)
# Broker 會自動發送:device/sensor_001/status = "offline"

監控裝置狀態:

# 監控系統訂閱所有裝置狀態
def on_message(client, userdata, msg):
    if msg.topic.endswith("/status"):
        device_id = msg.topic.split('/')[1]
        status = msg.payload.decode()

        if status == "offline":
            print(f"⚠️ 裝置 {device_id} 已斷線!")
            # 發送告警通知
            send_alert(f"Device {device_id} is offline")

client.subscribe("device/+/status")
client.on_message = on_message
client.loop_forever()

完整生命週期範例:

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print("已連線")
    # 連線成功後發送 online 狀態
    client.publish("device/sensor_001/status", "online", retain=True)

client = mqtt.Client()
client.on_connect = on_connect

# 設定遺囑:如果異常斷線,自動發送 offline
client.will_set("device/sensor_001/status", "offline", qos=1, retain=True)

client.connect("mqtt.example.com", 1883, 60)
client.loop_start()

try:
    while True:
        # 發送感測器數據
        client.publish("sensor/temp", "25.5")
        time.sleep(60)
except KeyboardInterrupt:
    # 正常退出時,手動發送 offline
    client.publish("device/sensor_001/status", "offline", retain=True)
    client.disconnect()

🔐 MQTT 安全性

1️⃣ 傳輸加密(TLS/SSL)

import paho.mqtt.client as mqtt
import ssl

client = mqtt.Client()

# 設定 TLS/SSL
client.tls_set(
    ca_certs="ca.crt",            # CA 憑證
    certfile="client.crt",         # 客戶端憑證
    keyfile="client.key",          # 客戶端私鑰
    tls_version=ssl.PROTOCOL_TLSv1_2
)

# 連線到加密端口(通常是 8883)
client.connect("mqtt.example.com", 8883, 60)

2️⃣ 身份驗證

# 使用者名稱 + 密碼
client.username_pw_set("username", "password")
client.connect("mqtt.example.com", 1883, 60)

Broker 端設定(Mosquitto):

# 建立密碼檔
mosquitto_passwd -c /etc/mosquitto/passwd username

# mosquitto.conf
allow_anonymous false
password_file /etc/mosquitto/passwd

3️⃣ 主題存取控制(ACL)

# /etc/mosquitto/acl
# user1 只能發布溫度數據
user user1
topic write sensor/temperature

# user2 只能訂閱數據
user user2
topic read sensor/#

🛠️ 完整實作範例

智慧家居系統

溫度感測器(Publisher):

import paho.mqtt.client as mqtt
import time
import random

class TemperatureSensor:
    def __init__(self, sensor_id, room):
        self.sensor_id = sensor_id
        self.room = room
        self.client = mqtt.Client(client_id=f"sensor_{sensor_id}")

        # 設定遺囑
        self.client.will_set(
            f"home/{room}/sensor/status",
            "offline",
            qos=1,
            retain=True
        )

    def on_connect(self, client, userdata, flags, rc):
        print(f"感測器 {self.sensor_id} 已連線")
        # 發送上線狀態
        self.client.publish(
            f"home/{self.room}/sensor/status",
            "online",
            retain=True
        )

    def start(self):
        self.client.on_connect = self.on_connect
        self.client.connect("mqtt.example.com", 1883, 60)
        self.client.loop_start()

        try:
            while True:
                # 模擬讀取溫度
                temp = round(20 + random.uniform(-5, 5), 1)

                # 發布溫度數據
                self.client.publish(
                    f"home/{self.room}/temperature",
                    str(temp),
                    qos=1
                )

                print(f"[{self.room}] 溫度:{temp}°C")

                # 檢查告警
                if temp > 30:
                    self.client.publish(
                        f"home/{self.room}/alert",
                        f"高溫警告!{temp}°C",
                        qos=1
                    )

                time.sleep(10)

        except KeyboardInterrupt:
            self.client.publish(
                f"home/{self.room}/sensor/status",
                "offline",
                retain=True
            )
            self.client.disconnect()

# 啟動感測器
sensor = TemperatureSensor(sensor_id="001", room="living-room")
sensor.start()

監控系統(Subscriber):

import paho.mqtt.client as mqtt
import json
from datetime import datetime

class HomeMonitor:
    def __init__(self):
        self.client = mqtt.Client(client_id="home_monitor")
        self.temperature_data = {}

    def on_connect(self, client, userdata, flags, rc):
        print("監控系統已連線")

        # 訂閱所有房間的溫度
        self.client.subscribe("home/+/temperature", qos=1)

        # 訂閱所有告警
        self.client.subscribe("home/+/alert", qos=1)

        # 訂閱感測器狀態
        self.client.subscribe("home/+/sensor/status", qos=1)

    def on_message(self, client, userdata, msg):
        topic_parts = msg.topic.split('/')
        room = topic_parts[1]

        if msg.topic.endswith("/temperature"):
            temp = float(msg.payload.decode())
            self.temperature_data[room] = {
                'value': temp,
                'timestamp': datetime.now().isoformat()
            }
            print(f"📊 [{room}] 溫度:{temp}°C")

        elif msg.topic.endswith("/alert"):
            alert_msg = msg.payload.decode()
            print(f"🚨 [{room}] 告警:{alert_msg}")
            self.send_notification(room, alert_msg)

        elif msg.topic.endswith("/status"):
            status = msg.payload.decode()
            if status == "offline":
                print(f"⚠️ [{room}] 感測器離線!")
                self.send_notification(room, "感測器離線")
            else:
                print(f"✅ [{room}] 感測器上線")

    def send_notification(self, room, message):
        # 發送通知(Email、簡訊、LINE 等)
        print(f"📧 發送通知:{room} - {message}")

    def get_summary(self):
        print("\n=== 溫度摘要 ===")
        for room, data in self.temperature_data.items():
            print(f"{room}: {data['value']}°C ({data['timestamp']})")

    def start(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.client.connect("mqtt.example.com", 1883, 60)
        self.client.loop_forever()

# 啟動監控
monitor = HomeMonitor()
monitor.start()

🎓 常見面試題

Q1:MQTT 為什麼比 HTTP 更適合 IoT?

答案:

特性HTTPMQTT
連線模式短連線(每次請求建立)長連線(保持連線)
Header 大小幾百 bytes2 bytes
方向單向(請求-回應)雙向(發布-訂閱)
頻寬消耗極低
電力消耗高(頻繁連線)低(保持連線)
即時性需要輪詢即時推送

實際數據比較:

發送 "25.5" 這個溫度數據:

HTTP POST:
  TCP 握手:3 個封包
  HTTP Header:~200 bytes
  總共:~500 bytes

MQTT Publish:
  已連線狀態
  MQTT Header:2 bytes
  總共:6 bytes(省 98.8% 流量!)

記憶技巧:「MQTT = 省省省(省電、省流量、省資源)」


Q2:QoS 1 和 QoS 2 的主要差異?

答案:

QoS 1:

Publisher → Broker
  PUBLISH ──>
  <── PUBACK(確認)

問題:如果 PUBACK 遺失,Publisher 會重傳
      → Subscriber 可能收到兩次

QoS 2:

Publisher → Broker
  PUBLISH ──>
  <── PUBREC(已收到)
  PUBREL ──>(可以釋放了)
  <── PUBCOMP(完成)

保證:四次握手確保 Broker 只轉發一次給 Subscriber

選擇建議:

  • QoS 1:可容忍重複(用 message ID 去重)→ 告警通知
  • QoS 2:絕對不能重複 → 金流、庫存

程式碼範例:

# QoS 1:需要處理重複
received_ids = set()

def on_message_qos1(client, userdata, msg):
    if msg.mid in received_ids:
        return  # 重複訊息,忽略
    received_ids.add(msg.mid)
    process_alert(msg.payload)

# QoS 2:不需要去重(協定保證)
def on_message_qos2(client, userdata, msg):
    deduct_payment(msg.payload)  # 直接處理,不會重複

Q3:什麼是 Clean Session?

答案:

Clean Session 決定客戶端斷線後,Broker 是否保留訂閱資訊和未送達的訊息。

# Clean Session = True(預設)
client = mqtt.Client(clean_session=True)
# 斷線後:訂閱清除、離線訊息丟棄
# Clean Session = False
client = mqtt.Client(client_id="unique_id", clean_session=False)
# 斷線後:保留訂閱、保留 QoS 1/2 訊息

比喻:

Clean Session = True:旅館(退房後不保留物品)
Clean Session = False:自己的家(離開後東西還在)

應用場景:

# 場景 1:即時數據(不需要離線訊息)
client = mqtt.Client(clean_session=True)
client.subscribe("sensor/temperature")
# 斷線期間的溫度數據不重要(過時了)

# 場景 2:重要通知(需要離線訊息)
client = mqtt.Client(client_id="notification_service", clean_session=False)
client.subscribe("alerts/#", qos=1)
# 斷線期間的告警訊息都要收到

重連後的行為:

# Clean Session = False 的客戶端重連後
def on_connect(client, userdata, flags, rc):
    if flags['session_present']:
        print("恢復舊 session,有離線訊息")
        # 不需要重新訂閱,Broker 會立即推送離線訊息
    else:
        print("新 session,需要重新訂閱")
        client.subscribe("topic")

Q4:如何實作 MQTT 心跳機制?

答案:

MQTT 內建 Keep Alive 機制,不需要手動實作心跳。

原理:

client.connect("mqtt.example.com", 1883, keepalive=60)
# keepalive=60:每 60 秒沒有訊息就發送 PINGREQ

客戶端  Broker
  60 秒內沒有 PUBLISH
  PINGREQ ──>我還活著嗎?)
  <── PINGRESP是的連線正常

如果 Broker 沒有回應:

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("非預期斷線,嘗試重連...")
        while True:
            try:
                client.reconnect()
                break
            except:
                time.sleep(5)

自訂心跳(應用層):

import threading

def heartbeat():
    while True:
        client.publish("device/heartbeat", json.dumps({
            'device_id': 'sensor_001',
            'timestamp': datetime.now().isoformat(),
            'cpu': psutil.cpu_percent(),
            'memory': psutil.virtual_memory().percent
        }))
        time.sleep(30)

# 在背景執行緒中執行
threading.Thread(target=heartbeat, daemon=True).start()

Q5:MQTT 如何處理大量訊息?

答案:

多種策略組合:

1. 訊息限流(Rate Limiting):

import time

class RateLimiter:
    def __init__(self, max_rate):
        self.max_rate = max_rate  # 每秒最多幾則
        self.last_sent = 0

    def publish(self, client, topic, payload):
        now = time.time()
        if now - self.last_sent < 1 / self.max_rate:
            time.sleep(1 / self.max_rate - (now - self.last_sent))

        client.publish(topic, payload)
        self.last_sent = time.time()

# 使用:每秒最多 10 則訊息
limiter = RateLimiter(max_rate=10)
limiter.publish(client, "topic", "data")

2. 訊息批次(Batching):

import json

class MessageBatcher:
    def __init__(self, batch_size=10, batch_interval=5):
        self.batch = []
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.last_sent = time.time()

    def add(self, client, topic, data):
        self.batch.append(data)

        # 達到批次大小或時間間隔,發送
        if len(self.batch) >= self.batch_size or \
           time.time() - self.last_sent >= self.batch_interval:
            self.flush(client, topic)

    def flush(self, client, topic):
        if self.batch:
            client.publish(topic, json.dumps(self.batch))
            self.batch = []
            self.last_sent = time.time()

# 使用
batcher = MessageBatcher()
for i in range(100):
    batcher.add(client, "sensor/batch", {'temp': 25.5, 'id': i})

3. 壓縮:

import zlib
import json

def publish_compressed(client, topic, data):
    # 序列化並壓縮
    json_data = json.dumps(data)
    compressed = zlib.compress(json_data.encode())

    client.publish(topic, compressed)

def on_message_decompress(client, userdata, msg):
    # 解壓縮
    decompressed = zlib.decompress(msg.payload)
    data = json.loads(decompressed.decode())
    print(data)

4. Broker 擴展(叢集):

使用 EMQX 或 HiveMQ 叢集版
支援水平擴展,處理百萬級連線

Client A ──┐
Client B ──┼──> Load Balancer ──┬──> Broker 1
Client C ──┘                     ├──> Broker 2
                                 └──> Broker 3

📝 總結

MQTT 是物聯網世界的「輕量級快遞」:

  • 輕量:2 bytes header,省電省流量
  • 可靠:QoS 0/1/2 三種等級,按需選擇
  • 靈活:Pub/Sub 模式,訂閱者隨時加入
  • 智慧:Retained Message 保留最新狀態,Last Will 處理異常

記憶口訣:

  • 「輕(輕量)、可(可靠)、靈(靈活)、智(智慧)」

適用場景:

  • 🏠 智慧家居(感測器、控制器)
  • 🚗 車聯網(即時位置、車況)
  • 🏭 工業 4.0(設備監控)
  • 📱 行動 App(推送通知)

🔗 延伸閱讀

0%