aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_monitor.py')
-rw-r--r--python/gevent/_monitor.py325
1 files changed, 0 insertions, 325 deletions
diff --git a/python/gevent/_monitor.py b/python/gevent/_monitor.py
deleted file mode 100644
index 2c4ef03..0000000
--- a/python/gevent/_monitor.py
+++ /dev/null
@@ -1,325 +0,0 @@
-# Copyright (c) 2018 gevent. See LICENSE for details.
-from __future__ import print_function, absolute_import, division
-
-import os
-import sys
-
-from weakref import ref as wref
-
-from greenlet import getcurrent
-
-from gevent import config as GEVENT_CONFIG
-from gevent.monkey import get_original
-from gevent.events import notify
-from gevent.events import EventLoopBlocked
-from gevent.events import MemoryUsageThresholdExceeded
-from gevent.events import MemoryUsageUnderThreshold
-from gevent.events import IPeriodicMonitorThread
-from gevent.events import implementer
-
-from gevent._tracer import GreenletTracer
-from gevent._compat import thread_mod_name
-from gevent._compat import perf_counter
-
-
-
-__all__ = [
- 'PeriodicMonitoringThread',
-]
-
-get_thread_ident = get_original(thread_mod_name, 'get_ident')
-start_new_thread = get_original(thread_mod_name, 'start_new_thread')
-thread_sleep = get_original('time', 'sleep')
-
-
-
-class MonitorWarning(RuntimeWarning):
- """The type of warnings we emit."""
-
-
-class _MonitorEntry(object):
-
- __slots__ = ('function', 'period', 'last_run_time')
-
- def __init__(self, function, period):
- self.function = function
- self.period = period
- self.last_run_time = 0
-
- def __eq__(self, other):
- return self.function == other.function and self.period == other.period
-
- def __repr__(self):
- return repr((self.function, self.period, self.last_run_time))
-
-
-@implementer(IPeriodicMonitorThread)
-class PeriodicMonitoringThread(object):
- # This doesn't extend threading.Thread because that gets monkey-patched.
- # We use the low-level 'start_new_thread' primitive instead.
-
- # The amount of seconds we will sleep when we think we have nothing
- # to do.
- inactive_sleep_time = 2.0
-
- # The absolute minimum we will sleep, regardless of
- # what particular monitoring functions want to say.
- min_sleep_time = 0.005
-
- # The minimum period in seconds at which we will check memory usage.
- # Getting memory usage is fairly expensive.
- min_memory_monitor_period = 2
-
- # A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
- # The first entry is always our entry for self.monitor_blocking
- _monitoring_functions = None
-
- # The calculated min sleep time for the monitoring functions list.
- _calculated_sleep_time = None
-
- # A boolean value that also happens to capture the
- # memory usage at the time we exceeded the threshold. Reset
- # to 0 when we go back below.
- _memory_exceeded = 0
-
- # The instance of GreenletTracer we're using
- _greenlet_tracer = None
-
- def __init__(self, hub):
- self._hub_wref = wref(hub, self._on_hub_gc)
- self.should_run = True
-
- # Must be installed in the thread that the hub is running in;
- # the trace function is threadlocal
- assert get_thread_ident() == hub.thread_ident
- self._greenlet_tracer = GreenletTracer()
-
- self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
- GEVENT_CONFIG.max_blocking_time)]
- self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time
- # Create the actual monitoring thread. This is effectively a "daemon"
- # thread.
- self.monitor_thread_ident = start_new_thread(self, ())
-
- # We must track the PID to know if your thread has died after a fork
- self.pid = os.getpid()
-
- def _on_fork(self):
- # Pseudo-standard method that resolver_ares and threadpool
- # also have, called by hub.reinit()
- pid = os.getpid()
- if pid != self.pid:
- self.pid = pid
- self.monitor_thread_ident = start_new_thread(self, ())
-
- @property
- def hub(self):
- return self._hub_wref()
-
-
- def monitoring_functions(self):
- # Return a list of _MonitorEntry objects
-
- # Update max_blocking_time each time.
- mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes.
- if mbt != self._monitoring_functions[0].period:
- self._monitoring_functions[0].period = mbt
- self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
- return self._monitoring_functions
-
- def add_monitoring_function(self, function, period):
- if not callable(function):
- raise ValueError("function must be callable")
-
- if period is None:
- # Remove.
- self._monitoring_functions = [
- x for x in self._monitoring_functions
- if x.function != function
- ]
- elif period <= 0:
- raise ValueError("Period must be positive.")
- else:
- # Add or update period
- entry = _MonitorEntry(function, period)
- self._monitoring_functions = [
- x if x.function != function else entry
- for x in self._monitoring_functions
- ]
- if entry not in self._monitoring_functions:
- self._monitoring_functions.append(entry)
- self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
-
- def calculate_sleep_time(self):
- min_sleep = self._calculated_sleep_time
- if min_sleep <= 0:
- # Everyone wants to be disabled. Sleep for a longer period of
- # time than usual so we don't spin unnecessarily. We might be
- # enabled again in the future.
- return self.inactive_sleep_time
- return max((min_sleep, self.min_sleep_time))
-
- def kill(self):
- if not self.should_run:
- # Prevent overwriting trace functions.
- return
- # Stop this monitoring thread from running.
- self.should_run = False
- # Uninstall our tracing hook
- self._greenlet_tracer.kill()
-
- def _on_hub_gc(self, _):
- self.kill()
-
- def __call__(self):
- # The function that runs in the monitoring thread.
- # We cannot use threading.current_thread because it would
- # create an immortal DummyThread object.
- getcurrent().gevent_monitoring_thread = wref(self)
-
- try:
- while self.should_run:
- functions = self.monitoring_functions()
- assert functions
- sleep_time = self.calculate_sleep_time()
-
- thread_sleep(sleep_time)
-
- # Make sure the hub is still around, and still active,
- # and keep it around while we are here.
- hub = self.hub
- if not hub:
- self.kill()
-
- if self.should_run:
- this_run = perf_counter()
- for entry in functions:
- f = entry.function
- period = entry.period
- last_run = entry.last_run_time
- if period and last_run + period <= this_run:
- entry.last_run_time = this_run
- f(hub)
- del hub # break our reference to hub while we sleep
-
- except SystemExit:
- pass
- except: # pylint:disable=bare-except
- # We're a daemon thread, so swallow any exceptions that get here
- # during interpreter shutdown.
- if not sys or not sys.stderr: # pragma: no cover
- # Interpreter is shutting down
- pass
- else:
- hub = self.hub
- if hub is not None:
- # XXX: This tends to do bad things like end the process, because we
- # try to switch *threads*, which can't happen. Need something better.
- hub.handle_error(self, *sys.exc_info())
-
- def monitor_blocking(self, hub):
- # Called periodically to see if the trace function has
- # fired to switch greenlets. If not, we will print
- # the greenlet tree.
-
- # For tests, we return a true value when we think we found something
- # blocking
-
- did_block = self._greenlet_tracer.did_block_hub(hub)
- if not did_block:
- return
-
- active_greenlet = did_block[1]
- report = self._greenlet_tracer.did_block_hub_report(
- hub, active_greenlet,
- dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))
-
- stream = hub.exception_stream
- for line in report:
- # Printing line by line may interleave with other things,
- # but it should also prevent a "reentrant call to print"
- # when the report is large.
- print(line, file=stream)
-
- notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
- return (active_greenlet, report)
-
- def ignore_current_greenlet_blocking(self):
- self._greenlet_tracer.ignore_current_greenlet_blocking()
-
- def monitor_current_greenlet_blocking(self):
- self._greenlet_tracer.monitor_current_greenlet_blocking()
-
- def _get_process(self): # pylint:disable=method-hidden
- try:
- # The standard library 'resource' module doesn't provide
- # a standard way to get the RSS measure, only the maximum.
- # You might be tempted to try to compute something by adding
- # together text and data sizes, but on many systems those come back
- # zero. So our only option is psutil.
- from psutil import Process, AccessDenied
- # Make sure it works (why would we be denied access to our own process?)
- try:
- proc = Process()
- proc.memory_full_info()
- except AccessDenied: # pragma: no cover
- proc = None
- except ImportError:
- proc = None
-
- self._get_process = lambda: proc
- return proc
-
- def can_monitor_memory_usage(self):
- return self._get_process() is not None
-
- def install_monitor_memory_usage(self):
- # Start monitoring memory usage, if possible.
- # If not possible, emit a warning.
- if not self.can_monitor_memory_usage():
- import warnings
- warnings.warn("Unable to monitor memory usage. Install psutil.",
- MonitorWarning)
- return
-
- self.add_monitoring_function(self.monitor_memory_usage,
- max(GEVENT_CONFIG.memory_monitor_period,
- self.min_memory_monitor_period))
-
- def monitor_memory_usage(self, _hub):
- max_allowed = GEVENT_CONFIG.max_memory_usage
- if not max_allowed:
- # They disabled it.
- return -1 # value for tests
-
- rusage = self._get_process().memory_full_info()
- # uss only documented available on Windows, Linux, and OS X.
- # If not available, fall back to rss as an aproximation.
- mem_usage = getattr(rusage, 'uss', 0) or rusage.rss
-
- event = None # Return value for tests
-
- if mem_usage > max_allowed:
- if mem_usage > self._memory_exceeded:
- # We're still growing
- event = MemoryUsageThresholdExceeded(
- mem_usage, max_allowed, rusage)
- notify(event)
- self._memory_exceeded = mem_usage
- else:
- # we're below. Were we above it last time?
- if self._memory_exceeded:
- event = MemoryUsageUnderThreshold(
- mem_usage, max_allowed, rusage, self._memory_exceeded)
- notify(event)
- self._memory_exceeded = 0
-
- return event
-
- def __repr__(self):
- return '<%s at %s in thread %s greenlet %r for %r>' % (
- self.__class__.__name__,
- hex(id(self)),
- hex(self.monitor_thread_ident),
- getcurrent(),
- self._hub_wref())