diff options
Diffstat (limited to 'python/gevent/queue.py')
-rw-r--r-- | python/gevent/queue.py | 224 |
1 files changed, 152 insertions, 72 deletions
diff --git a/python/gevent/queue.py b/python/gevent/queue.py index 5f1bb47..57b937b 100644 --- a/python/gevent/queue.py +++ b/python/gevent/queue.py @@ -1,12 +1,17 @@ # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. -"""Synchronized queues. +# copyright (c) 2018 gevent +# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False +""" +Synchronized queues. The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues that work across greenlets, with the API similar to the classes found in the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules. -The classes in this module implement iterator protocol. Iterating over queue -means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` returns ``StopIteration``. +The classes in this module implement the iterator protocol. Iterating +over a queue means repeatedly calling :meth:`get <Queue.get>` until +:meth:`get <Queue.get>` returns ``StopIteration`` (specifically that +class, not an instance or subclass). >>> queue = gevent.queue.Queue() >>> queue.put(1) @@ -24,23 +29,37 @@ means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` r from __future__ import absolute_import import sys -import heapq +from heapq import heappush as _heappush +from heapq import heappop as _heappop +from heapq import heapify as _heapify import collections if sys.version_info[0] == 2: - import Queue as __queue__ + import Queue as __queue__ # python 3: pylint:disable=import-error else: import queue as __queue__ # python 2: pylint:disable=import-error Full = __queue__.Full Empty = __queue__.Empty from gevent.timeout import Timeout -from gevent.hub import get_hub, Waiter, getcurrent -from gevent.hub import InvalidSwitchError +from gevent._hub_local import get_hub_noargs as get_hub +from greenlet import getcurrent +from gevent.exceptions import InvalidSwitchError +__all__ = [] +__implements__ = ['Queue', 'PriorityQueue', 'LifoQueue'] +__extensions__ = ['JoinableQueue', 'Channel'] +__imports__ = ['Empty', 'Full'] +if hasattr(__queue__, 'SimpleQueue'): + __all__.append('SimpleQueue') # New in 3.7 + # SimpleQueue is implemented in C and directly allocates locks + # unaffected by monkey patching. We need the Python version. + SimpleQueue = __queue__._PySimpleQueue # pylint:disable=no-member +__all__ += (__implements__ + __extensions__ + __imports__) -__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel'] +# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return +# pylint:disable=assignment-from-no-return def _safe_remove(deq, item): # For when the item may have been removed by @@ -50,6 +69,26 @@ def _safe_remove(deq, item): except ValueError: pass +import gevent._waiter +locals()['Waiter'] = gevent._waiter.Waiter + +class ItemWaiter(Waiter): # pylint:disable=undefined-variable + # pylint:disable=assigning-non-slot + __slots__ = ( + 'item', + 'queue', + ) + + def __init__(self, item, queue): + Waiter.__init__(self) # pylint:disable=undefined-variable + self.item = item + self.queue = queue + + def put_and_switch(self): + self.queue._put(self.item) + self.queue = None + self.item = None + return self.switch(self) class Queue(object): """ @@ -58,26 +97,41 @@ class Queue(object): If *maxsize* is less than or equal to zero or ``None``, the queue size is infinite. + Queues have a ``len`` equal to the number of items in them (the :meth:`qsize`), + but in a boolean context they are always True. + .. versionchanged:: 1.1b3 Queues now support :func:`len`; it behaves the same as :meth:`qsize`. .. versionchanged:: 1.1b3 Multiple greenlets that block on a call to :meth:`put` for a full queue - will now be woken up to put their items into the queue in the order in which + will now be awakened to put their items into the queue in the order in which they arrived. Likewise, multiple greenlets that block on a call to :meth:`get` for an empty queue will now receive items in the order in which they blocked. An implementation quirk under CPython *usually* ensured this was roughly the case previously anyway, but that wasn't the case for PyPy. """ - def __init__(self, maxsize=None, items=None): + __slots__ = ( + '_maxsize', + 'getters', + 'putters', + 'hub', + '_event_unlock', + 'queue', + '__weakref__', + ) + + def __init__(self, maxsize=None, items=(), _warn_depth=2): if maxsize is not None and maxsize <= 0: - self.maxsize = None if maxsize == 0: import warnings - warnings.warn('Queue(0) now equivalent to Queue(None); if you want a channel, use Channel', - DeprecationWarning, stacklevel=2) - else: - self.maxsize = maxsize + warnings.warn( + 'Queue(0) now equivalent to Queue(None); if you want a channel, use Channel', + DeprecationWarning, + stacklevel=_warn_depth) + maxsize = None + + self._maxsize = maxsize if maxsize is not None else -1 # Explicitly maintain order for getters and putters that block # so that callers can consistently rely on getting things out # in the apparent order they went in. This was once required by @@ -94,23 +148,25 @@ class Queue(object): self.putters = collections.deque() self.hub = get_hub() self._event_unlock = None - if items: - self._init(maxsize, items) - else: - self._init(maxsize) + self.queue = self._create_queue(items) - # QQQ make maxsize into a property with setter that schedules unlock if necessary + @property + def maxsize(self): + return self._maxsize if self._maxsize > 0 else None + + @maxsize.setter + def maxsize(self, nv): + # QQQ make maxsize into a property with setter that schedules unlock if necessary + if nv is None or nv <= 0: + self._maxsize = -1 + else: + self._maxsize = nv def copy(self): return type(self)(self.maxsize, self.queue) - def _init(self, maxsize, items=None): - # FIXME: Why is maxsize unused or even passed? - # pylint:disable=unused-argument - if items: - self.queue = collections.deque(items) - else: - self.queue = collections.deque() + def _create_queue(self, items=()): + return collections.deque(items) def _get(self): return self.queue.popleft() @@ -166,7 +222,12 @@ class Queue(object): to return True for backwards compatibility. """ return True - __nonzero__ = __bool__ + + def __nonzero__(self): + # Py2. + # For Cython; __bool__ becomes a special method that we can't + # get by name. + return True def empty(self): """Return ``True`` if the queue is empty, ``False`` otherwise.""" @@ -177,7 +238,7 @@ class Queue(object): ``Queue(None)`` is never full. """ - return self.maxsize is not None and self.qsize() >= self.maxsize + return self._maxsize > 0 and self.qsize() >= self._maxsize def put(self, item, block=True, timeout=None): """Put an item into the queue. @@ -190,7 +251,7 @@ class Queue(object): is immediately available, else raise the :class:`Full` exception (*timeout* is ignored in that case). """ - if self.maxsize is None or self.qsize() < self.maxsize: + if self._maxsize == -1 or self.qsize() < self._maxsize: # there's a free slot, put an item right away self._put(item) if self.getters: @@ -198,10 +259,10 @@ class Queue(object): elif self.hub is getcurrent(): # We're in the mainloop, so we cannot wait; we can switch to other greenlets though. # Check if possible to get a free slot in the queue. - while self.getters and self.qsize() and self.qsize() >= self.maxsize: + while self.getters and self.qsize() and self.qsize() >= self._maxsize: getter = self.getters.popleft() getter.switch(getter) - if self.qsize() < self.maxsize: + if self.qsize() < self._maxsize: self._put(item) return raise Full @@ -251,7 +312,7 @@ class Queue(object): # to return. No choice... raise Empty() - waiter = Waiter() + waiter = Waiter() # pylint:disable=undefined-variable timeout = Timeout._start_new_or_dummy(timeout, Empty) try: self.getters.append(waiter) @@ -317,7 +378,7 @@ class Queue(object): def _unlock(self): while True: repeat = False - if self.putters and (self.maxsize is None or self.qsize() < self.maxsize): + if self.putters and (self._maxsize == -1 or self.qsize() < self._maxsize): repeat = True try: putter = self.putters.popleft() @@ -340,29 +401,31 @@ class Queue(object): def __iter__(self): return self - def next(self): + def __next__(self): result = self.get() if result is StopIteration: raise result return result - __next__ = next + next = __next__ # Py2 +class UnboundQueue(Queue): + # A specialization of Queue that knows it can never + # be bound. Changing its maxsize has no effect. -class ItemWaiter(Waiter): - __slots__ = ['item', 'queue'] + __slots__ = () - def __init__(self, item, queue): - Waiter.__init__(self) - self.item = item - self.queue = queue + def __init__(self, maxsize=None, items=()): + if maxsize is not None: + raise ValueError("UnboundQueue has no maxsize") + Queue.__init__(self, maxsize, items) + self.putters = None # Will never be used. - def put_and_switch(self): - self.queue._put(self.item) - self.queue = None - self.item = None - return self.switch(self) + def put(self, item, block=True, timeout=None): + self._put(item) + if self.getters: + self._schedule_unlock() class PriorityQueue(Queue): @@ -376,30 +439,27 @@ class PriorityQueue(Queue): Previously it was just assumed that they were already a heap. ''' - def _init(self, maxsize, items=None): - if items: - self.queue = list(items) - heapq.heapify(self.queue) - else: - self.queue = [] + __slots__ = () - def _put(self, item, heappush=heapq.heappush): - # pylint:disable=arguments-differ - heappush(self.queue, item) + def _create_queue(self, items=()): + q = list(items) + _heapify(q) + return q - def _get(self, heappop=heapq.heappop): - # pylint:disable=arguments-differ - return heappop(self.queue) + def _put(self, item): + _heappush(self.queue, item) + + def _get(self): + return _heappop(self.queue) class LifoQueue(Queue): '''A subclass of :class:`Queue` that retrieves most recently added entries first.''' - def _init(self, maxsize, items=None): - if items: - self.queue = list(items) - else: - self.queue = [] + __slots__ = () + + def _create_queue(self, items=()): + return list(items) def _put(self, item): self.queue.append(item) @@ -417,7 +477,12 @@ class JoinableQueue(Queue): :meth:`task_done` and :meth:`join` methods. """ - def __init__(self, maxsize=None, items=None, unfinished_tasks=None): + __slots__ = ( + '_cond', + 'unfinished_tasks', + ) + + def __init__(self, maxsize=None, items=(), unfinished_tasks=None): """ .. versionchanged:: 1.1a1 @@ -425,8 +490,9 @@ class JoinableQueue(Queue): (if any) will be considered unfinished. """ + Queue.__init__(self, maxsize, items, _warn_depth=3) + from gevent.event import Event - Queue.__init__(self, maxsize, items) self._cond = Event() self._cond.set() @@ -493,7 +559,18 @@ class JoinableQueue(Queue): class Channel(object): - def __init__(self): + __slots__ = ( + 'getters', + 'putters', + 'hub', + '_event_unlock', + '__weakref__', + ) + + def __init__(self, maxsize=1): + # We take maxsize to simplify certain kinds of code + if maxsize != 1: + raise ValueError("Channels have a maxsize of 1") self.getters = collections.deque() self.putters = collections.deque() self.hub = get_hub() @@ -537,7 +614,7 @@ class Channel(object): if not block: timeout = 0 - waiter = Waiter() + waiter = Waiter() # pylint:disable=undefined-variable item = (item, waiter) self.putters.append(item) timeout = Timeout._start_new_or_dummy(timeout, Full) @@ -566,7 +643,7 @@ class Channel(object): if not block: timeout = 0 - waiter = Waiter() + waiter = Waiter() # pylint:disable=undefined-variable timeout = Timeout._start_new_or_dummy(timeout, Empty) try: self.getters.append(waiter) @@ -577,7 +654,7 @@ class Channel(object): self.getters.remove(waiter) raise finally: - timeout.cancel() + timeout.close() def get_nowait(self): return self.get(False) @@ -596,10 +673,13 @@ class Channel(object): def __iter__(self): return self - def next(self): + def __next__(self): result = self.get() if result is StopIteration: raise result return result - __next__ = next # py3 + next = __next__ # Py2 + +from gevent._util import import_c_accel +import_c_accel(globals(), 'gevent._queue') |