Очереди¶
Source code: Lib/asyncio/queues.py
Очереди asyncio разработаны аналогично классам модуля queue. Хотя
asyncio очереди не потокобезопасны, они предназначены для использования специально
в async/await код.
Обратите внимание, что методы очереди asyncio не имеют параметра
timeout; используйте asyncio.wait_for() функцию для выполнения операций очереди с
таймаутом.
См. также раздел Примеры ниже.
Очередь¶
- 
class 
asyncio.Queue(maxsize=0, *, loop=None)¶ Очередь первого входа, первого выхода (FIFO).
Если maxsize меньше или равно нулю, размер очереди бесконечен. Если это целое число больше
0, тоawait put()блокируется, когда очередь достигает maxsize, пока элемент не будет удален поget().В отличие от стандартных
queueмногопоточности библиотеки, размер очереди всегда известен и может быть возвращенный путем вызоваqsize()метод.Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.
Это класс not thread safe.
- 
maxsize¶ Число элементов, разрешенных в очереди.
- 
empty()¶ Возвращает
True, если очередь пуста,Falseв противном случае.
- 
full()¶ Возвращает
True, если в очередиmaxsizeэлементов.Если очередь была инициализирована с
maxsize=0(дефолт), тоfull()никогда возвращаетsTrue.
- 
coroutine 
get()¶ Удаление и возвращает элемента из очереди. Если очередь пуста, дождитесь доступности элемента.
- 
get_nowait()¶ Возвращает элемент, если он доступен немедленно, в противном случае вызовите
QueueEmpty.
- 
coroutine 
join()¶ Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается при добавлении элемента в очередь. Счетчик уменьшается каждый раз, когда потребитель вызывает
task_done(), чтобы указать, что элемент был извлечен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля,join()разблокируется.
- 
coroutine 
put(item)¶ Поместить элемент в очередь. Если очередь заполнена, дождитесь наличия свободного слота перед добавлением элемента.
- 
put_nowait(item)¶ Поместить элемент в очередь без блокировки.
Если свободный слот не доступен немедленно, поднимите
QueueFull.
- 
qsize()¶ Возвращает количество элементов в очереди.
- 
task_done()¶ Укажите, что ранее поставленная в очередь задача завершена.
Используется потребителями очереди. Для каждого
get(), используемый для выборки задачи, последующий вызовtask_done()сообщает очереди, что обработка задачи завершена.Если
join()в данный момент блокируется, он возобновится после обработки всех элементов (это означает, что был полученtask_done()вызов для каждого элемента,put()в очередь).Вызывает
ValueError, если вызывается больше раз, чем элементы, помещенные в очередь.
- 
 
Приоритетная очередь¶
Очередь LIFO¶
Исключения¶
- 
exception 
asyncio.QueueEmpty¶ Это исключение возникает при вызове
get_nowait()метод в пустой очереди.
- 
exception 
asyncio.QueueFull¶ Исключение возникает при вызове
put_nowait()метод в очереди, которая достигла своего максимального размера.
Примеры¶
Очереди можно используемый для распределения рабочей нагрузки между несколькими параллельными задачами:
import asyncio
import random
import time
async def worker(name, queue):
    while True:
        # Получить "рабочий элемент" вне очереди.
        sleep_for = await queue.get()
        # Спать "sleep_for" секунд.
        await asyncio.sleep(sleep_for)
        # Сообщение очереди, для обработки «рабочего элемента».
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    # Создайть очередь, которую мы будем использовать для хранения нашей "рабочей нагрузки".
    queue = asyncio.Queue()
    # Генерирует случайные тайминги и вставляет их в очередь.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    # Создать три рабочих задачи для одновременной обработки очереди.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    # Подождать, пока очередь не будет полностью обработана.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    # Отменить рабочие задания.
    for task in tasks:
        task.cancel()
    # Подождать, пока все рабочие задачи не будут отменены.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
