aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/pool.py')
-rw-r--r--python/gevent/pool.py667
1 files changed, 0 insertions, 667 deletions
diff --git a/python/gevent/pool.py b/python/gevent/pool.py
deleted file mode 100644
index 48e6a7a..0000000
--- a/python/gevent/pool.py
+++ /dev/null
@@ -1,667 +0,0 @@
-# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
-"""
-Managing greenlets in a group.
-
-The :class:`Group` class in this module abstracts a group of running
-greenlets. When a greenlet dies, it's automatically removed from the
-group. All running greenlets in a group can be waited on with
-:meth:`Group.join`, or all running greenlets can be killed with
-:meth:`Group.kill`.
-
-The :class:`Pool` class, which is a subclass of :class:`Group`,
-provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
-method blocks if the number of greenlets in the pool has already
-reached the limit, until there is a free slot.
-"""
-from __future__ import print_function, absolute_import, division
-
-
-from gevent.hub import GreenletExit, getcurrent, kill as _kill
-from gevent.greenlet import joinall, Greenlet
-from gevent.queue import Full as QueueFull
-from gevent.timeout import Timeout
-from gevent.event import Event
-from gevent.lock import Semaphore, DummySemaphore
-
-from gevent._compat import izip
-from gevent._imap import IMap
-from gevent._imap import IMapUnordered
-
-__all__ = [
- 'Group',
- 'Pool',
- 'PoolFull',
-]
-
-
-
-
-class GroupMappingMixin(object):
- # Internal, non-public API class.
- # Provides mixin methods for implementing mapping pools. Subclasses must define:
-
- def spawn(self, func, *args, **kwargs):
- """
- A function that runs *func* with *args* and *kwargs*, potentially
- asynchronously. Return a value with a ``get`` method that blocks
- until the results of func are available, and a ``rawlink`` method
- that calls a callback when the results are available.
-
- If this object has an upper bound on how many asyncronously executing
- tasks can exist, this method may block until a slot becomes available.
- """
- raise NotImplementedError()
-
- def _apply_immediately(self):
- """
- should the function passed to apply be called immediately,
- synchronously?
- """
- raise NotImplementedError()
-
- def _apply_async_use_greenlet(self):
- """
- Should apply_async directly call Greenlet.spawn(), bypassing
- `spawn`?
-
- Return true when self.spawn would block.
- """
- raise NotImplementedError()
-
- def _apply_async_cb_spawn(self, callback, result):
- """
- Run the given callback function, possibly
- asynchronously, possibly synchronously.
- """
- raise NotImplementedError()
-
- def apply_cb(self, func, args=None, kwds=None, callback=None):
- """
- :meth:`apply` the given *func(\\*args, \\*\\*kwds)*, and, if a *callback* is given, run it with the
- results of *func* (unless an exception was raised.)
-
- The *callback* may be called synchronously or asynchronously. If called
- asynchronously, it will not be tracked by this group. (:class:`Group` and :class:`Pool`
- call it asynchronously in a new greenlet; :class:`~gevent.threadpool.ThreadPool` calls
- it synchronously in the current greenlet.)
- """
- result = self.apply(func, args, kwds)
- if callback is not None:
- self._apply_async_cb_spawn(callback, result)
- return result
-
- def apply_async(self, func, args=None, kwds=None, callback=None):
- """
- A variant of the :meth:`apply` method which returns a :class:`~.Greenlet` object.
-
- When the returned greenlet gets to run, it *will* call :meth:`apply`,
- passing in *func*, *args* and *kwds*.
-
- If *callback* is specified, then it should be a callable which
- accepts a single argument. When the result becomes ready
- callback is applied to it (unless the call failed).
-
- This method will never block, even if this group is full (that is,
- even if :meth:`spawn` would block, this method will not).
-
- .. caution:: The returned greenlet may or may not be tracked
- as part of this group, so :meth:`joining <join>` this group is
- not a reliable way to wait for the results to be available or
- for the returned greenlet to run; instead, join the returned
- greenlet.
-
- .. tip:: Because :class:`~.ThreadPool` objects do not track greenlets, the returned
- greenlet will never be a part of it. To reduce overhead and improve performance,
- :class:`Group` and :class:`Pool` may choose to track the returned
- greenlet. These are implementation details that may change.
- """
- if args is None:
- args = ()
- if kwds is None:
- kwds = {}
- if self._apply_async_use_greenlet():
- # cannot call self.spawn() directly because it will block
- # XXX: This is always the case for ThreadPool, but for Group/Pool
- # of greenlets, this is only the case when they are full...hence
- # the weasely language about "may or may not be tracked". Should we make
- # Group/Pool always return true as well so it's never tracked by any
- # implementation? That would simplify that logic, but could increase
- # the total number of greenlets in the system and add a layer of
- # overhead for the simple cases when the pool isn't full.
- return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
-
- greenlet = self.spawn(func, *args, **kwds)
- if callback is not None:
- greenlet.link(pass_value(callback))
- return greenlet
-
- def apply(self, func, args=None, kwds=None):
- """
- Rough quivalent of the :func:`apply()` builtin function blocking until
- the result is ready and returning it.
-
- The ``func`` will *usually*, but not *always*, be run in a way
- that allows the current greenlet to switch out (for example,
- in a new greenlet or thread, depending on implementation). But
- if the current greenlet or thread is already one that was
- spawned by this pool, the pool may choose to immediately run
- the `func` synchronously.
-
- Any exception ``func`` raises will be propagated to the caller of ``apply`` (that is,
- this method will raise the exception that ``func`` raised).
- """
- if args is None:
- args = ()
- if kwds is None:
- kwds = {}
- if self._apply_immediately():
- return func(*args, **kwds)
- return self.spawn(func, *args, **kwds).get()
-
- def __map(self, func, iterable):
- return [g.get() for g in
- [self.spawn(func, i) for i in iterable]]
-
- def map(self, func, iterable):
- """Return a list made by applying the *func* to each element of
- the iterable.
-
- .. seealso:: :meth:`imap`
- """
- # We can't return until they're all done and in order. It
- # wouldn't seem to much matter what order we wait on them in,
- # so the simple, fast (50% faster than imap) solution would be:
-
- # return [g.get() for g in
- # [self.spawn(func, i) for i in iterable]]
-
- # If the pool size is unlimited (or more than the len(iterable)), this
- # is equivalent to imap (spawn() will never block, all of them run concurrently,
- # we call get() in the order the iterable was given).
-
- # Now lets imagine the pool if is limited size. Suppose the
- # func is time.sleep, our pool is limited to 3 threads, and
- # our input is [10, 1, 10, 1, 1] We would start three threads,
- # one to sleep for 10, one to sleep for 1, and the last to
- # sleep for 10. We would block starting the fourth thread. At
- # time 1, we would finish the second thread and start another
- # one for time 1. At time 2, we would finish that one and
- # start the last thread, and then begin executing get() on the first
- # thread.
-
- # Because it's spawn that blocks, this is *also* equivalent to what
- # imap would do.
-
- # The one remaining difference is that imap runs in its own
- # greenlet, potentially changing the way the event loop runs.
- # That's easy enough to do.
-
- g = Greenlet.spawn(self.__map, func, iterable)
- return g.get()
-
- def map_cb(self, func, iterable, callback=None):
- result = self.map(func, iterable)
- if callback is not None:
- callback(result)
- return result
-
- def map_async(self, func, iterable, callback=None):
- """
- A variant of the map() method which returns a Greenlet object that is executing
- the map function.
-
- If callback is specified then it should be a callable which accepts a
- single argument.
- """
- return Greenlet.spawn(self.map_cb, func, iterable, callback)
-
- def __imap(self, cls, func, *iterables, **kwargs):
- # Python 2 doesn't support the syntax that lets us mix varargs and
- # a named kwarg, so we have to unpack manually
- maxsize = kwargs.pop('maxsize', None)
- if kwargs:
- raise TypeError("Unsupported keyword arguments")
- return cls.spawn(func, izip(*iterables), spawn=self.spawn,
- _zipped=True, maxsize=maxsize)
-
- def imap(self, func, *iterables, **kwargs):
- """
- imap(func, *iterables, maxsize=None) -> iterable
-
- An equivalent of :func:`itertools.imap`, operating in parallel.
- The *func* is applied to each element yielded from each
- iterable in *iterables* in turn, collecting the result.
-
- If this object has a bound on the number of active greenlets it can
- contain (such as :class:`Pool`), then at most that number of tasks will operate
- in parallel.
-
- :keyword int maxsize: If given and not-None, specifies the maximum number of
- finished results that will be allowed to accumulate awaiting the reader;
- more than that number of results will cause map function greenlets to begin
- to block. This is most useful if there is a great disparity in the speed of
- the mapping code and the consumer and the results consume a great deal of resources.
-
- .. note:: This is separate from any bound on the number of active parallel
- tasks, though they may have some interaction (for example, limiting the
- number of parallel tasks to the smallest bound).
-
- .. note:: Using a bound is slightly more computationally expensive than not using a bound.
-
- .. tip:: The :meth:`imap_unordered` method makes much better
- use of this parameter. Some additional, unspecified,
- number of objects may be required to be kept in memory
- to maintain order by this function.
-
- :return: An iterable object.
-
- .. versionchanged:: 1.1b3
- Added the *maxsize* keyword parameter.
- .. versionchanged:: 1.1a1
- Accept multiple *iterables* to iterate in parallel.
- """
- return self.__imap(IMap, func, *iterables, **kwargs)
-
- def imap_unordered(self, func, *iterables, **kwargs):
- """
- imap_unordered(func, *iterables, maxsize=None) -> iterable
-
- The same as :meth:`imap` except that the ordering of the results
- from the returned iterator should be considered in arbitrary
- order.
-
- This is lighter weight than :meth:`imap` and should be preferred if order
- doesn't matter.
-
- .. seealso:: :meth:`imap` for more details.
- """
- return self.__imap(IMapUnordered, func, *iterables, **kwargs)
-
-
-class Group(GroupMappingMixin):
- """
- Maintain a group of greenlets that are still running, without
- limiting their number.
-
- Links to each item and removes it upon notification.
-
- Groups can be iterated to discover what greenlets they are tracking,
- they can be tested to see if they contain a greenlet, and they know the
- number (len) of greenlets they are tracking. If they are not tracking any
- greenlets, they are False in a boolean context.
- """
-
- #: The type of Greenlet object we will :meth:`spawn`. This can be changed
- #: on an instance or in a subclass.
- greenlet_class = Greenlet
-
- def __init__(self, *args):
- assert len(args) <= 1, args
- self.greenlets = set(*args)
- if args:
- for greenlet in args[0]:
- greenlet.rawlink(self._discard)
- # each item we kill we place in dying, to avoid killing the same greenlet twice
- self.dying = set()
- self._empty_event = Event()
- self._empty_event.set()
-
- def __repr__(self):
- return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self.greenlets)
-
- def __len__(self):
- """
- Answer how many greenlets we are tracking. Note that if we are empty,
- we are False in a boolean context.
- """
- return len(self.greenlets)
-
- def __contains__(self, item):
- """
- Answer if we are tracking the given greenlet.
- """
- return item in self.greenlets
-
- def __iter__(self):
- """
- Iterate across all the greenlets we are tracking, in no particular order.
- """
- return iter(self.greenlets)
-
- def add(self, greenlet):
- """
- Begin tracking the *greenlet*.
-
- If this group is :meth:`full`, then this method may block
- until it is possible to track the greenlet.
-
- Typically the *greenlet* should **not** be started when
- it is added because if this object blocks in this method,
- then the *greenlet* may run to completion before it is tracked.
- """
- try:
- rawlink = greenlet.rawlink
- except AttributeError:
- pass # non-Greenlet greenlet, like MAIN
- else:
- rawlink(self._discard)
- self.greenlets.add(greenlet)
- self._empty_event.clear()
-
- def _discard(self, greenlet):
- self.greenlets.discard(greenlet)
- self.dying.discard(greenlet)
- if not self.greenlets:
- self._empty_event.set()
-
- def discard(self, greenlet):
- """
- Stop tracking the greenlet.
- """
- self._discard(greenlet)
- try:
- unlink = greenlet.unlink
- except AttributeError:
- pass # non-Greenlet greenlet, like MAIN
- else:
- unlink(self._discard)
-
- def start(self, greenlet):
- """
- Add the **unstarted** *greenlet* to the collection of greenlets
- this group is monitoring, and then start it.
- """
- self.add(greenlet)
- greenlet.start()
-
- def spawn(self, *args, **kwargs): # pylint:disable=arguments-differ
- """
- Begin a new greenlet with the given arguments (which are passed
- to the greenlet constructor) and add it to the collection of greenlets
- this group is monitoring.
-
- :return: The newly started greenlet.
- """
- greenlet = self.greenlet_class(*args, **kwargs)
- self.start(greenlet)
- return greenlet
-
-# def close(self):
-# """Prevents any more tasks from being submitted to the pool"""
-# self.add = RaiseException("This %s has been closed" % self.__class__.__name__)
-
- def join(self, timeout=None, raise_error=False):
- """
- Wait for this group to become empty *at least once*.
-
- If there are no greenlets in the group, returns immediately.
-
- .. note:: By the time the waiting code (the caller of this
- method) regains control, a greenlet may have been added to
- this group, and so this object may no longer be empty. (That
- is, ``group.join(); assert len(group) == 0`` is not
- guaranteed to hold.) This method only guarantees that the group
- reached a ``len`` of 0 at some point.
-
- :keyword bool raise_error: If True (*not* the default), if any
- greenlet that finished while the join was in progress raised
- an exception, that exception will be raised to the caller of
- this method. If multiple greenlets raised exceptions, which
- one gets re-raised is not determined. Only greenlets currently
- in the group when this method is called are guaranteed to
- be checked for exceptions.
-
- :return bool: A value indicating whether this group became empty.
- If the timeout is specified and the group did not become empty
- during that timeout, then this will be a false value. Otherwise
- it will be a true value.
-
- .. versionchanged:: 1.2a1
- Add the return value.
- """
- greenlets = list(self.greenlets) if raise_error else ()
- result = self._empty_event.wait(timeout=timeout)
-
- for greenlet in greenlets:
- if greenlet.exception is not None:
- if hasattr(greenlet, '_raise_exception'):
- greenlet._raise_exception()
- raise greenlet.exception
-
- return result
-
- def kill(self, exception=GreenletExit, block=True, timeout=None):
- """
- Kill all greenlets being tracked by this group.
- """
- timer = Timeout._start_new_or_dummy(timeout)
- try:
- while self.greenlets:
- for greenlet in list(self.greenlets):
- if greenlet in self.dying:
- continue
- try:
- kill = greenlet.kill
- except AttributeError:
- _kill(greenlet, exception)
- else:
- kill(exception, block=False)
- self.dying.add(greenlet)
- if not block:
- break
- joinall(self.greenlets)
- except Timeout as ex:
- if ex is not timer:
- raise
- finally:
- timer.cancel()
-
- def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
- """
- If the given *greenlet* is running and being tracked by this group,
- kill it.
- """
- if greenlet not in self.dying and greenlet in self.greenlets:
- greenlet.kill(exception, block=False)
- self.dying.add(greenlet)
- if block:
- greenlet.join(timeout)
-
- def full(self):
- """
- Return a value indicating whether this group can track more greenlets.
-
- In this implementation, because there are no limits on the number of
- tracked greenlets, this will always return a ``False`` value.
- """
- return False
-
- def wait_available(self, timeout=None):
- """
- Block until it is possible to :meth:`spawn` a new greenlet.
-
- In this implementation, because there are no limits on the number
- of tracked greenlets, this will always return immediately.
- """
- pass
-
- # MappingMixin methods
-
- def _apply_immediately(self):
- # If apply() is called from one of our own
- # worker greenlets, don't spawn a new one---if we're full, that
- # could deadlock.
- return getcurrent() in self
-
- def _apply_async_cb_spawn(self, callback, result):
- Greenlet.spawn(callback, result)
-
- def _apply_async_use_greenlet(self):
- # cannot call self.spawn() because it will block, so
- # use a fresh, untracked greenlet that when run will
- # (indirectly) call self.spawn() for us.
- return self.full()
-
-
-
-class PoolFull(QueueFull):
- """
- Raised when a Pool is full and an attempt was made to
- add a new greenlet to it in non-blocking mode.
- """
-
-
-class Pool(Group):
-
- def __init__(self, size=None, greenlet_class=None):
- """
- Create a new pool.
-
- A pool is like a group, but the maximum number of members
- is governed by the *size* parameter.
-
- :keyword int size: If given, this non-negative integer is the
- maximum count of active greenlets that will be allowed in
- this pool. A few values have special significance:
-
- * ``None`` (the default) places no limit on the number of
- greenlets. This is useful when you need to track, but not limit,
- greenlets, as with :class:`gevent.pywsgi.WSGIServer`. A :class:`Group`
- may be a more efficient way to achieve the same effect.
- * ``0`` creates a pool that can never have any active greenlets. Attempting
- to spawn in this pool will block forever. This is only useful
- if an application uses :meth:`wait_available` with a timeout and checks
- :meth:`free_count` before attempting to spawn.
- """
- if size is not None and size < 0:
- raise ValueError('size must not be negative: %r' % (size, ))
- Group.__init__(self)
- self.size = size
- if greenlet_class is not None:
- self.greenlet_class = greenlet_class
- if size is None:
- factory = DummySemaphore
- else:
- factory = Semaphore
- self._semaphore = factory(size)
-
- def wait_available(self, timeout=None):
- """
- Wait until it's possible to spawn a greenlet in this pool.
-
- :param float timeout: If given, only wait the specified number
- of seconds.
-
- .. warning:: If the pool was initialized with a size of 0, this
- method will block forever unless a timeout is given.
-
- :return: A number indicating how many new greenlets can be put into
- the pool without blocking.
-
- .. versionchanged:: 1.1a3
- Added the ``timeout`` parameter.
- """
- return self._semaphore.wait(timeout=timeout)
-
- def full(self):
- """
- Return a boolean indicating whether this pool is full, e.g. if
- :meth:`add` would block.
-
- :return: False if there is room for new members, True if there isn't.
- """
- return self.free_count() <= 0
-
- def free_count(self):
- """
- Return a number indicating *approximately* how many more members
- can be added to this pool.
- """
- if self.size is None:
- return 1
- return max(0, self.size - len(self))
-
- def start(self, greenlet, *args, **kwargs): # pylint:disable=arguments-differ
- """
- start(greenlet, blocking=True, timeout=None) -> None
-
- Add the **unstarted** *greenlet* to the collection of greenlets
- this group is monitoring and then start it.
-
- Parameters are as for :meth:`add`.
- """
- self.add(greenlet, *args, **kwargs)
- greenlet.start()
-
- def add(self, greenlet, blocking=True, timeout=None): # pylint:disable=arguments-differ
- """
- Begin tracking the given **unstarted** greenlet, possibly blocking
- until space is available.
-
- Usually you should call :meth:`start` to track and start the greenlet
- instead of using this lower-level method, or :meth:`spawn` to
- also create the greenlet.
-
- :keyword bool blocking: If True (the default), this function
- will block until the pool has space or a timeout occurs. If
- False, this function will immediately raise a Timeout if the
- pool is currently full.
- :keyword float timeout: The maximum number of seconds this
- method will block, if ``blocking`` is True. (Ignored if
- ``blocking`` is False.)
- :raises PoolFull: if either ``blocking`` is False and the pool
- was full, or if ``blocking`` is True and ``timeout`` was
- exceeded.
-
- .. caution:: If the *greenlet* has already been started and
- *blocking* is true, then the greenlet may run to completion
- while the current greenlet blocks waiting to track it. This would
- enable higher concurrency than desired.
-
- .. seealso:: :meth:`Group.add`
-
- .. versionchanged:: 1.3.0 Added the ``blocking`` and
- ``timeout`` parameters.
- """
- if not self._semaphore.acquire(blocking=blocking, timeout=timeout):
- # We failed to acquire the semaphore.
- # If blocking was True, then there was a timeout. If blocking was
- # False, then there was no capacity. Either way, raise PoolFull.
- raise PoolFull()
-
- try:
- Group.add(self, greenlet)
- except:
- self._semaphore.release()
- raise
-
- def _discard(self, greenlet):
- Group._discard(self, greenlet)
- self._semaphore.release()
-
-
-class pass_value(object):
- __slots__ = ['callback']
-
- def __init__(self, callback):
- self.callback = callback
-
- def __call__(self, source):
- if source.successful():
- self.callback(source.value)
-
- def __hash__(self):
- return hash(self.callback)
-
- def __eq__(self, other):
- return self.callback == getattr(other, 'callback', other)
-
- def __str__(self):
- return str(self.callback)
-
- def __repr__(self):
- return repr(self.callback)
-
- def __getattr__(self, item):
- assert item != 'callback'
- return getattr(self.callback, item)