aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/queue.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
committerJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
commit4212164e91ba2f49583cf44ad623a29b36db8f77 (patch)
tree47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/queue.py
parent6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff)
downloadyt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/queue.py')
-rw-r--r--python/gevent/queue.py224
1 files changed, 152 insertions, 72 deletions
diff --git a/python/gevent/queue.py b/python/gevent/queue.py
index 5f1bb47..57b937b 100644
--- a/python/gevent/queue.py
+++ b/python/gevent/queue.py
@@ -1,12 +1,17 @@
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
-"""Synchronized queues.
+# copyright (c) 2018 gevent
+# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
+"""
+Synchronized queues.
The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues
that work across greenlets, with the API similar to the classes found in the
standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
-The classes in this module implement iterator protocol. Iterating over queue
-means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` returns ``StopIteration``.
+The classes in this module implement the iterator protocol. Iterating
+over a queue means repeatedly calling :meth:`get <Queue.get>` until
+:meth:`get <Queue.get>` returns ``StopIteration`` (specifically that
+class, not an instance or subclass).
>>> queue = gevent.queue.Queue()
>>> queue.put(1)
@@ -24,23 +29,37 @@ means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` r
from __future__ import absolute_import
import sys
-import heapq
+from heapq import heappush as _heappush
+from heapq import heappop as _heappop
+from heapq import heapify as _heapify
import collections
if sys.version_info[0] == 2:
- import Queue as __queue__
+ import Queue as __queue__ # python 3: pylint:disable=import-error
else:
import queue as __queue__ # python 2: pylint:disable=import-error
Full = __queue__.Full
Empty = __queue__.Empty
from gevent.timeout import Timeout
-from gevent.hub import get_hub, Waiter, getcurrent
-from gevent.hub import InvalidSwitchError
+from gevent._hub_local import get_hub_noargs as get_hub
+from greenlet import getcurrent
+from gevent.exceptions import InvalidSwitchError
+__all__ = []
+__implements__ = ['Queue', 'PriorityQueue', 'LifoQueue']
+__extensions__ = ['JoinableQueue', 'Channel']
+__imports__ = ['Empty', 'Full']
+if hasattr(__queue__, 'SimpleQueue'):
+ __all__.append('SimpleQueue') # New in 3.7
+ # SimpleQueue is implemented in C and directly allocates locks
+ # unaffected by monkey patching. We need the Python version.
+ SimpleQueue = __queue__._PySimpleQueue # pylint:disable=no-member
+__all__ += (__implements__ + __extensions__ + __imports__)
-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
+# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
+# pylint:disable=assignment-from-no-return
def _safe_remove(deq, item):
# For when the item may have been removed by
@@ -50,6 +69,26 @@ def _safe_remove(deq, item):
except ValueError:
pass
+import gevent._waiter
+locals()['Waiter'] = gevent._waiter.Waiter
+
+class ItemWaiter(Waiter): # pylint:disable=undefined-variable
+ # pylint:disable=assigning-non-slot
+ __slots__ = (
+ 'item',
+ 'queue',
+ )
+
+ def __init__(self, item, queue):
+ Waiter.__init__(self) # pylint:disable=undefined-variable
+ self.item = item
+ self.queue = queue
+
+ def put_and_switch(self):
+ self.queue._put(self.item)
+ self.queue = None
+ self.item = None
+ return self.switch(self)
class Queue(object):
"""
@@ -58,26 +97,41 @@ class Queue(object):
If *maxsize* is less than or equal to zero or ``None``, the queue
size is infinite.
+ Queues have a ``len`` equal to the number of items in them (the :meth:`qsize`),
+ but in a boolean context they are always True.
+
.. versionchanged:: 1.1b3
Queues now support :func:`len`; it behaves the same as :meth:`qsize`.
.. versionchanged:: 1.1b3
Multiple greenlets that block on a call to :meth:`put` for a full queue
- will now be woken up to put their items into the queue in the order in which
+ will now be awakened to put their items into the queue in the order in which
they arrived. Likewise, multiple greenlets that block on a call to :meth:`get` for
an empty queue will now receive items in the order in which they blocked. An
implementation quirk under CPython *usually* ensured this was roughly the case
previously anyway, but that wasn't the case for PyPy.
"""
- def __init__(self, maxsize=None, items=None):
+ __slots__ = (
+ '_maxsize',
+ 'getters',
+ 'putters',
+ 'hub',
+ '_event_unlock',
+ 'queue',
+ '__weakref__',
+ )
+
+ def __init__(self, maxsize=None, items=(), _warn_depth=2):
if maxsize is not None and maxsize <= 0:
- self.maxsize = None
if maxsize == 0:
import warnings
- warnings.warn('Queue(0) now equivalent to Queue(None); if you want a channel, use Channel',
- DeprecationWarning, stacklevel=2)
- else:
- self.maxsize = maxsize
+ warnings.warn(
+ 'Queue(0) now equivalent to Queue(None); if you want a channel, use Channel',
+ DeprecationWarning,
+ stacklevel=_warn_depth)
+ maxsize = None
+
+ self._maxsize = maxsize if maxsize is not None else -1
# Explicitly maintain order for getters and putters that block
# so that callers can consistently rely on getting things out
# in the apparent order they went in. This was once required by
@@ -94,23 +148,25 @@ class Queue(object):
self.putters = collections.deque()
self.hub = get_hub()
self._event_unlock = None
- if items:
- self._init(maxsize, items)
- else:
- self._init(maxsize)
+ self.queue = self._create_queue(items)
- # QQQ make maxsize into a property with setter that schedules unlock if necessary
+ @property
+ def maxsize(self):
+ return self._maxsize if self._maxsize > 0 else None
+
+ @maxsize.setter
+ def maxsize(self, nv):
+ # QQQ make maxsize into a property with setter that schedules unlock if necessary
+ if nv is None or nv <= 0:
+ self._maxsize = -1
+ else:
+ self._maxsize = nv
def copy(self):
return type(self)(self.maxsize, self.queue)
- def _init(self, maxsize, items=None):
- # FIXME: Why is maxsize unused or even passed?
- # pylint:disable=unused-argument
- if items:
- self.queue = collections.deque(items)
- else:
- self.queue = collections.deque()
+ def _create_queue(self, items=()):
+ return collections.deque(items)
def _get(self):
return self.queue.popleft()
@@ -166,7 +222,12 @@ class Queue(object):
to return True for backwards compatibility.
"""
return True
- __nonzero__ = __bool__
+
+ def __nonzero__(self):
+ # Py2.
+ # For Cython; __bool__ becomes a special method that we can't
+ # get by name.
+ return True
def empty(self):
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
@@ -177,7 +238,7 @@ class Queue(object):
``Queue(None)`` is never full.
"""
- return self.maxsize is not None and self.qsize() >= self.maxsize
+ return self._maxsize > 0 and self.qsize() >= self._maxsize
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
@@ -190,7 +251,7 @@ class Queue(object):
is immediately available, else raise the :class:`Full` exception (*timeout*
is ignored in that case).
"""
- if self.maxsize is None or self.qsize() < self.maxsize:
+ if self._maxsize == -1 or self.qsize() < self._maxsize:
# there's a free slot, put an item right away
self._put(item)
if self.getters:
@@ -198,10 +259,10 @@ class Queue(object):
elif self.hub is getcurrent():
# We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
# Check if possible to get a free slot in the queue.
- while self.getters and self.qsize() and self.qsize() >= self.maxsize:
+ while self.getters and self.qsize() and self.qsize() >= self._maxsize:
getter = self.getters.popleft()
getter.switch(getter)
- if self.qsize() < self.maxsize:
+ if self.qsize() < self._maxsize:
self._put(item)
return
raise Full
@@ -251,7 +312,7 @@ class Queue(object):
# to return. No choice...
raise Empty()
- waiter = Waiter()
+ waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
try:
self.getters.append(waiter)
@@ -317,7 +378,7 @@ class Queue(object):
def _unlock(self):
while True:
repeat = False
- if self.putters and (self.maxsize is None or self.qsize() < self.maxsize):
+ if self.putters and (self._maxsize == -1 or self.qsize() < self._maxsize):
repeat = True
try:
putter = self.putters.popleft()
@@ -340,29 +401,31 @@ class Queue(object):
def __iter__(self):
return self
- def next(self):
+ def __next__(self):
result = self.get()
if result is StopIteration:
raise result
return result
- __next__ = next
+ next = __next__ # Py2
+class UnboundQueue(Queue):
+ # A specialization of Queue that knows it can never
+ # be bound. Changing its maxsize has no effect.
-class ItemWaiter(Waiter):
- __slots__ = ['item', 'queue']
+ __slots__ = ()
- def __init__(self, item, queue):
- Waiter.__init__(self)
- self.item = item
- self.queue = queue
+ def __init__(self, maxsize=None, items=()):
+ if maxsize is not None:
+ raise ValueError("UnboundQueue has no maxsize")
+ Queue.__init__(self, maxsize, items)
+ self.putters = None # Will never be used.
- def put_and_switch(self):
- self.queue._put(self.item)
- self.queue = None
- self.item = None
- return self.switch(self)
+ def put(self, item, block=True, timeout=None):
+ self._put(item)
+ if self.getters:
+ self._schedule_unlock()
class PriorityQueue(Queue):
@@ -376,30 +439,27 @@ class PriorityQueue(Queue):
Previously it was just assumed that they were already a heap.
'''
- def _init(self, maxsize, items=None):
- if items:
- self.queue = list(items)
- heapq.heapify(self.queue)
- else:
- self.queue = []
+ __slots__ = ()
- def _put(self, item, heappush=heapq.heappush):
- # pylint:disable=arguments-differ
- heappush(self.queue, item)
+ def _create_queue(self, items=()):
+ q = list(items)
+ _heapify(q)
+ return q
- def _get(self, heappop=heapq.heappop):
- # pylint:disable=arguments-differ
- return heappop(self.queue)
+ def _put(self, item):
+ _heappush(self.queue, item)
+
+ def _get(self):
+ return _heappop(self.queue)
class LifoQueue(Queue):
'''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
- def _init(self, maxsize, items=None):
- if items:
- self.queue = list(items)
- else:
- self.queue = []
+ __slots__ = ()
+
+ def _create_queue(self, items=()):
+ return list(items)
def _put(self, item):
self.queue.append(item)
@@ -417,7 +477,12 @@ class JoinableQueue(Queue):
:meth:`task_done` and :meth:`join` methods.
"""
- def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
+ __slots__ = (
+ '_cond',
+ 'unfinished_tasks',
+ )
+
+ def __init__(self, maxsize=None, items=(), unfinished_tasks=None):
"""
.. versionchanged:: 1.1a1
@@ -425,8 +490,9 @@ class JoinableQueue(Queue):
(if any) will be considered unfinished.
"""
+ Queue.__init__(self, maxsize, items, _warn_depth=3)
+
from gevent.event import Event
- Queue.__init__(self, maxsize, items)
self._cond = Event()
self._cond.set()
@@ -493,7 +559,18 @@ class JoinableQueue(Queue):
class Channel(object):
- def __init__(self):
+ __slots__ = (
+ 'getters',
+ 'putters',
+ 'hub',
+ '_event_unlock',
+ '__weakref__',
+ )
+
+ def __init__(self, maxsize=1):
+ # We take maxsize to simplify certain kinds of code
+ if maxsize != 1:
+ raise ValueError("Channels have a maxsize of 1")
self.getters = collections.deque()
self.putters = collections.deque()
self.hub = get_hub()
@@ -537,7 +614,7 @@ class Channel(object):
if not block:
timeout = 0
- waiter = Waiter()
+ waiter = Waiter() # pylint:disable=undefined-variable
item = (item, waiter)
self.putters.append(item)
timeout = Timeout._start_new_or_dummy(timeout, Full)
@@ -566,7 +643,7 @@ class Channel(object):
if not block:
timeout = 0
- waiter = Waiter()
+ waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
try:
self.getters.append(waiter)
@@ -577,7 +654,7 @@ class Channel(object):
self.getters.remove(waiter)
raise
finally:
- timeout.cancel()
+ timeout.close()
def get_nowait(self):
return self.get(False)
@@ -596,10 +673,13 @@ class Channel(object):
def __iter__(self):
return self
- def next(self):
+ def __next__(self):
result = self.get()
if result is StopIteration:
raise result
return result
- __next__ = next # py3
+ next = __next__ # Py2
+
+from gevent._util import import_c_accel
+import_c_accel(globals(), 'gevent._queue')