diff options
author | James Taylor <user234683@users.noreply.github.com> | 2018-09-14 19:32:27 -0700 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2018-09-14 19:32:27 -0700 |
commit | 4212164e91ba2f49583cf44ad623a29b36db8f77 (patch) | |
tree | 47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/_imap.py | |
parent | 6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff) | |
download | yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip |
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/_imap.py')
-rw-r--r-- | python/gevent/_imap.py | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/python/gevent/_imap.py b/python/gevent/_imap.py new file mode 100644 index 0000000..e976b67 --- /dev/null +++ b/python/gevent/_imap.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2018 gevent +# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,infer_types=True + +""" +Iterators across greenlets or AsyncResult objects. + +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +from gevent import _semaphore +from gevent import queue + + +__all__ = [ + 'IMapUnordered', + 'IMap', +] + +locals()['Greenlet'] = __import__('gevent').Greenlet +locals()['Semaphore'] = _semaphore.Semaphore +locals()['UnboundQueue'] = queue.UnboundQueue + + +class Failure(object): + __slots__ = ('exc', 'raise_exception') + + def __init__(self, exc, raise_exception=None): + self.exc = exc + self.raise_exception = raise_exception + + +def _raise_exc(failure): + # For cython. + if failure.raise_exception: + failure.raise_exception() + else: + raise failure.exc + +class IMapUnordered(Greenlet): # pylint:disable=undefined-variable + """ + At iterator of map results. + """ + + def __init__(self, func, iterable, spawn, maxsize=None, _zipped=False): + """ + An iterator that. + + :param callable spawn: The function we use to create new greenlets. + :keyword int maxsize: If given and not-None, specifies the maximum number of + finished results that will be allowed to accumulated awaiting the reader; + more than that number of results will cause map function greenlets to begin + to block. This is most useful is there is a great disparity in the speed of + the mapping code and the consumer and the results consume a great deal of resources. + Using a bound is more computationally expensive than not using a bound. + + .. versionchanged:: 1.1b3 + Added the *maxsize* parameter. + """ + Greenlet.__init__(self) # pylint:disable=undefined-variable + self.spawn = spawn + self._zipped = _zipped + self.func = func + self.iterable = iterable + self.queue = UnboundQueue() # pylint:disable=undefined-variable + + + if maxsize: + # Bounding the queue is not enough if we want to keep from + # accumulating objects; the result value will be around as + # the greenlet's result, blocked on self.queue.put(), and + # we'll go on to spawn another greenlet, which in turn can + # create the result. So we need a semaphore to prevent a + # greenlet from exiting while the queue is full so that we + # don't spawn the next greenlet (assuming that self.spawn + # is of course bounded). (Alternatively we could have the + # greenlet itself do the insert into the pool, but that + # takes some rework). + # + # Given the use of a semaphore at this level, sizing the queue becomes + # redundant, and that lets us avoid having to use self.link() instead + # of self.rawlink() to avoid having blocking methods called in the + # hub greenlet. + self._result_semaphore = Semaphore(maxsize) # pylint:disable=undefined-variable + else: + self._result_semaphore = None + + self._outstanding_tasks = 0 + # The index (zero based) of the maximum number of + # results we will have. + self._max_index = -1 + self.finished = False + + + # We're iterating in a different greenlet than we're running. + def __iter__(self): + return self + + def __next__(self): + if self._result_semaphore is not None: + self._result_semaphore.release() + value = self._inext() + if isinstance(value, Failure): + _raise_exc(value) + return value + + next = __next__ # Py2 + + def _inext(self): + return self.queue.get() + + def _ispawn(self, func, item, item_index): + if self._result_semaphore is not None: + self._result_semaphore.acquire() + self._outstanding_tasks += 1 + g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item) + g._imap_task_index = item_index + g.rawlink(self._on_result) + return g + + def _run(self): # pylint:disable=method-hidden + try: + func = self.func + for item in self.iterable: + self._max_index += 1 + self._ispawn(func, item, self._max_index) + self._on_finish(None) + except BaseException as e: + self._on_finish(e) + raise + finally: + self.spawn = None + self.func = None + self.iterable = None + self._result_semaphore = None + + def _on_result(self, greenlet): + # This method will be called in the hub greenlet (we rawlink) + self._outstanding_tasks -= 1 + count = self._outstanding_tasks + finished = self.finished + ready = self.ready() + put_finished = False + + if ready and count <= 0 and not finished: + finished = self.finished = True + put_finished = True + + if greenlet.successful(): + self.queue.put(self._iqueue_value_for_success(greenlet)) + else: + self.queue.put(self._iqueue_value_for_failure(greenlet)) + + if put_finished: + self.queue.put(self._iqueue_value_for_self_finished()) + + def _on_finish(self, exception): + # Called in this greenlet. + if self.finished: + return + + if exception is not None: + self.finished = True + self.queue.put(self._iqueue_value_for_self_failure(exception)) + return + + if self._outstanding_tasks <= 0: + self.finished = True + self.queue.put(self._iqueue_value_for_self_finished()) + + def _iqueue_value_for_success(self, greenlet): + return greenlet.value + + def _iqueue_value_for_failure(self, greenlet): + return Failure(greenlet.exception, getattr(greenlet, '_raise_exception')) + + def _iqueue_value_for_self_finished(self): + return Failure(StopIteration()) + + def _iqueue_value_for_self_failure(self, exception): + return Failure(exception, self._raise_exception) + + +class IMap(IMapUnordered): + # A specialization of IMapUnordered that returns items + # in the order in which they were generated, not + # the order in which they finish. + + def __init__(self, *args, **kwargs): + # The result dictionary: {index: value} + self._results = {} + + # The index of the result to return next. + self.index = 0 + IMapUnordered.__init__(self, *args, **kwargs) + + def _inext(self): + try: + value = self._results.pop(self.index) + except KeyError: + # Wait for our index to finish. + while 1: + index, value = self.queue.get() + if index == self.index: + break + else: + self._results[index] = value + self.index += 1 + return value + + def _iqueue_value_for_success(self, greenlet): + return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_success(self, greenlet)) + + def _iqueue_value_for_failure(self, greenlet): + return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_failure(self, greenlet)) + + def _iqueue_value_for_self_finished(self): + return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_finished(self)) + + def _iqueue_value_for_self_failure(self, exception): + return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_failure(self, exception)) + +from gevent._util import import_c_accel +import_c_accel(globals(), 'gevent.__imap') |