【Python】深入理解 Python 的 asyncio 库

【Python】深入理解 Python 的 asyncio 库多进程 像有多个小明独立完成任务 各自分配独立的资源

大家好,欢迎来到IT知识分享网。

1、引言

asyncio 是 Python 标准库中的一个库,提供了对异步 I/O、事件循环、协程和任务等异步编程模型的支持。它使得编写高效、可扩展的并发代码变得更加容易。本文将详细讲解 asyncio 的用法、本质、应用场景、注意事项以及一些高级技巧

2、进程,线程,协程

  1. 线程
    线程是操作系统调度的基本单位,同一个进程中的多个线程共享相同的内存空间。线程之间的切换由操作系统内核负责。
  2. 协程是由程序自身调度的函数,可以在执行过程中暂停和恢复,协程的切换由程序自身完成,而不是依赖操作系统。
  3. 进程是操作系统资源分配的基本单位,每个进程有独立的内存空间,进程之间不能直接共享内存,需要通过进程间通信(IPC)。

    举个例子,一个经典案例:
    小明在家需要完成以下事情:
    电饭锅煮饭大约30分钟
    洗衣机洗衣服大约40分钟
    写作业大约50分钟



    多进程

    多线程

    一个小明在不同任务之间切换

    多线程的情况就像是只有一个小明,但他可以在不同任务之间快速切换:

    共享同一资源空间,切换时有开销,但比进程小
    适合I/O密集型任务
    需要管理资源竞争问题

    协程

    一个小明按计划有序地完成任务

    协程的情况就像是只有一个小明,但他能非常有计划地按顺序完成各个任务,并且知道什么时候可以暂时停下一个任务去做另一个任务:

    总结

3、asyncio 的基本用法

3.1 异步函数和 await

异步函数使用 async def 声明,await 关键字用于等待一个异步操作完成

import asyncio async def say_hello(): print("Hello") await asyncio.sleep(1) print("World") # 运行异步函数 asyncio.run(say_hello()) 

3.2 任务 (Tasks)

任务用于调度和管理协程的执行。

async def greet(name): print(f"Hello, { 
     name}") await asyncio.sleep(1) print(f"Goodbye, { 
     name}") async def main(): task1 = asyncio.create_task(greet("Alice")) task2 = asyncio.create_task(greet("Bob")) await task1 await task2 asyncio.run(main()) 

4、本质

4.1 协程

协程是可以暂停和恢复的函数。与传统的函数不同,协程可以在执行过程中暂停,以便其他协程可以运行。Python 使用 async def 声明协程函数,使用 await 暂停协程的执行。

4.2 事件循环

事件循环是 asyncio 的核心,用于调度和执行协程。事件循环负责处理异步任务、I/O 事件、定时器等。

4.3 任务和 Future

任务是协程的高级抽象,用于调度协程的执行。Future 是表示异步操作结果的低级抽象,可以与任务一起使用。

5、应用场景

5.1 网络 I/O

异步编程非常适合处理网络 I/O 操作,如 HTTP 请求、WebSocket、数据库查询等。

import aiohttp async def fetch_url(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def main(): urls = ["http://example.com", "http://example.org"] results = await asyncio.gather(*[fetch_url(url) for url in urls]) for result in results: print(result) asyncio.run(main()) 

5.2 并发任务

asyncio 可以用于并发执行多个 I/O 密集型任务,如爬虫、数据处理等。

async def task1(): await asyncio.sleep(1) print("Task 1 done") async def task2(): await asyncio.sleep(2) print("Task 2 done") async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main()) 

5.3 事件驱动的程序

asyncio 适合编写需要响应多个事件的程序,如 GUI 应用、实时聊天服务器等。

6、高级用法

6.1 超时控制

使用 asyncio.wait_for 可以设置协程的超时时间

async def long_running_task(): await asyncio.sleep(10) async def main(): try: await asyncio.wait_for(long_running_task(), timeout=5) except asyncio.TimeoutError: print("The task took too long!") asyncio.run(main()) 

6.2 取消任务

可以取消正在运行的任务。

async def task(): try: while True: print("Running...") await asyncio.sleep(1) except asyncio.CancelledError: print("Task was cancelled") async def main(): t = asyncio.create_task(task()) await asyncio.sleep(3) t.cancel() await t asyncio.run(main()) 

6.3 信号量 (Semaphore)

async def limited_task(semaphore, name): async with semaphore: print(f"Running { 
     name}") await asyncio.sleep(2) async def main(): semaphore = asyncio.Semaphore(2) tasks = [limited_task(semaphore, f"Task { 
     i}") for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main()) 

6.4 使用 asyncio.Queue

使用队列在协程之间传递数据。

async def producer(queue): for i in range(5): await asyncio.sleep(1) await queue.put(i) print(f"Produced { 
     i}") async def consumer(queue): while True: item = await queue.get() if item is None: break print(f"Consumed { 
     item}") queue.task_done() async def main(): queue = asyncio.Queue() producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) await producer_task await queue.put(None) # 用于通知消费者结束 await consumer_task asyncio.run(main()) 

7、实战一下

  • 爬取一组网页的内容
  • 并发地进行爬取操作
  • 统计每个网页的单词数量
  • 使用信号量限制并发请求数量
  • 使用队列在任务之间传递数据
    PS:asyncio只支持aiohttp,不支持requests
import asyncio import aiohttp from aiohttp import ClientSession from collections import Counter from typing import List # 要爬取的URL列表 # 笔趣阁链接:它搞盗版 我来练习 我们都有光明的未来 URLS = [ 'https://m.xbiqugew.com/book/53099/40832438.html', 'https://m.xbiqugew.com/book/53099/40832438_2.html', 'https://m.xbiqugew.com/book/53099/40822138.html', 'https://m.xbiqugew.com/book/53099/40818457.html', 'https://m.xbiqugew.com/book/53099/40808676.html', 'https://m.xbiqugew.com/book/53099/40806884.html' ] # 限制并发请求数量 async def fetch(url: str, session: ClientSession, sem: asyncio.Semaphore) -> str: async with sem: async with session.get(url) as response: return await response.text() async def count_words(url: str, session: ClientSession, sem: asyncio.Semaphore) -> None: content = await fetch(url, session, sem) words = content.split() word_count = Counter(words) print(f"URL: { 
     url}, Word count: { 
     len(words)}, Most common words: { 
     word_count.most_common(5)}") async def main(urls: List[str]) -> None: sem = asyncio.Semaphore(3) async with aiohttp.ClientSession() as session: tasks = [count_words(url, session, sem) for url in urls] await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main(URLS)) 

8、设计模式

七中的练习适用于一些脚本,日常需求,生产的话将其改造的面向对象一点。

我们将把代码分为多个类,每个类负责不同的功能。主要包括:

Fetcher 类:负责发送 HTTP 请求并获取网页内容。
WordCounter 类:负责统计网页的单词数量。
Crawler 类:协调整个爬虫过程。
Logger 类:负责日志记录。


import asyncio import aiohttp from aiohttp import ClientSession from collections import Counter from typing import List, Dict class Logger: _instance = None def __new__(cls, *args, kwargs): if not cls._instance: cls._instance = super(Logger, cls).__new__(cls, *args, kwargs) return cls._instance def log(self, message: str): print(message) class Fetcher: async def fetch(self, url: str, session: ClientSession, sem: asyncio.Semaphore) -> str: async with sem: async with session.get(url) as response: Logger().log(f"Fetching URL: { 
     url}") return await response.text() class WordCounter: @staticmethod def count(content: str) -> Dict[str, int]: words = content.split() word_count = Counter(words) return word_count class Crawler: def __init__(self, urls: List[str], concurrency: int): self.urls = urls self.concurrency = concurrency self.fetcher = Fetcher() self.results = [] async def count_words_in_url(self, url: str, session: ClientSession, sem: asyncio.Semaphore): content = await self.fetcher.fetch(url, session, sem) word_count = WordCounter.count(content) Logger().log(f"URL: { 
     url}, Word count: { 
     len(content.split())}, Most common words: { 
     word_count.most_common(5)}") self.results.append((url, word_count)) async def run(self): sem = asyncio.Semaphore(self.concurrency) async with aiohttp.ClientSession() as session: tasks = [self.count_words_in_url(url, session, sem) for url in self.urls] await asyncio.gather(*tasks) if __name__ == '__main__': URLS = [ 'https://m.xbiqugew.com/book/53099/40832438.html', 'https://m.xbiqugew.com/book/53099/40832438_2.html', 'https://m.xbiqugew.com/book/53099/40822138.html', 'https://m.xbiqugew.com/book/53099/40818457.html', 'https://m.xbiqugew.com/book/53099/40808676.html', 'https://m.xbiqugew.com/book/53099/40806884.html' ] crawler = Crawler(URLS, concurrency=5) asyncio.run(crawler.run()) 

9. 注意事项

9.1 线程与协程

协程是单线程的,不会并行运行。在 I/O 密集型任务中,协程表现出色,但在 CPU 密集型任务中,需要配合线程或进程。

9.2 阻塞代码

在异步函数中避免使用阻塞操作,如 time.sleep,应使用 await asyncio.sleep。

9.3 异常处理

在异步代码中需要特别注意异常处理,避免未捕获的异常导致程序崩溃。

async def risky_operation(): try: # 执行可能引发异常的异步操作 await some_async_function() except Exception as e: print(f"An error occurred: { 
     e}") asyncio.run(risky_operation()) 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/109728.html

(0)
上一篇 2026-02-04 22:33
下一篇 2026-02-05 07:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信