05-2. asyncio 進階

⏱️ 閱讀時間: 18 分鐘 🎯 難度: ⭐⭐⭐⭐ (高階)


🤔 一句話解釋

進階的 asyncio 技巧包括任務管理、錯誤處理、同步原語等,讓你寫出更健壯的非同步程式。


🔧 事件迴圈

事件迴圈基礎

import asyncio

# 取得事件迴圈
loop = asyncio.get_event_loop()

# 新版本推薦用 asyncio.run()
asyncio.run(main())

# 或者在非同步函數中取得
async def example():
    loop = asyncio.get_running_loop()
    print(f"當前迴圈: {loop}")

自訂事件迴圈

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 使用自訂的事件迴圈設定
if __name__ == "__main__":
    # 設定 debug 模式
    asyncio.run(main(), debug=True)

📋 任務管理

Task 物件

import asyncio

async def long_task(name: str):
    print(f"{name} 開始")
    await asyncio.sleep(2)
    print(f"{name} 完成")
    return f"{name} 結果"

async def main():
    # 建立任務
    task = asyncio.create_task(long_task("Task A"))

    # 檢查任務狀態
    print(f"任務完成: {task.done()}")  # False

    # 等待任務
    result = await task

    print(f"任務完成: {task.done()}")  # True
    print(f"結果: {result}")

asyncio.run(main())

任務群組(Python 3.11+)

import asyncio

async def task(name: str):
    await asyncio.sleep(1)
    if name == "C":
        raise ValueError(f"{name} 失敗")
    return f"{name} 完成"

async def main():
    # TaskGroup 會等待所有任務,並收集所有異常
    try:
        async with asyncio.TaskGroup() as tg:
            task_a = tg.create_task(task("A"))
            task_b = tg.create_task(task("B"))
            task_c = tg.create_task(task("C"))
    except* ValueError as eg:
        print(f"捕獲異常: {eg.exceptions}")

asyncio.run(main())

任務取消

import asyncio

async def cancellable_task():
    try:
        while True:
            print("執行中...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("收到取消信號")
        # 清理資源
        print("清理完成")
        raise  # 重新拋出

async def main():
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)

    # 取消任務
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("任務已取消")

asyncio.run(main())

⏱️ 超時處理

wait_for

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超時")

asyncio.run(main())

timeout(Python 3.11+)

import asyncio

async def main():
    async with asyncio.timeout(3.0):
        await asyncio.sleep(10)  # 會被取消

asyncio.run(main())

自訂超時處理

import asyncio

