aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_imap.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_imap.py')
-rw-r--r--python/gevent/_imap.py227
1 files changed, 0 insertions, 227 deletions
diff --git a/python/gevent/_imap.py b/python/gevent/_imap.py
deleted file mode 100644
index e976b67..0000000
--- a/python/gevent/_imap.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (c) 2018 gevent
-# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,infer_types=True
-
-"""
-Iterators across greenlets or AsyncResult objects.
-
-"""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-
-from gevent import _semaphore
-from gevent import queue
-
-
-__all__ = [
- 'IMapUnordered',
- 'IMap',
-]
-
-locals()['Greenlet'] = __import__('gevent').Greenlet
-locals()['Semaphore'] = _semaphore.Semaphore
-locals()['UnboundQueue'] = queue.UnboundQueue
-
-
-class Failure(object):
- __slots__ = ('exc', 'raise_exception')
-
- def __init__(self, exc, raise_exception=None):
- self.exc = exc
- self.raise_exception = raise_exception
-
-
-def _raise_exc(failure):
- # For cython.
- if failure.raise_exception:
- failure.raise_exception()
- else:
- raise failure.exc
-
-class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
- """
- At iterator of map results.
- """
-
- def __init__(self, func, iterable, spawn, maxsize=None, _zipped=False):
- """
- An iterator that.
-
- :param callable spawn: The function we use to create new greenlets.
- :keyword int maxsize: If given and not-None, specifies the maximum number of
- finished results that will be allowed to accumulated awaiting the reader;
- more than that number of results will cause map function greenlets to begin
- to block. This is most useful is there is a great disparity in the speed of
- the mapping code and the consumer and the results consume a great deal of resources.
- Using a bound is more computationally expensive than not using a bound.
-
- .. versionchanged:: 1.1b3
- Added the *maxsize* parameter.
- """
- Greenlet.__init__(self) # pylint:disable=undefined-variable
- self.spawn = spawn
- self._zipped = _zipped
- self.func = func
- self.iterable = iterable
- self.queue = UnboundQueue() # pylint:disable=undefined-variable
-
-
- if maxsize:
- # Bounding the queue is not enough if we want to keep from
- # accumulating objects; the result value will be around as
- # the greenlet's result, blocked on self.queue.put(), and
- # we'll go on to spawn another greenlet, which in turn can
- # create the result. So we need a semaphore to prevent a
- # greenlet from exiting while the queue is full so that we
- # don't spawn the next greenlet (assuming that self.spawn
- # is of course bounded). (Alternatively we could have the
- # greenlet itself do the insert into the pool, but that
- # takes some rework).
- #
- # Given the use of a semaphore at this level, sizing the queue becomes
- # redundant, and that lets us avoid having to use self.link() instead
- # of self.rawlink() to avoid having blocking methods called in the
- # hub greenlet.
- self._result_semaphore = Semaphore(maxsize) # pylint:disable=undefined-variable
- else:
- self._result_semaphore = None
-
- self._outstanding_tasks = 0
- # The index (zero based) of the maximum number of
- # results we will have.
- self._max_index = -1
- self.finished = False
-
-
- # We're iterating in a different greenlet than we're running.
- def __iter__(self):
- return self
-
- def __next__(self):
- if self._result_semaphore is not None:
- self._result_semaphore.release()
- value = self._inext()
- if isinstance(value, Failure):
- _raise_exc(value)
- return value
-
- next = __next__ # Py2
-
- def _inext(self):
- return self.queue.get()
-
- def _ispawn(self, func, item, item_index):
- if self._result_semaphore is not None:
- self._result_semaphore.acquire()
- self._outstanding_tasks += 1
- g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
- g._imap_task_index = item_index
- g.rawlink(self._on_result)
- return g
-
- def _run(self): # pylint:disable=method-hidden
- try:
- func = self.func
- for item in self.iterable:
- self._max_index += 1
- self._ispawn(func, item, self._max_index)
- self._on_finish(None)
- except BaseException as e:
- self._on_finish(e)
- raise
- finally:
- self.spawn = None
- self.func = None
- self.iterable = None
- self._result_semaphore = None
-
- def _on_result(self, greenlet):
- # This method will be called in the hub greenlet (we rawlink)
- self._outstanding_tasks -= 1
- count = self._outstanding_tasks
- finished = self.finished
- ready = self.ready()
- put_finished = False
-
- if ready and count <= 0 and not finished:
- finished = self.finished = True
- put_finished = True
-
- if greenlet.successful():
- self.queue.put(self._iqueue_value_for_success(greenlet))
- else:
- self.queue.put(self._iqueue_value_for_failure(greenlet))
-
- if put_finished:
- self.queue.put(self._iqueue_value_for_self_finished())
-
- def _on_finish(self, exception):
- # Called in this greenlet.
- if self.finished:
- return
-
- if exception is not None:
- self.finished = True
- self.queue.put(self._iqueue_value_for_self_failure(exception))
- return
-
- if self._outstanding_tasks <= 0:
- self.finished = True
- self.queue.put(self._iqueue_value_for_self_finished())
-
- def _iqueue_value_for_success(self, greenlet):
- return greenlet.value
-
- def _iqueue_value_for_failure(self, greenlet):
- return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
-
- def _iqueue_value_for_self_finished(self):
- return Failure(StopIteration())
-
- def _iqueue_value_for_self_failure(self, exception):
- return Failure(exception, self._raise_exception)
-
-
-class IMap(IMapUnordered):
- # A specialization of IMapUnordered that returns items
- # in the order in which they were generated, not
- # the order in which they finish.
-
- def __init__(self, *args, **kwargs):
- # The result dictionary: {index: value}
- self._results = {}
-
- # The index of the result to return next.
- self.index = 0
- IMapUnordered.__init__(self, *args, **kwargs)
-
- def _inext(self):
- try:
- value = self._results.pop(self.index)
- except KeyError:
- # Wait for our index to finish.
- while 1:
- index, value = self.queue.get()
- if index == self.index:
- break
- else:
- self._results[index] = value
- self.index += 1
- return value
-
- def _iqueue_value_for_success(self, greenlet):
- return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_success(self, greenlet))
-
- def _iqueue_value_for_failure(self, greenlet):
- return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
-
- def _iqueue_value_for_self_finished(self):
- return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_finished(self))
-
- def _iqueue_value_for_self_failure(self, exception):
- return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_failure(self, exception))
-
-from gevent._util import import_c_accel
-import_c_accel(globals(), 'gevent.__imap')