Diffstat (limited to 'python/gevent/pool.py')
-rw-r--r--  python/gevent/pool.py  366
1 file changed, 137 insertions, 229 deletions
diff --git a/python/gevent/pool.py b/python/gevent/pool.py
index d0c5cbb..48e6a7a 100644
--- a/python/gevent/pool.py
+++ b/python/gevent/pool.py
@@ -13,233 +13,67 @@ provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
method blocks if the number of greenlets in the pool has already
reached the limit, until there is a free slot.
"""
+from __future__ import print_function, absolute_import, division
-from bisect import insort_right
-try:
- from itertools import izip
-except ImportError:
- # Python 3
- izip = zip
from gevent.hub import GreenletExit, getcurrent, kill as _kill
from gevent.greenlet import joinall, Greenlet
+from gevent.queue import Full as QueueFull
from gevent.timeout import Timeout
from gevent.event import Event
from gevent.lock import Semaphore, DummySemaphore
-__all__ = ['Group', 'Pool']
+from gevent._compat import izip
+from gevent._imap import IMap
+from gevent._imap import IMapUnordered
+__all__ = [
+ 'Group',
+ 'Pool',
+ 'PoolFull',
+]
-class IMapUnordered(Greenlet):
- """
- An iterator of map results.
- """
-
- _zipped = False
-
- def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
- """
- An iterator that applies *func* to each item of *iterable* in spawned greenlets, yielding results as they become available.
-
- :keyword int maxsize: If given and not-None, specifies the maximum number of
- finished results that will be allowed to accumulate awaiting the reader;
- more than that number of results will cause map function greenlets to begin
- to block. This is most useful if there is a great disparity between the speed of
- the mapping code and the consumer, and the results consume a great deal of resources.
- Using a bound is more computationally expensive than not using a bound.
-
- .. versionchanged:: 1.1b3
- Added the *maxsize* parameter.
- """
- from gevent.queue import Queue
- Greenlet.__init__(self)
- if spawn is not None:
- self.spawn = spawn
- if _zipped:
- self._zipped = _zipped
- self.func = func
- self.iterable = iterable
- self.queue = Queue()
- if maxsize:
- # Bounding the queue is not enough if we want to keep from
- # accumulating objects; the result value will be around as
- # the greenlet's result, blocked on self.queue.put(), and
- # we'll go on to spawn another greenlet, which in turn can
- # create the result. So we need a semaphore to prevent a
- # greenlet from exiting while the queue is full so that we
- # don't spawn the next greenlet (assuming that self.spawn
- # is of course bounded). (Alternatively we could have the
- # greenlet itself do the insert into the pool, but that
- # takes some rework).
- #
- # Given the use of a semaphore at this level, sizing the queue becomes
- # redundant, and that lets us avoid having to use self.link() instead
- # of self.rawlink() to avoid having blocking methods called in the
- # hub greenlet.
- factory = Semaphore
- else:
- factory = DummySemaphore
- self._result_semaphore = factory(maxsize)
-
- self.count = 0
- self.finished = False
- # If the queue size is unbounded, then we want to call all
- # the links (_on_finish and _on_result) directly in the hub greenlet
- # for efficiency. However, if the queue is bounded, we can't do that if
- # the queue might block (because if there's no waiter the hub can switch to,
- # the queue simply raises Full). Therefore, in that case, we use
- # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
- # This means that _on_finish and _on_result can be called and interleaved in any order
- if the call to self.queue.put() blocks.
- # Note that right now we're not bounding the queue, instead using a semaphore.
- self.rawlink(self._on_finish)
-
- def __iter__(self):
- return self
-
- def next(self):
- self._result_semaphore.release()
- value = self._inext()
- if isinstance(value, Failure):
- raise value.exc
- return value
- __next__ = next
-
- def _inext(self):
- return self.queue.get()
-
- def _ispawn(self, func, item):
- self._result_semaphore.acquire()
- self.count += 1
- g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
- g.rawlink(self._on_result)
- return g
-
- def _run(self): # pylint:disable=method-hidden
- try:
- func = self.func
- for item in self.iterable:
- self._ispawn(func, item)
- finally:
- self.__dict__.pop('spawn', None)
- self.__dict__.pop('func', None)
- self.__dict__.pop('iterable', None)
-
- def _on_result(self, greenlet):
- # This method can either be called in the hub greenlet (if the
- # queue is unbounded) or its own greenlet. If it's called in
- # its own greenlet, the calls to put() may block and switch
- # greenlets, which in turn could mutate our state. So any
- # state on this object that we need to look at, notably
- # self.count, we need to capture or mutate *before* we put.
- # (Note that right now we're not bounding the queue, but we may
- # choose to do so in the future so this implementation will be left in case.)
- self.count -= 1
- count = self.count
- finished = self.finished
- ready = self.ready()
- put_finished = False
-
- if ready and count <= 0 and not finished:
- finished = self.finished = True
- put_finished = True
-
- if greenlet.successful():
- self.queue.put(self._iqueue_value_for_success(greenlet))
- else:
- self.queue.put(self._iqueue_value_for_failure(greenlet))
- if put_finished:
- self.queue.put(self._iqueue_value_for_finished())
-
- def _on_finish(self, _self):
- if self.finished:
- return
-
- if not self.successful():
- self.finished = True
- self.queue.put(self._iqueue_value_for_self_failure())
- return
-
- if self.count <= 0:
- self.finished = True
- self.queue.put(self._iqueue_value_for_finished())
-
- def _iqueue_value_for_success(self, greenlet):
- return greenlet.value
-
- def _iqueue_value_for_failure(self, greenlet):
- return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
-
- def _iqueue_value_for_finished(self):
- return Failure(StopIteration)
-
- def _iqueue_value_for_self_failure(self):
- return Failure(self.exception, self._raise_exception)
-
-
-class IMap(IMapUnordered):
- # A specialization of IMapUnordered that returns items
- # in the order in which they were generated, not
- # the order in which they finish.
- # We do this by storing tuples (order, value) in the queue
- # not just value.
-
- def __init__(self, *args, **kwargs):
- self.waiting = [] # QQQ maybe deque will work faster there?
- self.index = 0
- self.maxindex = -1
- IMapUnordered.__init__(self, *args, **kwargs)
-
- def _inext(self):
- while True:
- if self.waiting and self.waiting[0][0] <= self.index:
- _, value = self.waiting.pop(0)
- else:
- index, value = self.queue.get()
- if index > self.index:
- insort_right(self.waiting, (index, value))
- continue
- self.index += 1
- return value
-
- def _ispawn(self, func, item):
- g = IMapUnordered._ispawn(self, func, item)
- self.maxindex += 1
- g.index = self.maxindex
- return g
-
- def _iqueue_value_for_success(self, greenlet):
- return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))
-
- def _iqueue_value_for_failure(self, greenlet):
- return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
-
- def _iqueue_value_for_finished(self):
- self.maxindex += 1
- return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))
-
- def _iqueue_value_for_self_failure(self):
- self.maxindex += 1
- return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))
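The removed IMapUnordered and IMap classes (relocated to gevent._imap by this change) back Pool.imap_unordered and Pool.imap, and the *maxsize* keyword described in the removed docstring bounds how many finished-but-unread results may pile up. A minimal sketch of that public API; the fetch worker, the pool size and the maxsize value are illustrative assumptions, not part of the diff:

# Sketch only: results arrive in completion order, and maxsize=2 keeps at
# most two finished results buffered, so fast workers block rather than
# racing ahead of a slow consumer.
import gevent
from gevent.pool import Pool

def fetch(n):
    gevent.sleep(0.01)          # stand-in for real work
    return n * n

pool = Pool(10)
for result in pool.imap_unordered(fetch, range(100), maxsize=2):
    gevent.sleep(0.05)          # a deliberately slow consumer
    print(result)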
class GroupMappingMixin(object):
# Internal, non-public API class.
# Provides mixin methods for implementing mapping pools. Subclasses must define:
- # - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
- # and `kwargs`, potentially asynchronously. Return a value with a `get` method that
- # blocks until the results of func are available, and a `link` method.
+ def spawn(self, func, *args, **kwargs):
+ """
+ A function that runs *func* with *args* and *kwargs*, potentially
+ asynchronously. Return a value with a ``get`` method that blocks
+ until the results of func are available, and a ``rawlink`` method
+ that calls a callback when the results are available.
+
+ If this object has an upper bound on how many asynchronously executing
+ tasks can exist, this method may block until a slot becomes available.
+ """
+ raise NotImplementedError()
- # - self._apply_immediately(): should the function passed to apply be called immediately,
- # synchronously?
+ def _apply_immediately(self):
+ """
+ Should the function passed to apply be called immediately,
+ synchronously?
+ """
+ raise NotImplementedError()
- # - self._apply_async_use_greenlet(): Should apply_async directly call
- # Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
+ def _apply_async_use_greenlet(self):
+ """
+ Should apply_async directly call Greenlet.spawn(), bypassing
+ `spawn`?
- # - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
- # asynchronously, possibly synchronously.
+ Return true when self.spawn would block.
+ """
+ raise NotImplementedError()
+
+ def _apply_async_cb_spawn(self, callback, result):
+ """
+ Run the given callback function, possibly
+ asynchronously, possibly synchronously.
+ """
+ raise NotImplementedError()
def apply_cb(self, func, args=None, kwds=None, callback=None):
"""
@@ -324,13 +158,46 @@ class GroupMappingMixin(object):
return func(*args, **kwds)
return self.spawn(func, *args, **kwds).get()
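A small sketch of the public entry points these hooks support; the add worker and the pool size are illustrative assumptions, not part of the diff:

# Sketch only.
import gevent
from gevent.pool import Pool

def add(x, y):
    return x + y

pool = Pool(2)

# apply() runs the function in the pool and blocks for its result.
print(pool.apply(add, args=(2, 3)))                      # 5

# apply_async() returns a Greenlet immediately; an optional callback
# is invoked with the result once it is ready.
g = pool.apply_async(add, args=(4, 5), callback=print)
print(g.get())                                           # 9
gevent.sleep(0)   # yield so the linked callback greenlet can run too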
+ def __map(self, func, iterable):
+ return [g.get() for g in
+ [self.spawn(func, i) for i in iterable]]
+
def map(self, func, iterable):
"""Return a list made by applying the *func* to each element of
the iterable.
.. seealso:: :meth:`imap`
"""
- return list(self.imap(func, iterable))
+ # We can't return until they're all done and in order. It
+ # wouldn't seem to matter much what order we wait on them in,
+ # so the simple, fast (50% faster than imap) solution would be:
+
+ # return [g.get() for g in
+ # [self.spawn(func, i) for i in iterable]]
+
+ # If the pool size is unlimited (or more than the len(iterable)), this
+ # is equivalent to imap (spawn() will never block, all of them run concurrently,
+ # we call get() in the order the iterable was given).
+
+ # Now let's imagine the pool is of limited size. Suppose the
+ # func is time.sleep, our pool is limited to 3 threads, and
+ # our input is [10, 1, 10, 1, 1]. We would start three threads,
+ # one to sleep for 10, one to sleep for 1, and the last to
+ # sleep for 10. We would block starting the fourth thread. At
+ # time 1, we would finish the second thread and start another
+ # one for time 1. At time 2, we would finish that one and
+ # start the last thread, and then begin executing get() on the first
+ # thread.
+
+ # Because it's spawn that blocks, this is *also* equivalent to what
+ # imap would do.
+
+ # The one remaining difference is that imap runs in its own
+ # greenlet, potentially changing the way the event loop runs.
+ # That's easy enough to do.
+
+ g = Greenlet.spawn(self.__map, func, iterable)
+ return g.get()
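A runnable sketch of the scenario the comment above walks through, with the pool limited to 3 and the sleep times scaled down so it finishes quickly; the numbers are illustrative:

# Sketch only: spawn() blocks once three items are in flight, so iteration
# over the input is throttled by the pool, yet map() still returns the
# results in input order.
import gevent
from gevent.pool import Pool

def work(seconds):
    gevent.sleep(seconds)
    return seconds

pool = Pool(3)
print(pool.map(work, [0.10, 0.01, 0.10, 0.01, 0.01]))
# -> [0.1, 0.01, 0.1, 0.01, 0.01]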
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
@@ -463,10 +330,14 @@ class Group(GroupMappingMixin):
def add(self, greenlet):
"""
- Begin tracking the greenlet.
+ Begin tracking the *greenlet*.
If this group is :meth:`full`, then this method may block
until it is possible to track the greenlet.
+
+ Typically the *greenlet* should **not** be started when
+ it is added because if this object blocks in this method,
+ then the *greenlet* may run to completion before it is tracked.
"""
try:
rawlink = greenlet.rawlink
@@ -497,13 +368,13 @@ class Group(GroupMappingMixin):
def start(self, greenlet):
"""
- Start the un-started *greenlet* and add it to the collection of greenlets
- this group is monitoring.
+ Add the **unstarted** *greenlet* to the collection of greenlets
+ this group is monitoring, and then start it.
"""
self.add(greenlet)
greenlet.start()
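A sketch of the add()/start()/spawn() split documented above, with an assumed task worker: construct the greenlet unstarted, let the group track it, then start it, or let the group do both in one call:

# Sketch only.
import gevent
from gevent import Greenlet
from gevent.pool import Group

def task(n):
    gevent.sleep(0.01)
    return n

group = Group()

g = Greenlet(task, 1)   # constructed *unstarted*
group.start(g)          # equivalent to group.add(g) followed by g.start()

group.spawn(task, 2)    # construct, track and start in one call

group.join()            # wait for everything the group is tracking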
- def spawn(self, *args, **kwargs):
+ def spawn(self, *args, **kwargs): # pylint:disable=arguments-differ
"""
Begin a new greenlet with the given arguments (which are passed
to the greenlet constructor) and add it to the collection of greenlets
@@ -632,18 +503,12 @@ class Group(GroupMappingMixin):
return self.full()
-class Failure(object):
- __slots__ = ['exc', '_raise_exception']
-
- def __init__(self, exc, raise_exception=None):
- self.exc = exc
- self._raise_exception = raise_exception
- def raise_exc(self):
- if self._raise_exception:
- self._raise_exception()
- else:
- raise self.exc
+class PoolFull(QueueFull):
+ """
+ Raised when a Pool is full and an attempt was made to
+ add a new greenlet to it in non-blocking mode.
+ """
class Pool(Group):
@@ -700,8 +565,10 @@ class Pool(Group):
def full(self):
"""
- Return a boolean indicating whether this pool has any room for
- members. (True if it does, False if it doesn't.)
+ Return a boolean indicating whether this pool is full, i.e. whether
+ :meth:`add` would block.
+
+ :return: False if there is room for new members, True if there isn't.
"""
return self.free_count() <= 0
@@ -714,13 +581,54 @@ class Pool(Group):
return 1
return max(0, self.size - len(self))
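full() and free_count() let callers inspect capacity, though a check-then-spawn pattern is only reliable if the current greenlet does not yield between the check and the spawn; the blocking=False form of add() below is the race-free way to refuse work. A tiny sketch with illustrative sizes:

# Sketch only.
import gevent
from gevent.pool import Pool

pool = Pool(2)
pool.spawn(gevent.sleep, 0.1)
print(pool.full())          # False: one of the two slots is taken
print(pool.free_count())    # 1

pool.spawn(gevent.sleep, 0.1)
print(pool.full())          # True: add()/spawn() would now block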
- def add(self, greenlet):
+ def start(self, greenlet, *args, **kwargs): # pylint:disable=arguments-differ
+ """
+ start(greenlet, blocking=True, timeout=None) -> None
+
+ Add the **unstarted** *greenlet* to the collection of greenlets
+ this group is monitoring and then start it.
+
+ Parameters are as for :meth:`add`.
"""
- Begin tracking the given greenlet, blocking until space is available.
+ self.add(greenlet, *args, **kwargs)
+ greenlet.start()
- .. seealso:: :meth:`Group.add`
+ def add(self, greenlet, blocking=True, timeout=None): # pylint:disable=arguments-differ
"""
- self._semaphore.acquire()
+ Begin tracking the given **unstarted** greenlet, possibly blocking
+ until space is available.
+
+ Usually you should call :meth:`start` to track and start the greenlet
+ instead of using this lower-level method, or :meth:`spawn` to
+ also create the greenlet.
+
+ :keyword bool blocking: If True (the default), this function
+ will block until the pool has space or a timeout occurs. If
+ False, this function will immediately raise :exc:`PoolFull` if the
+ pool is currently full.
+ :keyword float timeout: The maximum number of seconds this
+ method will block, if ``blocking`` is True. (Ignored if
+ ``blocking`` is False.)
+ :raises PoolFull: if either ``blocking`` is False and the pool
+ was full, or if ``blocking`` is True and ``timeout`` was
+ exceeded.
+
+ .. caution:: If the *greenlet* has already been started and
+ *blocking* is true, then the greenlet may run to completion
+ while the current greenlet blocks waiting to track it. This would
+ enable higher concurrency than desired.
+
+ .. seealso:: :meth:`Group.add`
+
+ .. versionchanged:: 1.3.0 Added the ``blocking`` and
+ ``timeout`` parameters.
+ """
+ if not self._semaphore.acquire(blocking=blocking, timeout=timeout):
+ # We failed to acquire the semaphore.
+ # If blocking was True, then there was a timeout. If blocking was
+ # False, then there was no capacity. Either way, raise PoolFull.
+ raise PoolFull()
+
try:
Group.add(self, greenlet)
except:
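To close, a sketch of the blocking-with-timeout path described in the add() docstring above: when the timeout elapses without a slot becoming free, PoolFull is raised as well. The worker and the durations are illustrative assumptions:

# Sketch only (requires gevent >= 1.3 for the timeout keyword on add()).
import gevent
from gevent import Greenlet
from gevent.pool import Pool, PoolFull

def task():
    gevent.sleep(1)

pool = Pool(1)
pool.start(Greenlet(task))      # occupies the only slot

g = Greenlet(task)              # keep it *unstarted* until it is tracked
try:
    pool.add(g, timeout=0.1)    # wait at most 0.1s for a slot
except PoolFull:
    print("no slot freed within the timeout")
else:
    g.start()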