大家好,欢迎来到IT知识分享网。
深入理解 Python 的 asyncio 库
1、引言
asyncio 是 Python 标准库中的一个库,提供了对异步 I/O、事件循环、协程和任务等异步编程模型的支持。它使得编写高效、可扩展的并发代码变得更加容易。本文将详细讲解 asyncio 的用法、本质、应用场景、注意事项以及一些高级技巧
2、进程,线程,协程
- 线程
线程是操作系统调度的基本单位,同一个进程中的多个线程共享相同的内存空间。线程之间的切换由操作系统内核负责。 - 协程是由程序自身调度的函数,可以在执行过程中暂停和恢复,协程的切换由程序自身完成,而不是依赖操作系统。
- 进程是操作系统资源分配的基本单位,每个进程有独立的内存空间,进程之间不能直接共享内存,需要通过进程间通信(IPC)。
举个例子,一个经典案例:
小明在家需要完成以下事情:
电饭锅煮饭大约30分钟
洗衣机洗衣服大约40分钟
写作业大约50分钟多进程
多线程
一个小明在不同任务之间切换
多线程的情况就像是只有一个小明,但他可以在不同任务之间快速切换:
共享同一资源空间,切换时有开销,但比进程小
适合I/O密集型任务
需要管理资源竞争问题协程
一个小明按计划有序地完成任务
协程的情况就像是只有一个小明,但他能非常有计划地按顺序完成各个任务,并且知道什么时候可以暂时停下一个任务去做另一个任务:
总结
3、asyncio 的基本用法
3.1 异步函数和 await
异步函数使用 async def 声明,await 关键字用于等待一个异步操作完成
import asyncio async def say_hello(): print("Hello") await asyncio.sleep(1) print("World") # 运行异步函数 asyncio.run(say_hello())
3.2 任务 (Tasks)
任务用于调度和管理协程的执行。
async def greet(name): print(f"Hello, {
name}") await asyncio.sleep(1) print(f"Goodbye, {
name}") async def main(): task1 = asyncio.create_task(greet("Alice")) task2 = asyncio.create_task(greet("Bob")) await task1 await task2 asyncio.run(main())
4、本质
4.1 协程
协程是可以暂停和恢复的函数。与传统的函数不同,协程可以在执行过程中暂停,以便其他协程可以运行。Python 使用 async def 声明协程函数,使用 await 暂停协程的执行。
4.2 事件循环
事件循环是 asyncio 的核心,用于调度和执行协程。事件循环负责处理异步任务、I/O 事件、定时器等。
4.3 任务和 Future
任务是协程的高级抽象,用于调度协程的执行。Future 是表示异步操作结果的低级抽象,可以与任务一起使用。
5、应用场景
5.1 网络 I/O
异步编程非常适合处理网络 I/O 操作,如 HTTP 请求、WebSocket、数据库查询等。
import aiohttp async def fetch_url(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def main(): urls = ["http://example.com", "http://example.org"] results = await asyncio.gather(*[fetch_url(url) for url in urls]) for result in results: print(result) asyncio.run(main())
5.2 并发任务
asyncio 可以用于并发执行多个 I/O 密集型任务,如爬虫、数据处理等。
async def task1(): await asyncio.sleep(1) print("Task 1 done") async def task2(): await asyncio.sleep(2) print("Task 2 done") async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main())
5.3 事件驱动的程序
asyncio 适合编写需要响应多个事件的程序,如 GUI 应用、实时聊天服务器等。
6、高级用法
6.1 超时控制
使用 asyncio.wait_for 可以设置协程的超时时间
async def long_running_task(): await asyncio.sleep(10) async def main(): try: await asyncio.wait_for(long_running_task(), timeout=5) except asyncio.TimeoutError: print("The task took too long!") asyncio.run(main())
6.2 取消任务
可以取消正在运行的任务。
async def task(): try: while True: print("Running...") await asyncio.sleep(1) except asyncio.CancelledError: print("Task was cancelled") async def main(): t = asyncio.create_task(task()) await asyncio.sleep(3) t.cancel() await t asyncio.run(main())
6.3 信号量 (Semaphore)
async def limited_task(semaphore, name): async with semaphore: print(f"Running {
name}") await asyncio.sleep(2) async def main(): semaphore = asyncio.Semaphore(2) tasks = [limited_task(semaphore, f"Task {
i}") for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
6.4 使用 asyncio.Queue
使用队列在协程之间传递数据。
async def producer(queue): for i in range(5): await asyncio.sleep(1) await queue.put(i) print(f"Produced {
i}") async def consumer(queue): while True: item = await queue.get() if item is None: break print(f"Consumed {
item}") queue.task_done() async def main(): queue = asyncio.Queue() producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) await producer_task await queue.put(None) # 用于通知消费者结束 await consumer_task asyncio.run(main())
7、实战一下
- 爬取一组网页的内容
- 并发地进行爬取操作
- 统计每个网页的单词数量
- 使用信号量限制并发请求数量
- 使用队列在任务之间传递数据
PS:asyncio只支持aiohttp,不支持requests
import asyncio import aiohttp from aiohttp import ClientSession from collections import Counter from typing import List # 要爬取的URL列表 # 笔趣阁链接:它搞盗版 我来练习 我们都有光明的未来 URLS = [ 'https://m.xbiqugew.com/book/53099/40832438.html', 'https://m.xbiqugew.com/book/53099/40832438_2.html', 'https://m.xbiqugew.com/book/53099/40822138.html', 'https://m.xbiqugew.com/book/53099/40818457.html', 'https://m.xbiqugew.com/book/53099/40808676.html', 'https://m.xbiqugew.com/book/53099/40806884.html' ] # 限制并发请求数量 async def fetch(url: str, session: ClientSession, sem: asyncio.Semaphore) -> str: async with sem: async with session.get(url) as response: return await response.text() async def count_words(url: str, session: ClientSession, sem: asyncio.Semaphore) -> None: content = await fetch(url, session, sem) words = content.split() word_count = Counter(words) print(f"URL: {
url}, Word count: {
len(words)}, Most common words: {
word_count.most_common(5)}") async def main(urls: List[str]) -> None: sem = asyncio.Semaphore(3) async with aiohttp.ClientSession() as session: tasks = [count_words(url, session, sem) for url in urls] await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main(URLS))
8、设计模式
七中的练习适用于一些脚本,日常需求,生产的话将其改造的面向对象一点。
我们将把代码分为多个类,每个类负责不同的功能。主要包括:
Fetcher 类:负责发送 HTTP 请求并获取网页内容。
WordCounter 类:负责统计网页的单词数量。
Crawler 类:协调整个爬虫过程。
Logger 类:负责日志记录。
import asyncio import aiohttp from aiohttp import ClientSession from collections import Counter from typing import List, Dict class Logger: _instance = None def __new__(cls, *args, kwargs): if not cls._instance: cls._instance = super(Logger, cls).__new__(cls, *args, kwargs) return cls._instance def log(self, message: str): print(message) class Fetcher: async def fetch(self, url: str, session: ClientSession, sem: asyncio.Semaphore) -> str: async with sem: async with session.get(url) as response: Logger().log(f"Fetching URL: {
url}") return await response.text() class WordCounter: @staticmethod def count(content: str) -> Dict[str, int]: words = content.split() word_count = Counter(words) return word_count class Crawler: def __init__(self, urls: List[str], concurrency: int): self.urls = urls self.concurrency = concurrency self.fetcher = Fetcher() self.results = [] async def count_words_in_url(self, url: str, session: ClientSession, sem: asyncio.Semaphore): content = await self.fetcher.fetch(url, session, sem) word_count = WordCounter.count(content) Logger().log(f"URL: {
url}, Word count: {
len(content.split())}, Most common words: {
word_count.most_common(5)}") self.results.append((url, word_count)) async def run(self): sem = asyncio.Semaphore(self.concurrency) async with aiohttp.ClientSession() as session: tasks = [self.count_words_in_url(url, session, sem) for url in self.urls] await asyncio.gather(*tasks) if __name__ == '__main__': URLS = [ 'https://m.xbiqugew.com/book/53099/40832438.html', 'https://m.xbiqugew.com/book/53099/40832438_2.html', 'https://m.xbiqugew.com/book/53099/40822138.html', 'https://m.xbiqugew.com/book/53099/40818457.html', 'https://m.xbiqugew.com/book/53099/40808676.html', 'https://m.xbiqugew.com/book/53099/40806884.html' ] crawler = Crawler(URLS, concurrency=5) asyncio.run(crawler.run())
9. 注意事项
9.1 线程与协程
协程是单线程的,不会并行运行。在 I/O 密集型任务中,协程表现出色,但在 CPU 密集型任务中,需要配合线程或进程。
9.2 阻塞代码
在异步函数中避免使用阻塞操作,如 time.sleep,应使用 await asyncio.sleep。
9.3 异常处理
在异步代码中需要特别注意异常处理,避免未捕获的异常导致程序崩溃。
async def risky_operation(): try: # 执行可能引发异常的异步操作 await some_async_function() except Exception as e: print(f"An error occurred: {
e}") asyncio.run(risky_operation())
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/109728.html