Diffstat (limited to 'python/gevent/pool.py')
-rw-r--r-- | python/gevent/pool.py | 366 |
1 files changed, 137 insertions, 229 deletions
diff --git a/python/gevent/pool.py b/python/gevent/pool.py
index d0c5cbb..48e6a7a 100644
--- a/python/gevent/pool.py
+++ b/python/gevent/pool.py
@@ -13,233 +13,67 @@ provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>` method
 blocks if the number of greenlets in the pool has already reached the limit,
 until there is a free slot.
 """
+from __future__ import print_function, absolute_import, division

-from bisect import insort_right
-try:
-    from itertools import izip
-except ImportError:
-    # Python 3
-    izip = zip

 from gevent.hub import GreenletExit, getcurrent, kill as _kill
 from gevent.greenlet import joinall, Greenlet
+from gevent.queue import Full as QueueFull
 from gevent.timeout import Timeout
 from gevent.event import Event
 from gevent.lock import Semaphore, DummySemaphore

-__all__ = ['Group', 'Pool']
+from gevent._compat import izip
+from gevent._imap import IMap
+from gevent._imap import IMapUnordered

+__all__ = [
+    'Group',
+    'Pool',
+    'PoolFull',
+]

-class IMapUnordered(Greenlet):
-    """
-    At iterator of map results.
-    """
-
-    _zipped = False
-
-    def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
-        """
-        An iterator that.
-
-        :keyword int maxsize: If given and not-None, specifies the maximum number of
-            finished results that will be allowed to accumulated awaiting the reader;
-            more than that number of results will cause map function greenlets to begin
-            to block. This is most useful is there is a great disparity in the speed of
-            the mapping code and the consumer and the results consume a great deal of resources.
-            Using a bound is more computationally expensive than not using a bound.
-
-        .. versionchanged:: 1.1b3
-            Added the *maxsize* parameter.
-        """
-        from gevent.queue import Queue
-        Greenlet.__init__(self)
-        if spawn is not None:
-            self.spawn = spawn
-        if _zipped:
-            self._zipped = _zipped
-        self.func = func
-        self.iterable = iterable
-        self.queue = Queue()
-        if maxsize:
-            # Bounding the queue is not enough if we want to keep from
-            # accumulating objects; the result value will be around as
-            # the greenlet's result, blocked on self.queue.put(), and
-            # we'll go on to spawn another greenlet, which in turn can
-            # create the result. So we need a semaphore to prevent a
-            # greenlet from exiting while the queue is full so that we
-            # don't spawn the next greenlet (assuming that self.spawn
-            # is of course bounded). (Alternatively we could have the
-            # greenlet itself do the insert into the pool, but that
-            # takes some rework).
-            #
-            # Given the use of a semaphore at this level, sizing the queue becomes
-            # redundant, and that lets us avoid having to use self.link() instead
-            # of self.rawlink() to avoid having blocking methods called in the
-            # hub greenlet.
-            factory = Semaphore
-        else:
-            factory = DummySemaphore
-        self._result_semaphore = factory(maxsize)
-
-        self.count = 0
-        self.finished = False
-        # If the queue size is unbounded, then we want to call all
-        # the links (_on_finish and _on_result) directly in the hub greenlet
-        # for efficiency. However, if the queue is bounded, we can't do that if
-        # the queue might block (because if there's no waiter the hub can switch to,
-        # the queue simply raises Full). Therefore, in that case, we use
-        # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
-        # This means that _on_finish and _on_result can be called and interleaved in any order
-        # if the call to self.queue.put() blocks..
-        # Note that right now we're not bounding the queue, instead using a semaphore.
-        self.rawlink(self._on_finish)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        self._result_semaphore.release()
-        value = self._inext()
-        if isinstance(value, Failure):
-            raise value.exc
-        return value
-    __next__ = next
-
-    def _inext(self):
-        return self.queue.get()
-
-    def _ispawn(self, func, item):
-        self._result_semaphore.acquire()
-        self.count += 1
-        g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
-        g.rawlink(self._on_result)
-        return g
-
-    def _run(self): # pylint:disable=method-hidden
-        try:
-            func = self.func
-            for item in self.iterable:
-                self._ispawn(func, item)
-        finally:
-            self.__dict__.pop('spawn', None)
-            self.__dict__.pop('func', None)
-            self.__dict__.pop('iterable', None)
-
-    def _on_result(self, greenlet):
-        # This method can either be called in the hub greenlet (if the
-        # queue is unbounded) or its own greenlet. If it's called in
-        # its own greenlet, the calls to put() may block and switch
-        # greenlets, which in turn could mutate our state. So any
-        # state on this object that we need to look at, notably
-        # self.count, we need to capture or mutate *before* we put.
-        # (Note that right now we're not bounding the queue, but we may
-        # choose to do so in the future so this implementation will be left in case.)
-        self.count -= 1
-        count = self.count
-        finished = self.finished
-        ready = self.ready()
-        put_finished = False
-
-        if ready and count <= 0 and not finished:
-            finished = self.finished = True
-            put_finished = True
-
-        if greenlet.successful():
-            self.queue.put(self._iqueue_value_for_success(greenlet))
-        else:
-            self.queue.put(self._iqueue_value_for_failure(greenlet))
-        if put_finished:
-            self.queue.put(self._iqueue_value_for_finished())
-
-    def _on_finish(self, _self):
-        if self.finished:
-            return
-
-        if not self.successful():
-            self.finished = True
-            self.queue.put(self._iqueue_value_for_self_failure())
-            return
-
-        if self.count <= 0:
-            self.finished = True
-            self.queue.put(self._iqueue_value_for_finished())
-
-    def _iqueue_value_for_success(self, greenlet):
-        return greenlet.value
-
-    def _iqueue_value_for_failure(self, greenlet):
-        return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
-
-    def _iqueue_value_for_finished(self):
-        return Failure(StopIteration)
-
-    def _iqueue_value_for_self_failure(self):
-        return Failure(self.exception, self._raise_exception)
-
-
-class IMap(IMapUnordered):
-    # A specialization of IMapUnordered that returns items
-    # in the order in which they were generated, not
-    # the order in which they finish.
-    # We do this by storing tuples (order, value) in the queue
-    # not just value.
-
-    def __init__(self, *args, **kwargs):
-        self.waiting = [] # QQQ maybe deque will work faster there?
-        self.index = 0
-        self.maxindex = -1
-        IMapUnordered.__init__(self, *args, **kwargs)
-
-    def _inext(self):
-        while True:
-            if self.waiting and self.waiting[0][0] <= self.index:
-                _, value = self.waiting.pop(0)
-            else:
-                index, value = self.queue.get()
-                if index > self.index:
-                    insort_right(self.waiting, (index, value))
-                    continue
-            self.index += 1
-            return value
-
-    def _ispawn(self, func, item):
-        g = IMapUnordered._ispawn(self, func, item)
-        self.maxindex += 1
-        g.index = self.maxindex
-        return g
-
-    def _iqueue_value_for_success(self, greenlet):
-        return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))
-
-    def _iqueue_value_for_failure(self, greenlet):
-        return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
-
-    def _iqueue_value_for_finished(self):
-        self.maxindex += 1
-        return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))
-
-    def _iqueue_value_for_self_failure(self):
-        self.maxindex += 1
-        return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))


 class GroupMappingMixin(object):
     # Internal, non-public API class.
     # Provides mixin methods for implementing mapping pools. Subclasses must define:
-    # - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
-    # and `awargs`, potentially asynchronously. Return a value with a `get` method that
-    # blocks until the results of func are available, and a `link` method.
+    def spawn(self, func, *args, **kwargs):
+        """
+        A function that runs *func* with *args* and *kwargs*, potentially
+        asynchronously. Return a value with a ``get`` method that blocks
+        until the results of func are available, and a ``rawlink`` method
+        that calls a callback when the results are available.
+
+        If this object has an upper bound on how many asyncronously executing
+        tasks can exist, this method may block until a slot becomes available.
+        """
+        raise NotImplementedError()

-    # - self._apply_immediately(): should the function passed to apply be called immediately,
-    # synchronously?
+    def _apply_immediately(self):
+        """
+        should the function passed to apply be called immediately,
+        synchronously?
+        """
+        raise NotImplementedError()

-    # - self._apply_async_use_greenlet(): Should apply_async directly call
-    # Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
+    def _apply_async_use_greenlet(self):
+        """
+        Should apply_async directly call Greenlet.spawn(), bypassing
+        `spawn`?

-    # - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
-    # asynchronously, possibly synchronously.
+        Return true when self.spawn would block.
+        """
+        raise NotImplementedError()
+
+    def _apply_async_cb_spawn(self, callback, result):
+        """
+        Run the given callback function, possibly
+        asynchronously, possibly synchronously.
+        """
+        raise NotImplementedError()

     def apply_cb(self, func, args=None, kwds=None, callback=None):
         """
@@ -324,13 +158,46 @@ class GroupMappingMixin(object):
             return func(*args, **kwds)
         return self.spawn(func, *args, **kwds).get()

+    def __map(self, func, iterable):
+        return [g.get() for g in
+                [self.spawn(func, i) for i in iterable]]
+
     def map(self, func, iterable):
         """Return a list made by applying the *func* to each element of
         the iterable.

         .. seealso:: :meth:`imap`
         """
-        return list(self.imap(func, iterable))
+        # We can't return until they're all done and in order.  It
+        # wouldn't seem to much matter what order we wait on them in,
+        # so the simple, fast (50% faster than imap) solution would be:
+
+        # return [g.get() for g in
+        #         [self.spawn(func, i) for i in iterable]]
+
+        # If the pool size is unlimited (or more than the len(iterable)), this
+        # is equivalent to imap (spawn() will never block, all of them run concurrently,
+        # we call get() in the order the iterable was given).
+
+        # Now lets imagine the pool if is limited size. Suppose the
+        # func is time.sleep, our pool is limited to 3 threads, and
+        # our input is [10, 1, 10, 1, 1] We would start three threads,
+        # one to sleep for 10, one to sleep for 1, and the last to
+        # sleep for 10. We would block starting the fourth thread. At
+        # time 1, we would finish the second thread and start another
+        # one for time 1. At time 2, we would finish that one and
+        # start the last thread, and then begin executing get() on the first
+        # thread.
+
+        # Because it's spawn that blocks, this is *also* equivalent to what
+        # imap would do.
+
+        # The one remaining difference is that imap runs in its own
+        # greenlet, potentially changing the way the event loop runs.
+        # That's easy enough to do.
+
+        g = Greenlet.spawn(self.__map, func, iterable)
+        return g.get()

     def map_cb(self, func, iterable, callback=None):
         result = self.map(func, iterable)
@@ -463,10 +330,14 @@ class Group(GroupMappingMixin):

     def add(self, greenlet):
         """
-        Begin tracking the greenlet.
+        Begin tracking the *greenlet*.

         If this group is :meth:`full`, then this method may block
         until it is possible to track the greenlet.
+
+        Typically the *greenlet* should **not** be started when
+        it is added because if this object blocks in this method,
+        then the *greenlet* may run to completion before it is tracked.
         """
         try:
             rawlink = greenlet.rawlink
@@ -497,13 +368,13 @@ class Group(GroupMappingMixin):

     def start(self, greenlet):
         """
-        Start the un-started *greenlet* and add it to the collection of greenlets
-        this group is monitoring.
+        Add the **unstarted** *greenlet* to the collection of greenlets
+        this group is monitoring, and then start it.
         """
         self.add(greenlet)
         greenlet.start()

-    def spawn(self, *args, **kwargs):
+    def spawn(self, *args, **kwargs): # pylint:disable=arguments-differ
         """
         Begin a new greenlet with the given arguments (which are passed
         to the greenlet constructor) and add it to the collection of greenlets
@@ -632,18 +503,12 @@ class Group(GroupMappingMixin):
         return self.full()


-class Failure(object):
-    __slots__ = ['exc', '_raise_exception']
-
-    def __init__(self, exc, raise_exception=None):
-        self.exc = exc
-        self._raise_exception = raise_exception
-    def raise_exc(self):
-        if self._raise_exception:
-            self._raise_exception()
-        else:
-            raise self.exc
+class PoolFull(QueueFull):
+    """
+    Raised when a Pool is full and an attempt was made to
+    add a new greenlet to it in non-blocking mode.
+    """


 class Pool(Group):
@@ -700,8 +565,10 @@ class Pool(Group):

     def full(self):
         """
-        Return a boolean indicating whether this pool has any room for
-        members. (True if it does, False if it doesn't.)
+        Return a boolean indicating whether this pool is full, e.g. if
+        :meth:`add` would block.
+
+        :return: False if there is room for new members, True if there isn't.
         """
         return self.free_count() <= 0
@@ -714,13 +581,54 @@ class Pool(Group):
             return 1
         return max(0, self.size - len(self))

-    def add(self, greenlet):
+    def start(self, greenlet, *args, **kwargs): # pylint:disable=arguments-differ
+        """
+        start(greenlet, blocking=True, timeout=None) -> None
+
+        Add the **unstarted** *greenlet* to the collection of greenlets
+        this group is monitoring and then start it.
+
+        Parameters are as for :meth:`add`.
         """
-        Begin tracking the given greenlet, blocking until space is available.
+        self.add(greenlet, *args, **kwargs)
+        greenlet.start()

-        .. seealso:: :meth:`Group.add`
+    def add(self, greenlet, blocking=True, timeout=None): # pylint:disable=arguments-differ
         """
-        self._semaphore.acquire()
+        Begin tracking the given **unstarted** greenlet, possibly blocking
+        until space is available.
+
+        Usually you should call :meth:`start` to track and start the greenlet
+        instead of using this lower-level method, or :meth:`spawn` to
+        also create the greenlet.
+
+        :keyword bool blocking: If True (the default), this function
+            will block until the pool has space or a timeout occurs. If
+            False, this function will immediately raise a Timeout if the
+            pool is currently full.
+        :keyword float timeout: The maximum number of seconds this
+            method will block, if ``blocking`` is True. (Ignored if
+            ``blocking`` is False.)
+        :raises PoolFull: if either ``blocking`` is False and the pool
+            was full, or if ``blocking`` is True and ``timeout`` was
+            exceeded.
+
+        .. caution:: If the *greenlet* has already been started and
+           *blocking* is true, then the greenlet may run to completion
+           while the current greenlet blocks waiting to track it. This would
+           enable higher concurrency than desired.
+
+        .. seealso:: :meth:`Group.add`
+
+        .. versionchanged:: 1.3.0 Added the ``blocking`` and
+            ``timeout`` parameters.
+        """
+        if not self._semaphore.acquire(blocking=blocking, timeout=timeout):
+            # We failed to acquire the semaphore.
+            # If blocking was True, then there was a timeout. If blocking was
+            # False, then there was no capacity. Either way, raise PoolFull.
+            raise PoolFull()
+
         try:
             Group.add(self, greenlet)
         except: