大家好,欢迎来到IT知识分享网。
在Python中,队列(Queue)是一种抽象的数据类型,它遵循先进先出(FIFO)的原则。队列是一种特殊的线性表,只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。
Python标准库中的queue
模块提供了多种队列的实现,包括:
- Queue:这是一个简单的队列类,可以用来实现先进先出的数据结构。
- LifoQueue:这是一个后进先出(LIFO)的数据结构,与栈类似。
- PriorityQueue:这是一个优先级队列,可以根据元素的优先级进行排序。
下面是一个使用queue.Queue
的简单示例:
import queue # 创建一个队列 q = queue.Queue() # 向队列中添加元素 for i in range(5): q.put(i) # 从队列中获取元素,它会按照先进先出的原则返回元素 while not q.empty(): print(q.get())
输出:
0 1 2 3 4
queue.Queue还提供了其他一些有用的方法,如
q.full()(检查队列是否已满)、
q.maxsize`(获取队列的最大大小)等。此外,它还支持线程安全,可以在多线程环境中安全地使用。
属性和方法以及用法
- qsize():返回队列中的元素个数。
import queue # 创建一个队列 q = queue.Queue() # 向队列中添加元素 q.put("item1") q.put("item2") q.put("item3") # 获取队列中的元素个数 size = q.qsize() print("队列大小:", size)
- mutex:返回队列的锁对象,用于多线程环境中的同步操作。通过使用mutex属性,可以确保在多线程访问队列时,对队列的操作是线程安全的。代码如下:
import queue import threading # 创建一个队列和锁对象 q = queue.Queue() mutex = q.mutex # 定义一个线程函数,用于向队列中添加元素 def add_item(item): with mutex: q.put(item) # 定义一个线程函数,用于从队列中获取元素 def get_item(): with mutex: item = q.get() return item # 创建两个线程,分别执行添加和获取操作 thread1 = threading.Thread(target=add_item, args=("item1",)) thread2 = threading.Thread(target=get_item) # 启动线程 thread1.start() thread2.start() # 等待线程结束 thread1.join() thread2.join()
- not_full():如果队列未满,则返回True,否则返回False。
- maxsize(可选参数,默认为0,用于设定队列长度。maxsize小于等于0表示队列长度无限。)
- put(item, block=True, timeout=None)(在队尾插入一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有空间可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列已满且block为False,将引发Full异常。)
import queue # 创建一个队列 q = queue.Queue() # 向队列中添加元素 for i in range(5): q.put(i) # 获取队列的大小 size = q.qsize() print("队列大小:", size) # 从队列中获取元素并打印 while not q.empty(): item = q.get() print(item)
- get(block=True, timeout=None)(从队头删除并返回一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有项目可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列为空且block为False,将引发Empty异常。)
- empty()(检查队列是否为空,返回True如果队列为空,否则返回False。)
import queue # 创建一个队列 q = queue.Queue() # 检查队列是否为空 if q.empty(): print("队列为空") else: print("队列非空") # 向队列中添加元素 q.put("item1") q.put("item2") # 检查队列是否为空 if q.empty(): print("队列为空") else: print("队列非空")
- full()(检查队列是否已满。如果队列已满,返回True,否则返回False。)
import queue # 创建一个队列,容量为3 q = queue.Queue(maxsize=3) # 向队列中添加元素 for i in range(5): try: q.put(i, block=False) except queue.Full: print("队列已满,添加元素失败") # 检查队列是否已满 if q.full(): print("队列已满") else: print("队列未满")
- get_nowait():与get()方法类似,但不进行阻塞等待。如果队列为空,将引发Empty异常。
import queue q = queue.Queue() try: item = q.get_nowait() except queue.Empty: print("队列为空,无法获取元素。")
- put_nowait():与put()方法类似,但不进行阻塞等待。如果队列已满,将引发Full异常。
import queue q = queue.Queue(maxsize=3) try: q.put_nowait("item") q.put_nowait("item1") q.put_nowait("item2") q.put_nowait("item3") except queue.Full: print("队列已满,无法添加元素。")
- join():Queue.join() 是 Python 的 queue 模块中的一个方法,用于阻塞当前线程,直到队列中的所有任务都已完成处理。
当你使用多线程或多进程从队列中获取并处理任务时,Queue.join() 方法可以确保主线程等待所有任务都已完成后再继续执行。这样可以避免因任务未完成而导致主线程提前结束。
多线程中的Quene
task_done() 是 Python 的 queue 模块中的一个方法,用于标记队列中的一个任务已经完成。
当多个线程或进程同时从队列中获取任务并处理时,可以使用 task_done() 方法来通知队列中已经完成了一个任务。这有助于确保队列中的任务能够被正确地处理,并且可以避免出现死锁或阻塞的情况。
代码如下:
import queue import threading import time # 创建一个队列 q = queue.Queue() # 定义一个生产者线程函数,将数据添加到队列中 def producer(thread_id): for i in range(5): print(f"生产者 {thread_id} 生产了 {i}") q.put(i) time.sleep(1) # 定义一个消费者线程函数,从队列中获取数据并处理 def consumer(thread_id): while True: item = q.get() print(f"消费者 {thread_id} 消费了 {item}") time.sleep(1) q.task_done() # 创建生产者线程 threads = [] for i in range(2): t = threading.Thread(target=producer, args=(i,)) threads.append(t) t.start() # 创建消费者线程 for i in range(3): t = threading.Thread(target=consumer, args=(i,)) threads.append(t) t.start() # 等待所有任务完成 q.join()
如果使用get_nowait,应该怎么写呢?代码如下:
import queue import threading import time # 创建一个队列 q = queue.Queue() # 定义一个生产者线程函数,将数据添加到队列中 def producer(thread_id): for i in range(5): print(f"生产者 {thread_id} 生产了 {i}") q.put(i) time.sleep(1) # 定义一个消费者线程函数,从队列中获取数据并处理 def consumer(thread_id): while True: try: item = q.get_nowait() print(f"消费者 {thread_id} 消费了 {item}") time.sleep(1) except queue.Empty: print("队列为空,无法获取元素。") pass time.sleep(1) # 创建生产者线程 threads = [] for i in range(2): t = threading.Thread(target=producer, args=(i,)) threads.append(t) t.start() # 创建消费者线程 for i in range(3): t = threading.Thread(target=consumer, args=(i,)) threads.append(t) t.start() # 等待所有任务完成 q.join()
多进程中的Quene
在Python的multiprocessing
模块中,Queue
是一个进程安全的队列类,它允许在多个进程之间进行安全的通信。这个队列是基于Python标准库中的queue
模块实现的,并添加了多进程的支持。
多进程存,主线程取
import multiprocessing import time def worker(q, num): """ Worker function that puts data into the queue. """ for i in range(5): print(f"Worker {
num} produced {
i}") q.put(i) q.put("STOP") # 用于告诉主进程所有工作已经完成 if __name__ == "__main__": q = multiprocessing.Queue() # 创建一个队列对象 processes = [] for i in range(3): # 创建3个工作进程 p = multiprocessing.Process(target=worker, args=(q, i)) processes.append(p) p.start() while True: item = q.get() # 从队列中获取数据 if item == "STOP": # 如果获取到"STOP",则所有工作进程已经完成 break print(f"Main process consumed {
item}") for p in processes: # 等待所有工作进程结束 p.join()
在这个示例中,我们创建了一个Queue
对象q
,并创建了三个工作进程。每个工作进程将数据添加到队列中,主进程从队列中获取数据并处理。当工作进程完成所有任务后,它会在队列中放置一个”STOP”标记,主进程通过检查这个标记来知道所有工作进程已经完成。最后,主进程等待所有工作进程结束。
注意,在使用multiprocessing.Queue
时,你需要确保传递给子进程的队列对象是通过multiprocessing.Queue()
创建的,而不是通过标准库中的queue.Queue()
创建的。因为标准库中的queue.Queue
不是线程安全的,也不支持多进程。
多进程存,多进程取
import multiprocessing import time def worker(q, num): for i in range(5): print(f"Worker {num} produced {i}") q.put(i) def get_item(q): while True: item = q.get() # 从队列中获取数据 print(f"get process consumed {item}") if __name__ == "__main__": q = multiprocessing.Queue() # 创建一个队列对象 processes = [] for i in range(3): # 创建3个工作进程 p = multiprocessing.Process(target=worker, args=(q, i)) processes.append(p) p.start() for i in range(3): # 创建3个工作进程 p = multiprocessing.Process(target=get_item,args=(q,)) processes.append(p) p.start() for p in processes: # 等待所有工作进程结束 p.join()
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/125243.html