aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/queue.py')
-rw-r--r--python/gevent/queue.py605
1 files changed, 605 insertions, 0 deletions
diff --git a/python/gevent/queue.py b/python/gevent/queue.py
new file mode 100644
index 0000000..5f1bb47
--- /dev/null
+++ b/python/gevent/queue.py
@@ -0,0 +1,605 @@
+# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
+"""Synchronized queues.
+
+The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues
+that work across greenlets, with the API similar to the classes found in the
+standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
+
+The classes in this module implement iterator protocol. Iterating over queue
+means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` returns ``StopIteration``.
+
+ >>> queue = gevent.queue.Queue()
+ >>> queue.put(1)
+ >>> queue.put(2)
+ >>> queue.put(StopIteration)
+ >>> for item in queue:
+ ... print(item)
+ 1
+ 2
+
+.. versionchanged:: 1.0
+ ``Queue(0)`` now means queue of infinite size, not a channel. A :exc:`DeprecationWarning`
+ will be issued with this argument.
+"""
+
+from __future__ import absolute_import
+import sys
+import heapq
+import collections
+
+if sys.version_info[0] == 2:
+ import Queue as __queue__
+else:
+ import queue as __queue__ # python 2: pylint:disable=import-error
+Full = __queue__.Full
+Empty = __queue__.Empty
+
+from gevent.timeout import Timeout
+from gevent.hub import get_hub, Waiter, getcurrent
+from gevent.hub import InvalidSwitchError
+
+
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
+
+
+def _safe_remove(deq, item):
+ # For when the item may have been removed by
+ # Queue._unlock
+ try:
+ deq.remove(item)
+ except ValueError:
+ pass
+
+
+class Queue(object):
+ """
+ Create a queue object with a given maximum size.
+
+ If *maxsize* is less than or equal to zero or ``None``, the queue
+ size is infinite.
+
+ .. versionchanged:: 1.1b3
+ Queues now support :func:`len`; it behaves the same as :meth:`qsize`.
+ .. versionchanged:: 1.1b3
+ Multiple greenlets that block on a call to :meth:`put` for a full queue
+ will now be woken up to put their items into the queue in the order in which
+ they arrived. Likewise, multiple greenlets that block on a call to :meth:`get` for
+ an empty queue will now receive items in the order in which they blocked. An
+ implementation quirk under CPython *usually* ensured this was roughly the case
+ previously anyway, but that wasn't the case for PyPy.
+ """
+
+ def __init__(self, maxsize=None, items=None):
+ if maxsize is not None and maxsize <= 0:
+ self.maxsize = None
+ if maxsize == 0:
+ import warnings
+ warnings.warn('Queue(0) now equivalent to Queue(None); if you want a channel, use Channel',
+ DeprecationWarning, stacklevel=2)
+ else:
+ self.maxsize = maxsize
+ # Explicitly maintain order for getters and putters that block
+ # so that callers can consistently rely on getting things out
+ # in the apparent order they went in. This was once required by
+ # imap_unordered. Previously these were set() objects, and the
+ # items put in the set have default hash() and eq() methods;
+ # under CPython, since new objects tend to have increasing
+ # hash values, this tended to roughly maintain order anyway,
+ # but that's not true under PyPy. An alternative to a deque
+ # (to avoid the linear scan of remove()) might be an
+ # OrderedDict, but it's 2.7 only; we don't expect to have so
+ # many waiters that removing an arbitrary element is a
+ # bottleneck, though.
+ self.getters = collections.deque()
+ self.putters = collections.deque()
+ self.hub = get_hub()
+ self._event_unlock = None
+ if items:
+ self._init(maxsize, items)
+ else:
+ self._init(maxsize)
+
+ # QQQ make maxsize into a property with setter that schedules unlock if necessary
+
+ def copy(self):
+ return type(self)(self.maxsize, self.queue)
+
+ def _init(self, maxsize, items=None):
+ # FIXME: Why is maxsize unused or even passed?
+ # pylint:disable=unused-argument
+ if items:
+ self.queue = collections.deque(items)
+ else:
+ self.queue = collections.deque()
+
+ def _get(self):
+ return self.queue.popleft()
+
+ def _peek(self):
+ return self.queue[0]
+
+ def _put(self, item):
+ self.queue.append(item)
+
+ def __repr__(self):
+ return '<%s at %s%s>' % (type(self).__name__, hex(id(self)), self._format())
+
+ def __str__(self):
+ return '<%s%s>' % (type(self).__name__, self._format())
+
+ def _format(self):
+ result = []
+ if self.maxsize is not None:
+ result.append('maxsize=%r' % (self.maxsize, ))
+ if getattr(self, 'queue', None):
+ result.append('queue=%r' % (self.queue, ))
+ if self.getters:
+ result.append('getters[%s]' % len(self.getters))
+ if self.putters:
+ result.append('putters[%s]' % len(self.putters))
+ if result:
+ return ' ' + ' '.join(result)
+ return ''
+
+ def qsize(self):
+ """Return the size of the queue."""
+ return len(self.queue)
+
+ def __len__(self):
+ """
+ Return the size of the queue. This is the same as :meth:`qsize`.
+
+ .. versionadded: 1.1b3
+
+ Previously, getting len() of a queue would raise a TypeError.
+ """
+
+ return self.qsize()
+
+ def __bool__(self):
+ """
+ A queue object is always True.
+
+ .. versionadded: 1.1b3
+
+ Now that queues support len(), they need to implement ``__bool__``
+ to return True for backwards compatibility.
+ """
+ return True
+ __nonzero__ = __bool__
+
+ def empty(self):
+ """Return ``True`` if the queue is empty, ``False`` otherwise."""
+ return not self.qsize()
+
+ def full(self):
+ """Return ``True`` if the queue is full, ``False`` otherwise.
+
+ ``Queue(None)`` is never full.
+ """
+ return self.maxsize is not None and self.qsize() >= self.maxsize
+
+ def put(self, item, block=True, timeout=None):
+ """Put an item into the queue.
+
+ If optional arg *block* is true and *timeout* is ``None`` (the default),
+ block if necessary until a free slot is available. If *timeout* is
+ a positive number, it blocks at most *timeout* seconds and raises
+ the :class:`Full` exception if no free slot was available within that time.
+ Otherwise (*block* is false), put an item on the queue if a free slot
+ is immediately available, else raise the :class:`Full` exception (*timeout*
+ is ignored in that case).
+ """
+ if self.maxsize is None or self.qsize() < self.maxsize:
+ # there's a free slot, put an item right away
+ self._put(item)
+ if self.getters:
+ self._schedule_unlock()
+ elif self.hub is getcurrent():
+ # We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
+ # Check if possible to get a free slot in the queue.
+ while self.getters and self.qsize() and self.qsize() >= self.maxsize:
+ getter = self.getters.popleft()
+ getter.switch(getter)
+ if self.qsize() < self.maxsize:
+ self._put(item)
+ return
+ raise Full
+ elif block:
+ waiter = ItemWaiter(item, self)
+ self.putters.append(waiter)
+ timeout = Timeout._start_new_or_dummy(timeout, Full)
+ try:
+ if self.getters:
+ self._schedule_unlock()
+ result = waiter.get()
+ if result is not waiter:
+ raise InvalidSwitchError("Invalid switch into Queue.put: %r" % (result, ))
+ finally:
+ timeout.cancel()
+ _safe_remove(self.putters, waiter)
+ else:
+ raise Full
+
+ def put_nowait(self, item):
+ """Put an item into the queue without blocking.
+
+ Only enqueue the item if a free slot is immediately available.
+ Otherwise raise the :class:`Full` exception.
+ """
+ self.put(item, False)
+
+ def __get_or_peek(self, method, block, timeout):
+ # Internal helper method. The `method` should be either
+ # self._get when called from self.get() or self._peek when
+ # called from self.peek(). Call this after the initial check
+ # to see if there are items in the queue.
+
+ if self.hub is getcurrent():
+ # special case to make get_nowait() or peek_nowait() runnable in the mainloop greenlet
+ # there are no items in the queue; try to fix the situation by unlocking putters
+ while self.putters:
+ # Note: get() used popleft(), peek used pop(); popleft
+ # is almost certainly correct.
+ self.putters.popleft().put_and_switch()
+ if self.qsize():
+ return method()
+ raise Empty()
+
+ if not block:
+ # We can't block, we're not the hub, and we have nothing
+ # to return. No choice...
+ raise Empty()
+
+ waiter = Waiter()
+ timeout = Timeout._start_new_or_dummy(timeout, Empty)
+ try:
+ self.getters.append(waiter)
+ if self.putters:
+ self._schedule_unlock()
+ result = waiter.get()
+ if result is not waiter:
+ raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
+ return method()
+ finally:
+ timeout.cancel()
+ _safe_remove(self.getters, waiter)
+
+ def get(self, block=True, timeout=None):
+ """Remove and return an item from the queue.
+
+ If optional args *block* is true and *timeout* is ``None`` (the default),
+ block if necessary until an item is available. If *timeout* is a positive number,
+ it blocks at most *timeout* seconds and raises the :class:`Empty` exception
+ if no item was available within that time. Otherwise (*block* is false), return
+ an item if one is immediately available, else raise the :class:`Empty` exception
+ (*timeout* is ignored in that case).
+ """
+ if self.qsize():
+ if self.putters:
+ self._schedule_unlock()
+ return self._get()
+
+ return self.__get_or_peek(self._get, block, timeout)
+
+ def get_nowait(self):
+ """Remove and return an item from the queue without blocking.
+
+ Only get an item if one is immediately available. Otherwise
+ raise the :class:`Empty` exception.
+ """
+ return self.get(False)
+
+ def peek(self, block=True, timeout=None):
+ """Return an item from the queue without removing it.
+
+ If optional args *block* is true and *timeout* is ``None`` (the default),
+ block if necessary until an item is available. If *timeout* is a positive number,
+ it blocks at most *timeout* seconds and raises the :class:`Empty` exception
+ if no item was available within that time. Otherwise (*block* is false), return
+ an item if one is immediately available, else raise the :class:`Empty` exception
+ (*timeout* is ignored in that case).
+ """
+ if self.qsize():
+ # XXX: Why doesn't this schedule an unlock like get() does?
+ return self._peek()
+
+ return self.__get_or_peek(self._peek, block, timeout)
+
+ def peek_nowait(self):
+ """Return an item from the queue without blocking.
+
+ Only return an item if one is immediately available. Otherwise
+ raise the :class:`Empty` exception.
+ """
+ return self.peek(False)
+
+ def _unlock(self):
+ while True:
+ repeat = False
+ if self.putters and (self.maxsize is None or self.qsize() < self.maxsize):
+ repeat = True
+ try:
+ putter = self.putters.popleft()
+ self._put(putter.item)
+ except: # pylint:disable=bare-except
+ putter.throw(*sys.exc_info())
+ else:
+ putter.switch(putter)
+ if self.getters and self.qsize():
+ repeat = True
+ getter = self.getters.popleft()
+ getter.switch(getter)
+ if not repeat:
+ return
+
+ def _schedule_unlock(self):
+ if not self._event_unlock:
+ self._event_unlock = self.hub.loop.run_callback(self._unlock)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ result = self.get()
+ if result is StopIteration:
+ raise result
+ return result
+
+ __next__ = next
+
+
+
+class ItemWaiter(Waiter):
+ __slots__ = ['item', 'queue']
+
+ def __init__(self, item, queue):
+ Waiter.__init__(self)
+ self.item = item
+ self.queue = queue
+
+ def put_and_switch(self):
+ self.queue._put(self.item)
+ self.queue = None
+ self.item = None
+ return self.switch(self)
+
+
+class PriorityQueue(Queue):
+ '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
+
+ Entries are typically tuples of the form: ``(priority number, data)``.
+
+ .. versionchanged:: 1.2a1
+ Any *items* given to the constructor will now be passed through
+ :func:`heapq.heapify` to ensure the invariants of this class hold.
+ Previously it was just assumed that they were already a heap.
+ '''
+
+ def _init(self, maxsize, items=None):
+ if items:
+ self.queue = list(items)
+ heapq.heapify(self.queue)
+ else:
+ self.queue = []
+
+ def _put(self, item, heappush=heapq.heappush):
+ # pylint:disable=arguments-differ
+ heappush(self.queue, item)
+
+ def _get(self, heappop=heapq.heappop):
+ # pylint:disable=arguments-differ
+ return heappop(self.queue)
+
+
+class LifoQueue(Queue):
+ '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
+
+ def _init(self, maxsize, items=None):
+ if items:
+ self.queue = list(items)
+ else:
+ self.queue = []
+
+ def _put(self, item):
+ self.queue.append(item)
+
+ def _get(self):
+ return self.queue.pop()
+
+ def _peek(self):
+ return self.queue[-1]
+
+
+class JoinableQueue(Queue):
+ """
+ A subclass of :class:`Queue` that additionally has
+ :meth:`task_done` and :meth:`join` methods.
+ """
+
+ def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
+ """
+
+ .. versionchanged:: 1.1a1
+ If *unfinished_tasks* is not given, then all the given *items*
+ (if any) will be considered unfinished.
+
+ """
+ from gevent.event import Event
+ Queue.__init__(self, maxsize, items)
+ self._cond = Event()
+ self._cond.set()
+
+ if unfinished_tasks:
+ self.unfinished_tasks = unfinished_tasks
+ elif items:
+ self.unfinished_tasks = len(items)
+ else:
+ self.unfinished_tasks = 0
+
+ if self.unfinished_tasks:
+ self._cond.clear()
+
+ def copy(self):
+ return type(self)(self.maxsize, self.queue, self.unfinished_tasks)
+
+ def _format(self):
+ result = Queue._format(self)
+ if self.unfinished_tasks:
+ result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
+ return result
+
+ def _put(self, item):
+ Queue._put(self, item)
+ self.unfinished_tasks += 1
+ self._cond.clear()
+
+ def task_done(self):
+ '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
+ For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
+ that the processing on the task is complete.
+
+ If a :meth:`join` is currently blocking, it will resume when all items have been processed
+ (meaning that a :meth:`task_done` call was received for every item that had been
+ :meth:`put <Queue.put>` into the queue).
+
+ Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
+ '''
+ if self.unfinished_tasks <= 0:
+ raise ValueError('task_done() called too many times')
+ self.unfinished_tasks -= 1
+ if self.unfinished_tasks == 0:
+ self._cond.set()
+
+ def join(self, timeout=None):
+ '''
+ Block until all items in the queue have been gotten and processed.
+
+ The count of unfinished tasks goes up whenever an item is added to the queue.
+ The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
+ that the item was retrieved and all work on it is complete. When the count of
+ unfinished tasks drops to zero, :meth:`join` unblocks.
+
+ :param float timeout: If not ``None``, then wait no more than this time in seconds
+ for all tasks to finish.
+ :return: ``True`` if all tasks have finished; if ``timeout`` was given and expired before
+ all tasks finished, ``False``.
+
+ .. versionchanged:: 1.1a1
+ Add the *timeout* parameter.
+ '''
+ return self._cond.wait(timeout=timeout)
+
+
+class Channel(object):
+
+ def __init__(self):
+ self.getters = collections.deque()
+ self.putters = collections.deque()
+ self.hub = get_hub()
+ self._event_unlock = None
+
+ def __repr__(self):
+ return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
+
+ def __str__(self):
+ return '<%s %s>' % (type(self).__name__, self._format())
+
+ def _format(self):
+ result = ''
+ if self.getters:
+ result += ' getters[%s]' % len(self.getters)
+ if self.putters:
+ result += ' putters[%s]' % len(self.putters)
+ return result
+
+ @property
+ def balance(self):
+ return len(self.putters) - len(self.getters)
+
+ def qsize(self):
+ return 0
+
+ def empty(self):
+ return True
+
+ def full(self):
+ return True
+
+ def put(self, item, block=True, timeout=None):
+ if self.hub is getcurrent():
+ if self.getters:
+ getter = self.getters.popleft()
+ getter.switch(item)
+ return
+ raise Full
+
+ if not block:
+ timeout = 0
+
+ waiter = Waiter()
+ item = (item, waiter)
+ self.putters.append(item)
+ timeout = Timeout._start_new_or_dummy(timeout, Full)
+ try:
+ if self.getters:
+ self._schedule_unlock()
+ result = waiter.get()
+ if result is not waiter:
+ raise InvalidSwitchError("Invalid switch into Channel.put: %r" % (result, ))
+ except:
+ _safe_remove(self.putters, item)
+ raise
+ finally:
+ timeout.cancel()
+
+ def put_nowait(self, item):
+ self.put(item, False)
+
+ def get(self, block=True, timeout=None):
+ if self.hub is getcurrent():
+ if self.putters:
+ item, putter = self.putters.popleft()
+ self.hub.loop.run_callback(putter.switch, putter)
+ return item
+
+ if not block:
+ timeout = 0
+
+ waiter = Waiter()
+ timeout = Timeout._start_new_or_dummy(timeout, Empty)
+ try:
+ self.getters.append(waiter)
+ if self.putters:
+ self._schedule_unlock()
+ return waiter.get()
+ except:
+ self.getters.remove(waiter)
+ raise
+ finally:
+ timeout.cancel()
+
+ def get_nowait(self):
+ return self.get(False)
+
+ def _unlock(self):
+ while self.putters and self.getters:
+ getter = self.getters.popleft()
+ item, putter = self.putters.popleft()
+ getter.switch(item)
+ putter.switch(putter)
+
+ def _schedule_unlock(self):
+ if not self._event_unlock:
+ self._event_unlock = self.hub.loop.run_callback(self._unlock)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ result = self.get()
+ if result is StopIteration:
+ raise result
+ return result
+
+ __next__ = next # py3