diff options
Diffstat (limited to 'python/gevent/_monitor.py')
-rw-r--r-- | python/gevent/_monitor.py | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/python/gevent/_monitor.py b/python/gevent/_monitor.py new file mode 100644 index 0000000..2c4ef03 --- /dev/null +++ b/python/gevent/_monitor.py @@ -0,0 +1,325 @@ +# Copyright (c) 2018 gevent. See LICENSE for details. +from __future__ import print_function, absolute_import, division + +import os +import sys + +from weakref import ref as wref + +from greenlet import getcurrent + +from gevent import config as GEVENT_CONFIG +from gevent.monkey import get_original +from gevent.events import notify +from gevent.events import EventLoopBlocked +from gevent.events import MemoryUsageThresholdExceeded +from gevent.events import MemoryUsageUnderThreshold +from gevent.events import IPeriodicMonitorThread +from gevent.events import implementer + +from gevent._tracer import GreenletTracer +from gevent._compat import thread_mod_name +from gevent._compat import perf_counter + + + +__all__ = [ + 'PeriodicMonitoringThread', +] + +get_thread_ident = get_original(thread_mod_name, 'get_ident') +start_new_thread = get_original(thread_mod_name, 'start_new_thread') +thread_sleep = get_original('time', 'sleep') + + + +class MonitorWarning(RuntimeWarning): + """The type of warnings we emit.""" + + +class _MonitorEntry(object): + + __slots__ = ('function', 'period', 'last_run_time') + + def __init__(self, function, period): + self.function = function + self.period = period + self.last_run_time = 0 + + def __eq__(self, other): + return self.function == other.function and self.period == other.period + + def __repr__(self): + return repr((self.function, self.period, self.last_run_time)) + + +@implementer(IPeriodicMonitorThread) +class PeriodicMonitoringThread(object): + # This doesn't extend threading.Thread because that gets monkey-patched. + # We use the low-level 'start_new_thread' primitive instead. + + # The amount of seconds we will sleep when we think we have nothing + # to do. + inactive_sleep_time = 2.0 + + # The absolute minimum we will sleep, regardless of + # what particular monitoring functions want to say. + min_sleep_time = 0.005 + + # The minimum period in seconds at which we will check memory usage. + # Getting memory usage is fairly expensive. + min_memory_monitor_period = 2 + + # A list of _MonitorEntry objects: [(function(hub), period, last_run_time))] + # The first entry is always our entry for self.monitor_blocking + _monitoring_functions = None + + # The calculated min sleep time for the monitoring functions list. + _calculated_sleep_time = None + + # A boolean value that also happens to capture the + # memory usage at the time we exceeded the threshold. Reset + # to 0 when we go back below. + _memory_exceeded = 0 + + # The instance of GreenletTracer we're using + _greenlet_tracer = None + + def __init__(self, hub): + self._hub_wref = wref(hub, self._on_hub_gc) + self.should_run = True + + # Must be installed in the thread that the hub is running in; + # the trace function is threadlocal + assert get_thread_ident() == hub.thread_ident + self._greenlet_tracer = GreenletTracer() + + self._monitoring_functions = [_MonitorEntry(self.monitor_blocking, + GEVENT_CONFIG.max_blocking_time)] + self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time + # Create the actual monitoring thread. This is effectively a "daemon" + # thread. + self.monitor_thread_ident = start_new_thread(self, ()) + + # We must track the PID to know if your thread has died after a fork + self.pid = os.getpid() + + def _on_fork(self): + # Pseudo-standard method that resolver_ares and threadpool + # also have, called by hub.reinit() + pid = os.getpid() + if pid != self.pid: + self.pid = pid + self.monitor_thread_ident = start_new_thread(self, ()) + + @property + def hub(self): + return self._hub_wref() + + + def monitoring_functions(self): + # Return a list of _MonitorEntry objects + + # Update max_blocking_time each time. + mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes. + if mbt != self._monitoring_functions[0].period: + self._monitoring_functions[0].period = mbt + self._calculated_sleep_time = min(x.period for x in self._monitoring_functions) + return self._monitoring_functions + + def add_monitoring_function(self, function, period): + if not callable(function): + raise ValueError("function must be callable") + + if period is None: + # Remove. + self._monitoring_functions = [ + x for x in self._monitoring_functions + if x.function != function + ] + elif period <= 0: + raise ValueError("Period must be positive.") + else: + # Add or update period + entry = _MonitorEntry(function, period) + self._monitoring_functions = [ + x if x.function != function else entry + for x in self._monitoring_functions + ] + if entry not in self._monitoring_functions: + self._monitoring_functions.append(entry) + self._calculated_sleep_time = min(x.period for x in self._monitoring_functions) + + def calculate_sleep_time(self): + min_sleep = self._calculated_sleep_time + if min_sleep <= 0: + # Everyone wants to be disabled. Sleep for a longer period of + # time than usual so we don't spin unnecessarily. We might be + # enabled again in the future. + return self.inactive_sleep_time + return max((min_sleep, self.min_sleep_time)) + + def kill(self): + if not self.should_run: + # Prevent overwriting trace functions. + return + # Stop this monitoring thread from running. + self.should_run = False + # Uninstall our tracing hook + self._greenlet_tracer.kill() + + def _on_hub_gc(self, _): + self.kill() + + def __call__(self): + # The function that runs in the monitoring thread. + # We cannot use threading.current_thread because it would + # create an immortal DummyThread object. + getcurrent().gevent_monitoring_thread = wref(self) + + try: + while self.should_run: + functions = self.monitoring_functions() + assert functions + sleep_time = self.calculate_sleep_time() + + thread_sleep(sleep_time) + + # Make sure the hub is still around, and still active, + # and keep it around while we are here. + hub = self.hub + if not hub: + self.kill() + + if self.should_run: + this_run = perf_counter() + for entry in functions: + f = entry.function + period = entry.period + last_run = entry.last_run_time + if period and last_run + period <= this_run: + entry.last_run_time = this_run + f(hub) + del hub # break our reference to hub while we sleep + + except SystemExit: + pass + except: # pylint:disable=bare-except + # We're a daemon thread, so swallow any exceptions that get here + # during interpreter shutdown. + if not sys or not sys.stderr: # pragma: no cover + # Interpreter is shutting down + pass + else: + hub = self.hub + if hub is not None: + # XXX: This tends to do bad things like end the process, because we + # try to switch *threads*, which can't happen. Need something better. + hub.handle_error(self, *sys.exc_info()) + + def monitor_blocking(self, hub): + # Called periodically to see if the trace function has + # fired to switch greenlets. If not, we will print + # the greenlet tree. + + # For tests, we return a true value when we think we found something + # blocking + + did_block = self._greenlet_tracer.did_block_hub(hub) + if not did_block: + return + + active_greenlet = did_block[1] + report = self._greenlet_tracer.did_block_hub_report( + hub, active_greenlet, + dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident)) + + stream = hub.exception_stream + for line in report: + # Printing line by line may interleave with other things, + # but it should also prevent a "reentrant call to print" + # when the report is large. + print(line, file=stream) + + notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report)) + return (active_greenlet, report) + + def ignore_current_greenlet_blocking(self): + self._greenlet_tracer.ignore_current_greenlet_blocking() + + def monitor_current_greenlet_blocking(self): + self._greenlet_tracer.monitor_current_greenlet_blocking() + + def _get_process(self): # pylint:disable=method-hidden + try: + # The standard library 'resource' module doesn't provide + # a standard way to get the RSS measure, only the maximum. + # You might be tempted to try to compute something by adding + # together text and data sizes, but on many systems those come back + # zero. So our only option is psutil. + from psutil import Process, AccessDenied + # Make sure it works (why would we be denied access to our own process?) + try: + proc = Process() + proc.memory_full_info() + except AccessDenied: # pragma: no cover + proc = None + except ImportError: + proc = None + + self._get_process = lambda: proc + return proc + + def can_monitor_memory_usage(self): + return self._get_process() is not None + + def install_monitor_memory_usage(self): + # Start monitoring memory usage, if possible. + # If not possible, emit a warning. + if not self.can_monitor_memory_usage(): + import warnings + warnings.warn("Unable to monitor memory usage. Install psutil.", + MonitorWarning) + return + + self.add_monitoring_function(self.monitor_memory_usage, + max(GEVENT_CONFIG.memory_monitor_period, + self.min_memory_monitor_period)) + + def monitor_memory_usage(self, _hub): + max_allowed = GEVENT_CONFIG.max_memory_usage + if not max_allowed: + # They disabled it. + return -1 # value for tests + + rusage = self._get_process().memory_full_info() + # uss only documented available on Windows, Linux, and OS X. + # If not available, fall back to rss as an aproximation. + mem_usage = getattr(rusage, 'uss', 0) or rusage.rss + + event = None # Return value for tests + + if mem_usage > max_allowed: + if mem_usage > self._memory_exceeded: + # We're still growing + event = MemoryUsageThresholdExceeded( + mem_usage, max_allowed, rusage) + notify(event) + self._memory_exceeded = mem_usage + else: + # we're below. Were we above it last time? + if self._memory_exceeded: + event = MemoryUsageUnderThreshold( + mem_usage, max_allowed, rusage, self._memory_exceeded) + notify(event) + self._memory_exceeded = 0 + + return event + + def __repr__(self): + return '<%s at %s in thread %s greenlet %r for %r>' % ( + self.__class__.__name__, + hex(id(self)), + hex(self.monitor_thread_ident), + getcurrent(), + self._hub_wref()) |