aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/pool.py')
-rw-r--r--python/gevent/pool.py759
1 files changed, 759 insertions, 0 deletions
diff --git a/python/gevent/pool.py b/python/gevent/pool.py
new file mode 100644
index 0000000..d0c5cbb
--- /dev/null
+++ b/python/gevent/pool.py
@@ -0,0 +1,759 @@
+# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
+"""
+Managing greenlets in a group.
+
+The :class:`Group` class in this module abstracts a group of running
+greenlets. When a greenlet dies, it's automatically removed from the
+group. All running greenlets in a group can be waited on with
+:meth:`Group.join`, or all running greenlets can be killed with
+:meth:`Group.kill`.
+
+The :class:`Pool` class, which is a subclass of :class:`Group`,
+provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
+method blocks if the number of greenlets in the pool has already
+reached the limit, until there is a free slot.
+"""
+
+from bisect import insort_right
+try:
+ from itertools import izip
+except ImportError:
+ # Python 3
+ izip = zip
+
+from gevent.hub import GreenletExit, getcurrent, kill as _kill
+from gevent.greenlet import joinall, Greenlet
+from gevent.timeout import Timeout
+from gevent.event import Event
+from gevent.lock import Semaphore, DummySemaphore
+
+__all__ = ['Group', 'Pool']
+
+
+class IMapUnordered(Greenlet):
+ """
+ At iterator of map results.
+ """
+
+ _zipped = False
+
+ def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
+ """
+ An iterator that.
+
+ :keyword int maxsize: If given and not-None, specifies the maximum number of
+ finished results that will be allowed to accumulated awaiting the reader;
+ more than that number of results will cause map function greenlets to begin
+ to block. This is most useful is there is a great disparity in the speed of
+ the mapping code and the consumer and the results consume a great deal of resources.
+ Using a bound is more computationally expensive than not using a bound.
+
+ .. versionchanged:: 1.1b3
+ Added the *maxsize* parameter.
+ """
+ from gevent.queue import Queue
+ Greenlet.__init__(self)
+ if spawn is not None:
+ self.spawn = spawn
+ if _zipped:
+ self._zipped = _zipped
+ self.func = func
+ self.iterable = iterable
+ self.queue = Queue()
+ if maxsize:
+ # Bounding the queue is not enough if we want to keep from
+ # accumulating objects; the result value will be around as
+ # the greenlet's result, blocked on self.queue.put(), and
+ # we'll go on to spawn another greenlet, which in turn can
+ # create the result. So we need a semaphore to prevent a
+ # greenlet from exiting while the queue is full so that we
+ # don't spawn the next greenlet (assuming that self.spawn
+ # is of course bounded). (Alternatively we could have the
+ # greenlet itself do the insert into the pool, but that
+ # takes some rework).
+ #
+ # Given the use of a semaphore at this level, sizing the queue becomes
+ # redundant, and that lets us avoid having to use self.link() instead
+ # of self.rawlink() to avoid having blocking methods called in the
+ # hub greenlet.
+ factory = Semaphore
+ else:
+ factory = DummySemaphore
+ self._result_semaphore = factory(maxsize)
+
+ self.count = 0
+ self.finished = False
+ # If the queue size is unbounded, then we want to call all
+ # the links (_on_finish and _on_result) directly in the hub greenlet
+ # for efficiency. However, if the queue is bounded, we can't do that if
+ # the queue might block (because if there's no waiter the hub can switch to,
+ # the queue simply raises Full). Therefore, in that case, we use
+ # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
+ # This means that _on_finish and _on_result can be called and interleaved in any order
+ # if the call to self.queue.put() blocks..
+ # Note that right now we're not bounding the queue, instead using a semaphore.
+ self.rawlink(self._on_finish)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ self._result_semaphore.release()
+ value = self._inext()
+ if isinstance(value, Failure):
+ raise value.exc
+ return value
+ __next__ = next
+
+ def _inext(self):
+ return self.queue.get()
+
+ def _ispawn(self, func, item):
+ self._result_semaphore.acquire()
+ self.count += 1
+ g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
+ g.rawlink(self._on_result)
+ return g
+
+ def _run(self): # pylint:disable=method-hidden
+ try:
+ func = self.func
+ for item in self.iterable:
+ self._ispawn(func, item)
+ finally:
+ self.__dict__.pop('spawn', None)
+ self.__dict__.pop('func', None)
+ self.__dict__.pop('iterable', None)
+
+ def _on_result(self, greenlet):
+ # This method can either be called in the hub greenlet (if the
+ # queue is unbounded) or its own greenlet. If it's called in
+ # its own greenlet, the calls to put() may block and switch
+ # greenlets, which in turn could mutate our state. So any
+ # state on this object that we need to look at, notably
+ # self.count, we need to capture or mutate *before* we put.
+ # (Note that right now we're not bounding the queue, but we may
+ # choose to do so in the future so this implementation will be left in case.)
+ self.count -= 1
+ count = self.count
+ finished = self.finished
+ ready = self.ready()
+ put_finished = False
+
+ if ready and count <= 0 and not finished:
+ finished = self.finished = True
+ put_finished = True
+
+ if greenlet.successful():
+ self.queue.put(self._iqueue_value_for_success(greenlet))
+ else:
+ self.queue.put(self._iqueue_value_for_failure(greenlet))
+
+ if put_finished:
+ self.queue.put(self._iqueue_value_for_finished())
+
+ def _on_finish(self, _self):
+ if self.finished:
+ return
+
+ if not self.successful():
+ self.finished = True
+ self.queue.put(self._iqueue_value_for_self_failure())
+ return
+
+ if self.count <= 0:
+ self.finished = True
+ self.queue.put(self._iqueue_value_for_finished())
+
+ def _iqueue_value_for_success(self, greenlet):
+ return greenlet.value
+
+ def _iqueue_value_for_failure(self, greenlet):
+ return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
+
+ def _iqueue_value_for_finished(self):
+ return Failure(StopIteration)
+
+ def _iqueue_value_for_self_failure(self):
+ return Failure(self.exception, self._raise_exception)
+
+
+class IMap(IMapUnordered):
+ # A specialization of IMapUnordered that returns items
+ # in the order in which they were generated, not
+ # the order in which they finish.
+ # We do this by storing tuples (order, value) in the queue
+ # not just value.
+
+ def __init__(self, *args, **kwargs):
+ self.waiting = [] # QQQ maybe deque will work faster there?
+ self.index = 0
+ self.maxindex = -1
+ IMapUnordered.__init__(self, *args, **kwargs)
+
+ def _inext(self):
+ while True:
+ if self.waiting and self.waiting[0][0] <= self.index:
+ _, value = self.waiting.pop(0)
+ else:
+ index, value = self.queue.get()
+ if index > self.index:
+ insort_right(self.waiting, (index, value))
+ continue
+ self.index += 1
+ return value
+
+ def _ispawn(self, func, item):
+ g = IMapUnordered._ispawn(self, func, item)
+ self.maxindex += 1
+ g.index = self.maxindex
+ return g
+
+ def _iqueue_value_for_success(self, greenlet):
+ return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))
+
+ def _iqueue_value_for_failure(self, greenlet):
+ return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
+
+ def _iqueue_value_for_finished(self):
+ self.maxindex += 1
+ return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))
+
+ def _iqueue_value_for_self_failure(self):
+ self.maxindex += 1
+ return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))
+
+
+class GroupMappingMixin(object):
+ # Internal, non-public API class.
+ # Provides mixin methods for implementing mapping pools. Subclasses must define:
+
+ # - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
+ # and `awargs`, potentially asynchronously. Return a value with a `get` method that
+ # blocks until the results of func are available, and a `link` method.
+
+ # - self._apply_immediately(): should the function passed to apply be called immediately,
+ # synchronously?
+
+ # - self._apply_async_use_greenlet(): Should apply_async directly call
+ # Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
+
+ # - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
+ # asynchronously, possibly synchronously.
+
+ def apply_cb(self, func, args=None, kwds=None, callback=None):
+ """
+ :meth:`apply` the given *func(\\*args, \\*\\*kwds)*, and, if a *callback* is given, run it with the
+ results of *func* (unless an exception was raised.)
+
+ The *callback* may be called synchronously or asynchronously. If called
+ asynchronously, it will not be tracked by this group. (:class:`Group` and :class:`Pool`
+ call it asynchronously in a new greenlet; :class:`~gevent.threadpool.ThreadPool` calls
+ it synchronously in the current greenlet.)
+ """
+ result = self.apply(func, args, kwds)
+ if callback is not None:
+ self._apply_async_cb_spawn(callback, result)
+ return result
+
+ def apply_async(self, func, args=None, kwds=None, callback=None):
+ """
+ A variant of the :meth:`apply` method which returns a :class:`~.Greenlet` object.
+
+ When the returned greenlet gets to run, it *will* call :meth:`apply`,
+ passing in *func*, *args* and *kwds*.
+
+ If *callback* is specified, then it should be a callable which
+ accepts a single argument. When the result becomes ready
+ callback is applied to it (unless the call failed).
+
+ This method will never block, even if this group is full (that is,
+ even if :meth:`spawn` would block, this method will not).
+
+ .. caution:: The returned greenlet may or may not be tracked
+ as part of this group, so :meth:`joining <join>` this group is
+ not a reliable way to wait for the results to be available or
+ for the returned greenlet to run; instead, join the returned
+ greenlet.
+
+ .. tip:: Because :class:`~.ThreadPool` objects do not track greenlets, the returned
+ greenlet will never be a part of it. To reduce overhead and improve performance,
+ :class:`Group` and :class:`Pool` may choose to track the returned
+ greenlet. These are implementation details that may change.
+ """
+ if args is None:
+ args = ()
+ if kwds is None:
+ kwds = {}
+ if self._apply_async_use_greenlet():
+ # cannot call self.spawn() directly because it will block
+ # XXX: This is always the case for ThreadPool, but for Group/Pool
+ # of greenlets, this is only the case when they are full...hence
+ # the weasely language about "may or may not be tracked". Should we make
+ # Group/Pool always return true as well so it's never tracked by any
+ # implementation? That would simplify that logic, but could increase
+ # the total number of greenlets in the system and add a layer of
+ # overhead for the simple cases when the pool isn't full.
+ return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
+
+ greenlet = self.spawn(func, *args, **kwds)
+ if callback is not None:
+ greenlet.link(pass_value(callback))
+ return greenlet
+
+ def apply(self, func, args=None, kwds=None):
+ """
+ Rough quivalent of the :func:`apply()` builtin function blocking until
+ the result is ready and returning it.
+
+ The ``func`` will *usually*, but not *always*, be run in a way
+ that allows the current greenlet to switch out (for example,
+ in a new greenlet or thread, depending on implementation). But
+ if the current greenlet or thread is already one that was
+ spawned by this pool, the pool may choose to immediately run
+ the `func` synchronously.
+
+ Any exception ``func`` raises will be propagated to the caller of ``apply`` (that is,
+ this method will raise the exception that ``func`` raised).
+ """
+ if args is None:
+ args = ()
+ if kwds is None:
+ kwds = {}
+ if self._apply_immediately():
+ return func(*args, **kwds)
+ return self.spawn(func, *args, **kwds).get()
+
+ def map(self, func, iterable):
+ """Return a list made by applying the *func* to each element of
+ the iterable.
+
+ .. seealso:: :meth:`imap`
+ """
+ return list(self.imap(func, iterable))
+
+ def map_cb(self, func, iterable, callback=None):
+ result = self.map(func, iterable)
+ if callback is not None:
+ callback(result)
+ return result
+
+ def map_async(self, func, iterable, callback=None):
+ """
+ A variant of the map() method which returns a Greenlet object that is executing
+ the map function.
+
+ If callback is specified then it should be a callable which accepts a
+ single argument.
+ """
+ return Greenlet.spawn(self.map_cb, func, iterable, callback)
+
+ def __imap(self, cls, func, *iterables, **kwargs):
+ # Python 2 doesn't support the syntax that lets us mix varargs and
+ # a named kwarg, so we have to unpack manually
+ maxsize = kwargs.pop('maxsize', None)
+ if kwargs:
+ raise TypeError("Unsupported keyword arguments")
+ return cls.spawn(func, izip(*iterables), spawn=self.spawn,
+ _zipped=True, maxsize=maxsize)
+
+ def imap(self, func, *iterables, **kwargs):
+ """
+ imap(func, *iterables, maxsize=None) -> iterable
+
+ An equivalent of :func:`itertools.imap`, operating in parallel.
+ The *func* is applied to each element yielded from each
+ iterable in *iterables* in turn, collecting the result.
+
+ If this object has a bound on the number of active greenlets it can
+ contain (such as :class:`Pool`), then at most that number of tasks will operate
+ in parallel.
+
+ :keyword int maxsize: If given and not-None, specifies the maximum number of
+ finished results that will be allowed to accumulate awaiting the reader;
+ more than that number of results will cause map function greenlets to begin
+ to block. This is most useful if there is a great disparity in the speed of
+ the mapping code and the consumer and the results consume a great deal of resources.
+
+ .. note:: This is separate from any bound on the number of active parallel
+ tasks, though they may have some interaction (for example, limiting the
+ number of parallel tasks to the smallest bound).
+
+ .. note:: Using a bound is slightly more computationally expensive than not using a bound.
+
+ .. tip:: The :meth:`imap_unordered` method makes much better
+ use of this parameter. Some additional, unspecified,
+ number of objects may be required to be kept in memory
+ to maintain order by this function.
+
+ :return: An iterable object.
+
+ .. versionchanged:: 1.1b3
+ Added the *maxsize* keyword parameter.
+ .. versionchanged:: 1.1a1
+ Accept multiple *iterables* to iterate in parallel.
+ """
+ return self.__imap(IMap, func, *iterables, **kwargs)
+
+ def imap_unordered(self, func, *iterables, **kwargs):
+ """
+ imap_unordered(func, *iterables, maxsize=None) -> iterable
+
+ The same as :meth:`imap` except that the ordering of the results
+ from the returned iterator should be considered in arbitrary
+ order.
+
+ This is lighter weight than :meth:`imap` and should be preferred if order
+ doesn't matter.
+
+ .. seealso:: :meth:`imap` for more details.
+ """
+ return self.__imap(IMapUnordered, func, *iterables, **kwargs)
+
+
+class Group(GroupMappingMixin):
+ """
+ Maintain a group of greenlets that are still running, without
+ limiting their number.
+
+ Links to each item and removes it upon notification.
+
+ Groups can be iterated to discover what greenlets they are tracking,
+ they can be tested to see if they contain a greenlet, and they know the
+ number (len) of greenlets they are tracking. If they are not tracking any
+ greenlets, they are False in a boolean context.
+ """
+
+ #: The type of Greenlet object we will :meth:`spawn`. This can be changed
+ #: on an instance or in a subclass.
+ greenlet_class = Greenlet
+
+ def __init__(self, *args):
+ assert len(args) <= 1, args
+ self.greenlets = set(*args)
+ if args:
+ for greenlet in args[0]:
+ greenlet.rawlink(self._discard)
+ # each item we kill we place in dying, to avoid killing the same greenlet twice
+ self.dying = set()
+ self._empty_event = Event()
+ self._empty_event.set()
+
+ def __repr__(self):
+ return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self.greenlets)
+
+ def __len__(self):
+ """
+ Answer how many greenlets we are tracking. Note that if we are empty,
+ we are False in a boolean context.
+ """
+ return len(self.greenlets)
+
+ def __contains__(self, item):
+ """
+ Answer if we are tracking the given greenlet.
+ """
+ return item in self.greenlets
+
+ def __iter__(self):
+ """
+ Iterate across all the greenlets we are tracking, in no particular order.
+ """
+ return iter(self.greenlets)
+
+ def add(self, greenlet):
+ """
+ Begin tracking the greenlet.
+
+ If this group is :meth:`full`, then this method may block
+ until it is possible to track the greenlet.
+ """
+ try:
+ rawlink = greenlet.rawlink
+ except AttributeError:
+ pass # non-Greenlet greenlet, like MAIN
+ else:
+ rawlink(self._discard)
+ self.greenlets.add(greenlet)
+ self._empty_event.clear()
+
+ def _discard(self, greenlet):
+ self.greenlets.discard(greenlet)
+ self.dying.discard(greenlet)
+ if not self.greenlets:
+ self._empty_event.set()
+
+ def discard(self, greenlet):
+ """
+ Stop tracking the greenlet.
+ """
+ self._discard(greenlet)
+ try:
+ unlink = greenlet.unlink
+ except AttributeError:
+ pass # non-Greenlet greenlet, like MAIN
+ else:
+ unlink(self._discard)
+
+ def start(self, greenlet):
+ """
+ Start the un-started *greenlet* and add it to the collection of greenlets
+ this group is monitoring.
+ """
+ self.add(greenlet)
+ greenlet.start()
+
+ def spawn(self, *args, **kwargs):
+ """
+ Begin a new greenlet with the given arguments (which are passed
+ to the greenlet constructor) and add it to the collection of greenlets
+ this group is monitoring.
+
+ :return: The newly started greenlet.
+ """
+ greenlet = self.greenlet_class(*args, **kwargs)
+ self.start(greenlet)
+ return greenlet
+
+# def close(self):
+# """Prevents any more tasks from being submitted to the pool"""
+# self.add = RaiseException("This %s has been closed" % self.__class__.__name__)
+
+ def join(self, timeout=None, raise_error=False):
+ """
+ Wait for this group to become empty *at least once*.
+
+ If there are no greenlets in the group, returns immediately.
+
+ .. note:: By the time the waiting code (the caller of this
+ method) regains control, a greenlet may have been added to
+ this group, and so this object may no longer be empty. (That
+ is, ``group.join(); assert len(group) == 0`` is not
+ guaranteed to hold.) This method only guarantees that the group
+ reached a ``len`` of 0 at some point.
+
+ :keyword bool raise_error: If True (*not* the default), if any
+ greenlet that finished while the join was in progress raised
+ an exception, that exception will be raised to the caller of
+ this method. If multiple greenlets raised exceptions, which
+ one gets re-raised is not determined. Only greenlets currently
+ in the group when this method is called are guaranteed to
+ be checked for exceptions.
+
+ :return bool: A value indicating whether this group became empty.
+ If the timeout is specified and the group did not become empty
+ during that timeout, then this will be a false value. Otherwise
+ it will be a true value.
+
+ .. versionchanged:: 1.2a1
+ Add the return value.
+ """
+ greenlets = list(self.greenlets) if raise_error else ()
+ result = self._empty_event.wait(timeout=timeout)
+
+ for greenlet in greenlets:
+ if greenlet.exception is not None:
+ if hasattr(greenlet, '_raise_exception'):
+ greenlet._raise_exception()
+ raise greenlet.exception
+
+ return result
+
+ def kill(self, exception=GreenletExit, block=True, timeout=None):
+ """
+ Kill all greenlets being tracked by this group.
+ """
+ timer = Timeout._start_new_or_dummy(timeout)
+ try:
+ while self.greenlets:
+ for greenlet in list(self.greenlets):
+ if greenlet in self.dying:
+ continue
+ try:
+ kill = greenlet.kill
+ except AttributeError:
+ _kill(greenlet, exception)
+ else:
+ kill(exception, block=False)
+ self.dying.add(greenlet)
+ if not block:
+ break
+ joinall(self.greenlets)
+ except Timeout as ex:
+ if ex is not timer:
+ raise
+ finally:
+ timer.cancel()
+
+ def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
+ """
+ If the given *greenlet* is running and being tracked by this group,
+ kill it.
+ """
+ if greenlet not in self.dying and greenlet in self.greenlets:
+ greenlet.kill(exception, block=False)
+ self.dying.add(greenlet)
+ if block:
+ greenlet.join(timeout)
+
+ def full(self):
+ """
+ Return a value indicating whether this group can track more greenlets.
+
+ In this implementation, because there are no limits on the number of
+ tracked greenlets, this will always return a ``False`` value.
+ """
+ return False
+
+ def wait_available(self, timeout=None):
+ """
+ Block until it is possible to :meth:`spawn` a new greenlet.
+
+ In this implementation, because there are no limits on the number
+ of tracked greenlets, this will always return immediately.
+ """
+ pass
+
+ # MappingMixin methods
+
+ def _apply_immediately(self):
+ # If apply() is called from one of our own
+ # worker greenlets, don't spawn a new one---if we're full, that
+ # could deadlock.
+ return getcurrent() in self
+
+ def _apply_async_cb_spawn(self, callback, result):
+ Greenlet.spawn(callback, result)
+
+ def _apply_async_use_greenlet(self):
+ # cannot call self.spawn() because it will block, so
+ # use a fresh, untracked greenlet that when run will
+ # (indirectly) call self.spawn() for us.
+ return self.full()
+
+
+class Failure(object):
+ __slots__ = ['exc', '_raise_exception']
+
+ def __init__(self, exc, raise_exception=None):
+ self.exc = exc
+ self._raise_exception = raise_exception
+
+ def raise_exc(self):
+ if self._raise_exception:
+ self._raise_exception()
+ else:
+ raise self.exc
+
+
+class Pool(Group):
+
+ def __init__(self, size=None, greenlet_class=None):
+ """
+ Create a new pool.
+
+ A pool is like a group, but the maximum number of members
+ is governed by the *size* parameter.
+
+ :keyword int size: If given, this non-negative integer is the
+ maximum count of active greenlets that will be allowed in
+ this pool. A few values have special significance:
+
+ * ``None`` (the default) places no limit on the number of
+ greenlets. This is useful when you need to track, but not limit,
+ greenlets, as with :class:`gevent.pywsgi.WSGIServer`. A :class:`Group`
+ may be a more efficient way to achieve the same effect.
+ * ``0`` creates a pool that can never have any active greenlets. Attempting
+ to spawn in this pool will block forever. This is only useful
+ if an application uses :meth:`wait_available` with a timeout and checks
+ :meth:`free_count` before attempting to spawn.
+ """
+ if size is not None and size < 0:
+ raise ValueError('size must not be negative: %r' % (size, ))
+ Group.__init__(self)
+ self.size = size
+ if greenlet_class is not None:
+ self.greenlet_class = greenlet_class
+ if size is None:
+ factory = DummySemaphore
+ else:
+ factory = Semaphore
+ self._semaphore = factory(size)
+
+ def wait_available(self, timeout=None):
+ """
+ Wait until it's possible to spawn a greenlet in this pool.
+
+ :param float timeout: If given, only wait the specified number
+ of seconds.
+
+ .. warning:: If the pool was initialized with a size of 0, this
+ method will block forever unless a timeout is given.
+
+ :return: A number indicating how many new greenlets can be put into
+ the pool without blocking.
+
+ .. versionchanged:: 1.1a3
+ Added the ``timeout`` parameter.
+ """
+ return self._semaphore.wait(timeout=timeout)
+
+ def full(self):
+ """
+ Return a boolean indicating whether this pool has any room for
+ members. (True if it does, False if it doesn't.)
+ """
+ return self.free_count() <= 0
+
+ def free_count(self):
+ """
+ Return a number indicating *approximately* how many more members
+ can be added to this pool.
+ """
+ if self.size is None:
+ return 1
+ return max(0, self.size - len(self))
+
+ def add(self, greenlet):
+ """
+ Begin tracking the given greenlet, blocking until space is available.
+
+ .. seealso:: :meth:`Group.add`
+ """
+ self._semaphore.acquire()
+ try:
+ Group.add(self, greenlet)
+ except:
+ self._semaphore.release()
+ raise
+
+ def _discard(self, greenlet):
+ Group._discard(self, greenlet)
+ self._semaphore.release()
+
+
+class pass_value(object):
+ __slots__ = ['callback']
+
+ def __init__(self, callback):
+ self.callback = callback
+
+ def __call__(self, source):
+ if source.successful():
+ self.callback(source.value)
+
+ def __hash__(self):
+ return hash(self.callback)
+
+ def __eq__(self, other):
+ return self.callback == getattr(other, 'callback', other)
+
+ def __str__(self):
+ return str(self.callback)
+
+ def __repr__(self):
+ return repr(self.callback)
+
+ def __getattr__(self, item):
+ assert item != 'callback'
+ return getattr(self.callback, item)