# 

# 05-2. asyncio 進階

&gt; ⏱️ **閱讀時間：** 18 分鐘
&gt; 🎯 **難度：** ⭐⭐⭐⭐ (高階)

---

## 🤔 一句話解釋

**進階的 asyncio 技巧包括任務管理、錯誤處理、同步原語等，讓你寫出更健壯的非同步程式。**

---

## 🔧 事件迴圈

### 事件迴圈基礎

```python
import asyncio

# 取得事件迴圈
loop = asyncio.get_event_loop()

# 新版本推薦用 asyncio.run()
asyncio.run(main())

# 或者在非同步函數中取得
async def example():
    loop = asyncio.get_running_loop()
    print(f&#34;當前迴圈: {loop}&#34;)
```

### 自訂事件迴圈

```python
import asyncio

async def main():
    print(&#34;Hello&#34;)
    await asyncio.sleep(1)
    print(&#34;World&#34;)

# 使用自訂的事件迴圈設定
if __name__ == &#34;__main__&#34;:
    # 設定 debug 模式
    asyncio.run(main(), debug=True)
```

---

## 📋 任務管理

### Task 物件

```python
import asyncio

async def long_task(name: str):
    print(f&#34;{name} 開始&#34;)
    await asyncio.sleep(2)
    print(f&#34;{name} 完成&#34;)
    return f&#34;{name} 結果&#34;

async def main():
    # 建立任務
    task = asyncio.create_task(long_task(&#34;Task A&#34;))

    # 檢查任務狀態
    print(f&#34;任務完成: {task.done()}&#34;)  # False

    # 等待任務
    result = await task

    print(f&#34;任務完成: {task.done()}&#34;)  # True
    print(f&#34;結果: {result}&#34;)

asyncio.run(main())
```

### 任務群組（Python 3.11&#43;）

```python
import asyncio

async def task(name: str):
    await asyncio.sleep(1)
    if name == &#34;C&#34;:
        raise ValueError(f&#34;{name} 失敗&#34;)
    return f&#34;{name} 完成&#34;

async def main():
    # TaskGroup 會等待所有任務，並收集所有異常
    try:
        async with asyncio.TaskGroup() as tg:
            task_a = tg.create_task(task(&#34;A&#34;))
            task_b = tg.create_task(task(&#34;B&#34;))
            task_c = tg.create_task(task(&#34;C&#34;))
    except* ValueError as eg:
        print(f&#34;捕獲異常: {eg.exceptions}&#34;)

asyncio.run(main())
```

### 任務取消

```python
import asyncio

async def cancellable_task():
    try:
        while True:
            print(&#34;執行中...&#34;)
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(&#34;收到取消信號&#34;)
        # 清理資源
        print(&#34;清理完成&#34;)
        raise  # 重新拋出

async def main():
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)

    # 取消任務
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print(&#34;任務已取消&#34;)

asyncio.run(main())
```

---

## ⏱️ 超時處理

### wait_for

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return &#34;完成&#34;

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print(&#34;操作超時&#34;)

asyncio.run(main())
```

### timeout（Python 3.11&#43;）

```python
import asyncio

async def main():
    async with asyncio.timeout(3.0):
        await asyncio.sleep(10)  # 會被取消

asyncio.run(main())
```

### 自訂超時處理

```python
import asyncio

async def with_timeout(coro, timeout: float, default=None):
    &#34;&#34;&#34;帶有預設值的超時處理&#34;&#34;&#34;
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return default

async def main():
    result = await with_timeout(
        asyncio.sleep(10),
        timeout=1.0,
        default=&#34;超時預設值&#34;
    )
    print(result)  # &#34;超時預設值&#34;

asyncio.run(main())
```

---

## 🔒 同步原語

### Lock（互斥鎖）

```python
import asyncio

shared_resource = []
lock = asyncio.Lock()

async def safe_append(value):
    &#34;&#34;&#34;使用鎖保護共享資源&#34;&#34;&#34;
    async with lock:
        print(f&#34;取得鎖，添加 {value}&#34;)
        shared_resource.append(value)
        await asyncio.sleep(0.1)
        print(f&#34;釋放鎖，{value} 已添加&#34;)

async def main():
    await asyncio.gather(
        safe_append(1),
        safe_append(2),
        safe_append(3),
    )
    print(f&#34;結果: {shared_resource}&#34;)

asyncio.run(main())
```

### Semaphore（信號量）

```python
import asyncio

# 限制同時執行的任務數量
semaphore = asyncio.Semaphore(3)

async def limited_task(name: str):
    async with semaphore:
        print(f&#34;{name} 開始&#34;)
        await asyncio.sleep(2)
        print(f&#34;{name} 完成&#34;)

async def main():
    # 同時啟動 10 個任務，但最多 3 個並行
    tasks = [limited_task(f&#34;Task {i}&#34;) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```

### Event（事件）

```python
import asyncio

event = asyncio.Event()

async def waiter(name: str):
    print(f&#34;{name} 等待事件...&#34;)
    await event.wait()
    print(f&#34;{name} 事件已觸發!&#34;)

async def setter():
    await asyncio.sleep(2)
    print(&#34;設定事件&#34;)
    event.set()

async def main():
    await asyncio.gather(
        waiter(&#34;A&#34;),
        waiter(&#34;B&#34;),
        waiter(&#34;C&#34;),
        setter(),
    )

asyncio.run(main())
```

### Condition（條件變數）

```python
import asyncio

condition = asyncio.Condition()
queue = []

async def producer():
    for i in range(5):
        async with condition:
            queue.append(i)
            print(f&#34;生產: {i}&#34;)
            condition.notify()
        await asyncio.sleep(0.5)

async def consumer(name: str):
    while True:
        async with condition:
            while not queue:
                await condition.wait()
            item = queue.pop(0)
            print(f&#34;{name} 消費: {item}&#34;)

async def main():
    consumer_tasks = [
        asyncio.create_task(consumer(f&#34;消費者 {i}&#34;))
        for i in range(2)
    ]

    await producer()

    # 取消消費者
    for task in consumer_tasks:
        task.cancel()

asyncio.run(main())
```

---

## 📬 Queue（佇列）

### 基本用法

```python
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(10):
        await queue.put(i)
        print(f&#34;生產: {i}&#34;)
        await asyncio.sleep(0.1)

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        print(f&#34;{name} 處理: {item}&#34;)
        await asyncio.sleep(0.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)

    # 啟動消費者
    consumers = [
        asyncio.create_task(consumer(queue, f&#34;消費者 {i}&#34;))
        for i in range(3)
    ]

    # 執行生產者
    await producer(queue)

    # 等待佇列處理完成
    await queue.join()

    # 取消消費者
    for c in consumers:
        c.cancel()

asyncio.run(main())
```

### 優先佇列

```python
import asyncio
from asyncio import PriorityQueue

async def main():
    queue = PriorityQueue()

    # 放入項目（優先級, 資料）
    await queue.put((3, &#34;低優先&#34;))
    await queue.put((1, &#34;高優先&#34;))
    await queue.put((2, &#34;中優先&#34;))

    while not queue.empty():
        priority, item = await queue.get()
        print(f&#34;優先級 {priority}: {item}&#34;)

asyncio.run(main())
# 輸出:
# 優先級 1: 高優先
# 優先級 2: 中優先
# 優先級 3: 低優先
```

---

## 🔄 執行同步程式碼

### run_in_executor

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    &#34;&#34;&#34;模擬同步的 I/O 操作&#34;&#34;&#34;
    time.sleep(2)
    return &#34;I/O 完成&#34;

def cpu_intensive():
    &#34;&#34;&#34;模擬 CPU 密集操作&#34;&#34;&#34;
    total = sum(i * i for i in range(10**7))
    return total

async def main():
    loop = asyncio.get_running_loop()

    # 使用預設的線程池執行同步 I/O
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

    # 使用自訂線程池
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, cpu_intensive)
        print(f&#34;計算結果: {result}&#34;)

asyncio.run(main())
```

### to_thread（Python 3.9&#43;）

```python
import asyncio
import time

def blocking_operation():
    time.sleep(2)
    return &#34;完成&#34;

async def main():
    # 更簡潔的方式
    result = await asyncio.to_thread(blocking_operation)
    print(result)

asyncio.run(main())
```

---

## ⚠️ 錯誤處理

### gather 的錯誤處理

```python
import asyncio

async def task(name: str, should_fail: bool = False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f&#34;{name} 失敗&#34;)
    return f&#34;{name} 成功&#34;

async def main():
    # return_exceptions=True：不會立即拋出異常
    results = await asyncio.gather(
        task(&#34;A&#34;),
        task(&#34;B&#34;, should_fail=True),
        task(&#34;C&#34;),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f&#34;錯誤: {result}&#34;)
        else:
            print(f&#34;結果: {result}&#34;)

asyncio.run(main())
```

### wait 的錯誤處理

```python
import asyncio

async def task(name: str, delay: float, should_fail: bool = False):
    await asyncio.sleep(delay)
    if should_fail:
        raise ValueError(f&#34;{name} 失敗&#34;)
    return f&#34;{name} 成功&#34;

async def main():
    tasks = [
        asyncio.create_task(task(&#34;A&#34;, 1)),
        asyncio.create_task(task(&#34;B&#34;, 2, should_fail=True)),
        asyncio.create_task(task(&#34;C&#34;, 3)),
    ]

    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION
    )

    print(f&#34;完成: {len(done)}, 待處理: {len(pending)}&#34;)

    for task in done:
        try:
            print(f&#34;結果: {task.result()}&#34;)
        except Exception as e:
            print(f&#34;異常: {e}&#34;)

    # 取消待處理的任務
    for task in pending:
        task.cancel()

asyncio.run(main())
```

---

## 📝 實戰範例：HTTP 爬蟲

```python
import asyncio
import httpx
from dataclasses import dataclass
from typing import Optional

@dataclass
class FetchResult:
    url: str
    status: int
    content_length: int
    error: Optional[str] = None

class AsyncCrawler:
    def __init__(self, max_concurrent: int = 10, timeout: float = 30.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout

    async def fetch_url(
        self,
        client: httpx.AsyncClient,
        url: str
    ) -&gt; FetchResult:
        &#34;&#34;&#34;取得單一 URL&#34;&#34;&#34;
        async with self.semaphore:
            try:
                response = await client.get(url, timeout=self.timeout)
                return FetchResult(
                    url=url,
                    status=response.status_code,
                    content_length=len(response.content)
                )
            except httpx.TimeoutException:
                return FetchResult(
                    url=url,
                    status=0,
                    content_length=0,
                    error=&#34;Timeout&#34;
                )
            except Exception as e:
                return FetchResult(
                    url=url,
                    status=0,
                    content_length=0,
                    error=str(e)
                )

    async def crawl(self, urls: list[str]) -&gt; list[FetchResult]:
        &#34;&#34;&#34;並行爬取多個 URL&#34;&#34;&#34;
        async with httpx.AsyncClient() as client:
            tasks = [self.fetch_url(client, url) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    urls = [
        &#34;https://httpbin.org/get&#34;,
        &#34;https://httpbin.org/delay/1&#34;,
        &#34;https://httpbin.org/status/404&#34;,
        &#34;https://invalid-url.example.com&#34;,
    ]

    crawler = AsyncCrawler(max_concurrent=5, timeout=5.0)
    results = await crawler.crawl(urls)

    for result in results:
        if result.error:
            print(f&#34;❌ {result.url}: {result.error}&#34;)
        else:
            print(f&#34;✅ {result.url}: {result.status} ({result.content_length} bytes)&#34;)

asyncio.run(main())
```

---

## ✅ 重點總結

### 同步原語

| 原語 | 用途 |
|------|------|
| `Lock` | 互斥存取 |
| `Semaphore` | 限制並行數 |
| `Event` | 事件通知 |
| `Condition` | 條件等待 |
| `Queue` | 任務佇列 |

### 任務管理

| 方法 | 說明 |
|------|------|
| `create_task()` | 建立任務 |
| `gather()` | 並行執行 |
| `wait()` | 等待任務 |
| `wait_for()` | 超時等待 |
| `TaskGroup` | 任務群組（3.11&#43;）|

### 錯誤處理

1. `gather(return_exceptions=True)` 收集異常
2. `wait(return_when=FIRST_EXCEPTION)` 第一個異常時返回
3. `try/except asyncio.CancelledError` 處理取消

---

## 🎤 面試這樣答

### Q: 如何在 asyncio 中限制並行數量？

**答案：**

&gt; 使用 `asyncio.Semaphore` 限制並行數量：
&gt;
&gt; ```python
&gt; semaphore = asyncio.Semaphore(10)  # 最多 10 個並行
&gt;
&gt; async def limited_task():
&gt;     async with semaphore:  # 取得信號量
&gt;         await do_something()
&gt;     # 離開時自動釋放
&gt; ```
&gt;
&gt; 這樣即使啟動 100 個任務，同時執行的最多 10 個。

---

**上一篇：** [05-1. 非同步程式設計基礎](./05-1)
**下一篇：** [05-3. 非同步上下文管理器](./05-3)

---

最後更新：2025-12-17


---

> 作者: luk  
> URL: https://yoru-karu-blog-lalaluk-52581ac5e0cef170a3c8922c19182ecb6f7bd604.gitlab.io/posts/tutorial/fastapi/05-2/  