async def with_timeout(coro, timeout: float, default=None):
    """帶有預設值的超時處理"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return default

async def main():
    result = await with_timeout(
        asyncio.sleep(10),
        timeout=1.0,
        default="超時預設值"
    )
    print(result)  # "超時預設值"

asyncio.run(main())

🔒 同步原語

Lock(互斥鎖)

import asyncio

shared_resource = []
lock = asyncio.Lock()

async def safe_append(value):
    """使用鎖保護共享資源"""
    async with lock:
        print(f"取得鎖,添加 {value}")
        shared_resource.append(value)
        await asyncio.sleep(0.1)
        print(f"釋放鎖,{value} 已添加")

async def main():
    await asyncio.gather(
        safe_append(1),
        safe_append(2),
        safe_append(3),
    )
    print(f"結果: {shared_resource}")

asyncio.run(main())

Semaphore(信號量)

import asyncio

# 限制同時執行的任務數量
semaphore = asyncio.Semaphore(3)

async def limited_task(name: str):
    async with semaphore:
        print(f"{name} 開始")
        await asyncio.sleep(2)
        print(f"{name} 完成")

async def main():
    # 同時啟動 10 個任務,但最多 3 個並行
    tasks = [limited_task(f"Task {i}") for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Event(事件)

import asyncio

event = asyncio.Event()

async def waiter(name: str):
    print(f"{name} 等待事件...")
    await event.wait()
    print(f"{name} 事件已觸發!")

async def setter():
    await asyncio.sleep(2)
    print("設定事件")
    event.set()

async def main():
    await asyncio.gather(
        waiter("A"),
        waiter("B"),
        waiter("C"),
        setter(),
    )

asyncio.run(main())

Condition(條件變數)

import asyncio

condition = asyncio.Condition()
queue = []

async def producer():
    for i in range(5):
        async with condition:
            queue.append(i)
            print(f"生產: {i}")
            condition.notify()
        await asyncio.sleep(0.5)

async def consumer(name: str):
    while True:
        async with condition:
            while not queue:
                await condition.wait()
            item = queue.pop(0)
            print(f"{name} 消費: {item}")

async def main():
    consumer_tasks = [
        asyncio.create_task(consumer(f"消費者 {i}"))
        for i in range(2)
    ]

    await producer()

    # 取消消費者
    for task in consumer_tasks:
        task.cancel()

asyncio.run(main())

📬 Queue(佇列)

基本用法

import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(10):
        await queue.put(i)
        print(f"生產: {i}")
        await asyncio.sleep(0.1)

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        print(f"{name} 處理: {item}")
        await asyncio.sleep(0.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)

    # 啟動消費者
    consumers = [
        asyncio.create_task(consumer(queue, f"消費者 {i}"))
        for i in range(3)
    ]

    # 執行生產者
    await producer(queue)

    # 等待佇列處理完成
    await queue.join()

    # 取消消費者
    for c in consumers:
        c.cancel()

asyncio.run(main())

優先佇列

import asyncio
from asyncio import PriorityQueue

async def main():
    queue = PriorityQueue()

    # 放入項目(優先級, 資料)
    await queue.put((3, "低優先"))
    await queue.put((1, "高優先"))
    await queue.put((2, "中優先"))

    while not queue.empty():
        priority, item = await queue.get()
        print(f"優先級 {priority}: {item}")

asyncio.run(main())
# 輸出:
# 優先級 1: 高優先
# 優先級 2: 中優先
# 優先級 3: 低優先

🔄 執行同步程式碼

run_in_executor

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    """模擬同步的 I/O 操作"""
    time.sleep(2)
    return "I/O 完成"

def cpu_intensive():
    """模擬 CPU 密集操作"""
    total = sum(i * i for i in range(10**7))
    return total

async def main():
    loop = asyncio.get_running_loop()

    # 使用預設的線程池執行同步 I/O
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

    # 使用自訂線程池
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, cpu_intensive)
        print(f"計算結果: {result}")

asyncio.run(main())

to_thread(Python 3.9+)

import asyncio
import time

def blocking_operation():
    time.sleep(2)
    return "完成"

async def main():
    # 更簡潔的方式
    result = await asyncio.to_thread(blocking_operation)
    print(result)

asyncio.run(main())

⚠️ 錯誤處理

gather 的錯誤處理

import asyncio

async def task(name: str, should_fail: bool = False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} 失敗")
    return f"{name} 成功"

async def main():
    # return_exceptions=True:不會立即拋出異常
    results = await asyncio.gather(
        task("A"),
        task("B", should_fail=True),
        task("C"),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"錯誤: {result}")
        else:
            print(f"結果: {result}")

asyncio.run(main())

wait 的錯誤處理

import asyncio

async def task(name: str, delay: float, should_fail: bool = False):
    await asyncio.sleep(delay)
    if should_fail:
        raise ValueError(f"{name} 失敗")
    return f"{name} 成功"

async def main():
    tasks = [
        asyncio.create_task(task("A", 1)),
        asyncio.create_task(task("B", 2, should_fail=True)),
        asyncio.create_task(task("C", 3)),
    ]

    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION
    )

    print(f"完成: {len(done)}, 待處理: {len(pending)}")

    for task in done:
        try:
            print(f"結果: {task.result()}")
        except Exception as e:
            print(f"異常: {e}")

    # 取消待處理的任務
    for task in pending:
        task.cancel()

asyncio.run(main())

📝 實戰範例:HTTP 爬蟲

import asyncio
import httpx
from dataclasses import dataclass
from typing import Optional

@dataclass
class FetchResult:
    url: str
    status: int
    content_length: int
    error: Optional[str] = None

class AsyncCrawler:
    def __init__(self, max_concurrent: int = 10, timeout: float = 30.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout

    async def fetch_url(
        self,
        client: httpx.AsyncClient,
        url: str
    ) -> FetchResult:
        """取得單一 URL"""
        async with self.semaphore:
            try:
                response = await client.get(url, timeout=self.timeout)
                return FetchResult(
                    url=url,
                    status=response.status_code,
                    content_length=len(response.content)
                )
            except httpx.TimeoutException:
                return FetchResult(
                    url=url,
                    status=0,
                    content_length=0,
                    error="Timeout"
                )
            except Exception as e:
                return FetchResult(
                    url=url,
                    status=0,
                    content_length=0,
                    error=str(e)
                )

    async def crawl(self, urls: list[str]) -> list[FetchResult]:
        """並行爬取多個 URL"""
        async with httpx.AsyncClient() as client:
            tasks = [self.fetch_url(client, url) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/404",
        "https://invalid-url.example.com",
    ]

    crawler = AsyncCrawler(max_concurrent=5, timeout=5.0)
    results = await crawler.crawl(urls)

    for result in results:
        if result.error:
            print(f"❌ {result.url}: {result.error}")
        else:
            print(f"✅ {result.url}: {result.status} ({result.content_length} bytes)")

asyncio.run(main())

✅ 重點總結

同步原語

原語用途
Lock互斥存取
Semaphore限制並行數
Event事件通知
Condition條件等待
Queue任務佇列

任務管理

方法說明
create_task()建立任務
gather()並行執行
wait()等待任務
wait_for()超時等待
TaskGroup任務群組(3.11+)

錯誤處理

  1. gather(return_exceptions=True) 收集異常
  2. wait(return_when=FIRST_EXCEPTION) 第一個異常時返回
  3. try/except asyncio.CancelledError 處理取消

🎤 面試這樣答

Q: 如何在 asyncio 中限制並行數量?

答案:

使用 asyncio.Semaphore 限制並行數量:

semaphore = asyncio.Semaphore(10)  # 最多 10 個並行

async def limited_task():
    async with semaphore:  # 取得信號量
        await do_something()
    # 離開時自動釋放

這樣即使啟動 100 個任務,同時執行的最多 10 個。


上一篇: 05-1. 非同步程式設計基礎 下一篇: 05-3. 非同步上下文管理器


最後更新:2025-12-17

0%