# path: root/python/gevent/_monitor.py
# blob: 2c4ef030965cdc5595397047125aec65e31a7776 (plain)
# Copyright (c) 2018 gevent. See LICENSE for details.
from __future__ import print_function, absolute_import, division

import os
import sys

from weakref import ref as wref

from greenlet import getcurrent

from gevent import config as GEVENT_CONFIG
from gevent.monkey import get_original
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold
from gevent.events import IPeriodicMonitorThread
from gevent.events import implementer

from gevent._tracer import GreenletTracer
from gevent._compat import thread_mod_name
from gevent._compat import perf_counter



# The names exported by ``from gevent._monitor import *``.
__all__ = [
    'PeriodicMonitoringThread',
]

# Thread and sleep primitives, looked up through ``gevent.monkey.get_original``
# instead of being imported directly. Presumably this yields the real
# (un-monkey-patched) implementations, so that the monitoring thread is a
# native thread doing a native sleep -- TODO confirm against gevent.monkey.
get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')



class MonitorWarning(RuntimeWarning):
    """
    Warning category used for the warnings issued by this module.
    """


class _MonitorEntry(object):

    __slots__ = ('function', 'period', 'last_run_time')

    def __init__(self, function, period):
        self.function = function
        self.period = period
        self.last_run_time = 0

    def __eq__(self, other):
        return self.function == other.function and self.period == other.period

    def __repr__(self):
        return repr((self.function, self.period, self.last_run_time))


@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object):
    """
    Periodically run monitoring functions for a hub from a separate thread.

    Creating an instance immediately starts the thread; the instance
    itself is the thread's entry point (see :meth:`__call__`). Only a
    weak reference to the hub is kept, and the thread stops once the
    hub goes away or :meth:`kill` is called.
    """
    # This doesn't extend threading.Thread because that gets monkey-patched.
    # We use the low-level 'start_new_thread' primitive instead.

    # The amount of seconds we will sleep when we think we have nothing
    # to do.
    inactive_sleep_time = 2.0

    # The absolute minimum we will sleep, regardless of
    # what particular monitoring functions want to say.
    min_sleep_time = 0.005

    # The minimum period in seconds at which we will check memory usage.
    # Getting memory usage is fairly expensive.
    min_memory_monitor_period = 2

    # A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
    # The first entry is always our entry for self.monitor_blocking
    _monitoring_functions = None

    # The calculated min sleep time for the monitoring functions list.
    _calculated_sleep_time = None

    # A boolean value that also happens to capture the
    # memory usage at the time we exceeded the threshold. Reset
    # to 0 when we go back below.
    _memory_exceeded = 0

    # The instance of GreenletTracer we're using
    _greenlet_tracer = None

    # The identifier returned by start_new_thread for the monitoring
    # thread. Defaults to None because the new thread begins running
    # before __init__ gets the chance to store the real value.
    monitor_thread_ident = None

    def __init__(self, hub):
        """
        Install a greenlet tracer and start the monitoring thread for *hub*.

        Must be called from the thread that *hub* runs in.
        """
        self._hub_wref = wref(hub, self._on_hub_gc)
        self.should_run = True

        # Must be installed in the thread that the hub is running in;
        # the trace function is threadlocal
        assert get_thread_ident() == hub.thread_ident
        self._greenlet_tracer = GreenletTracer()

        self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
                                                    GEVENT_CONFIG.max_blocking_time)]
        self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time

        # We must track the PID to know if your thread has died after a fork.
        # Record it before starting the thread so that all the state we
        # control is in place by the time the thread begins running.
        self.pid = os.getpid()

        # Create the actual monitoring thread. This is effectively a "daemon"
        # thread.
        self.monitor_thread_ident = start_new_thread(self, ())

    def _on_fork(self):
        """Restart the monitoring thread if we are now in a different process."""
        # Pseudo-standard method that resolver_ares and threadpool
        # also have, called by hub.reinit()
        pid = os.getpid()
        if pid != self.pid:
            self.pid = pid
            # _get_process() caches its result by shadowing itself on the
            # instance. A psutil.Process created before the fork describes
            # the parent process, so drop the cache and let the next call
            # look up the current process again.
            self.__dict__.pop('_get_process', None)
            self.monitor_thread_ident = start_new_thread(self, ())

    @property
    def hub(self):
        """The hub being monitored, or None once the weak reference is dead."""
        return self._hub_wref()

    def monitoring_functions(self):
        """
        Return the list of `_MonitorEntry` objects currently scheduled.

        As a side effect, the period of the first entry (the blocking
        monitor) is synchronized with ``GEVENT_CONFIG.max_blocking_time``.
        """
        # Update max_blocking_time each time.
        mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes.
        if mbt != self._monitoring_functions[0].period:
            self._monitoring_functions[0].period = mbt
            self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
        return self._monitoring_functions

    def add_monitoring_function(self, function, period):
        """
        Add, update or remove a monitoring function.

        :param function: A callable; it is invoked with the hub as its
            only argument.
        :param period: The positive number of seconds wanted between
            calls. If the function is already scheduled its entry is
            replaced. ``None`` removes the function.
        :raises ValueError: If *function* is not callable, if *period*
            is not positive, or on an attempt to remove the built-in
            blocking monitor.
        """
        if not callable(function):
            raise ValueError("function must be callable")

        current = self._monitoring_functions
        if period is None:
            # Remove.
            # The first entry must always be the blocking monitor:
            # monitoring_functions() writes max_blocking_time into entry 0,
            # and the run loop requires a non-empty list. It is disabled by
            # a period of 0, not by removing it.
            if current and function == current[0].function:
                raise ValueError("The blocking monitor cannot be removed.")
            updated = [x for x in current if x.function != function]
        elif period <= 0:
            raise ValueError("Period must be positive.")
        else:
            # Add or update period
            entry = _MonitorEntry(function, period)
            updated = [
                x if x.function != function else entry
                for x in current
            ]
            if entry not in updated:
                updated.append(entry)

        # Publish a completely built list in one assignment rather than
        # mutating the list that may currently be in use.
        self._monitoring_functions = updated
        self._calculated_sleep_time = min(x.period for x in updated)

    def calculate_sleep_time(self):
        """
        Return how many seconds the monitoring thread should sleep next.

        This is the smallest period of any monitoring function, but never
        less than ``min_sleep_time``; if that smallest period is not
        positive, ``inactive_sleep_time`` is returned instead.
        """
        min_sleep = self._calculated_sleep_time
        if min_sleep <= 0:
            # Everyone wants to be disabled. Sleep for a longer period of
            # time than usual so we don't spin unnecessarily. We might be
            # enabled again in the future.
            return self.inactive_sleep_time
        return max((min_sleep, self.min_sleep_time))

    def kill(self):
        """Ask the monitoring thread to stop and uninstall the tracer (once)."""
        if not self.should_run:
            # Prevent overwriting trace functions.
            return
        # Stop this monitoring thread from running.
        self.should_run = False
        # Uninstall our tracing hook
        self._greenlet_tracer.kill()

    def _on_hub_gc(self, _):
        """Weak reference callback: the hub is gone, so stop monitoring."""
        self.kill()

    def __call__(self):
        """
        Body of the monitoring thread.

        Loops until ``should_run`` becomes false: sleep, then call every
        monitoring function whose period has elapsed, passing it the hub.
        """
        # The function that runs in the monitoring thread.
        # We cannot use threading.current_thread because it would
        # create an immortal DummyThread object.
        getcurrent().gevent_monitoring_thread = wref(self)

        try:
            while self.should_run:
                functions = self.monitoring_functions()
                assert functions
                sleep_time = self.calculate_sleep_time()

                thread_sleep(sleep_time)

                # Make sure the hub is still around, and still active,
                # and keep it around while we are here.
                hub = self.hub
                if not hub:
                    self.kill()

                if self.should_run:
                    this_run = perf_counter()
                    for entry in functions:
                        f = entry.function
                        period = entry.period
                        last_run = entry.last_run_time
                        # A false period (0) means this function is disabled.
                        if period and last_run + period <= this_run:
                            entry.last_run_time = this_run
                            f(hub)
                del hub # break our reference to hub while we sleep

        except SystemExit:
            pass
        except: # pylint:disable=bare-except
            # We're a daemon thread, so swallow any exceptions that get here
            # during interpreter shutdown.
            if not sys or not sys.stderr: # pragma: no cover
                # Interpreter is shutting down
                pass
            else:
                hub = self.hub
                if hub is not None:
                    # XXX: This tends to do bad things like end the process, because we
                    # try to switch *threads*, which can't happen. Need something better.
                    hub.handle_error(self, *sys.exc_info())

    def monitor_blocking(self, hub):
        """
        Report the greenlet that is blocking *hub*, if the tracer found one.

        The report is printed line by line to ``hub.exception_stream`` and
        an `EventLoopBlocked` event is sent.

        :return: ``None`` when nothing was blocking, otherwise a tuple
            ``(active_greenlet, report)``.
        """
        # Called periodically to see if the trace function has
        # fired to switch greenlets. If not, we will print
        # the greenlet tree.

        # For tests, we return a true value when we think we found something
        # blocking

        did_block = self._greenlet_tracer.did_block_hub(hub)
        if not did_block:
            return

        active_greenlet = did_block[1]
        report = self._greenlet_tracer.did_block_hub_report(
            hub, active_greenlet,
            dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))

        stream = hub.exception_stream
        for line in report:
            # Printing line by line may interleave with other things,
            # but it should also prevent a "reentrant call to print"
            # when the report is large.
            print(line, file=stream)

        notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
        return (active_greenlet, report)

    def ignore_current_greenlet_blocking(self):
        """Forward to the tracer's ``ignore_current_greenlet_blocking``."""
        self._greenlet_tracer.ignore_current_greenlet_blocking()

    def monitor_current_greenlet_blocking(self):
        """Forward to the tracer's ``monitor_current_greenlet_blocking``."""
        self._greenlet_tracer.monitor_current_greenlet_blocking()

    def _get_process(self): # pylint:disable=method-hidden
        """
        Return a ``psutil.Process`` for this process, or None.

        None is returned when psutil cannot be imported or when access
        to the memory information is denied. The answer is cached by
        replacing this method on the instance.
        """
        try:
            # The standard library 'resource' module doesn't provide
            # a standard way to get the RSS measure, only the maximum.
            # You might be tempted to try to compute something by adding
            # together text and data sizes, but on many systems those come back
            # zero. So our only option is psutil.
            from psutil import Process, AccessDenied
            # Make sure it works (why would we be denied access to our own process?)
            try:
                proc = Process()
                proc.memory_full_info()
            except AccessDenied: # pragma: no cover
                proc = None
        except ImportError:
            proc = None

        # Shadow the method on the instance so later calls are cheap.
        self._get_process = lambda: proc
        return proc

    def can_monitor_memory_usage(self):
        """Return whether memory usage can be measured (see `_get_process`)."""
        return self._get_process() is not None

    def install_monitor_memory_usage(self):
        """
        Schedule `monitor_memory_usage`, or warn if that is not possible.

        The period is ``GEVENT_CONFIG.memory_monitor_period``, but never
        less than ``min_memory_monitor_period``.
        """
        # Start monitoring memory usage, if possible.
        # If not possible, emit a warning.
        if not self.can_monitor_memory_usage():
            import warnings
            warnings.warn("Unable to monitor memory usage. Install psutil.",
                          MonitorWarning)
            return

        self.add_monitoring_function(self.monitor_memory_usage,
                                     max(GEVENT_CONFIG.memory_monitor_period,
                                         self.min_memory_monitor_period))

    def monitor_memory_usage(self, _hub):
        """
        Compare current memory usage with ``GEVENT_CONFIG.max_memory_usage``.

        Sends `MemoryUsageThresholdExceeded` while usage is above the
        limit and still growing, and `MemoryUsageUnderThreshold` when it
        drops back to or below the limit after having exceeded it.

        :return: ``-1`` if no limit is configured, otherwise the event
            that was sent, or ``None`` if no event was sent.
        """
        max_allowed = GEVENT_CONFIG.max_memory_usage
        if not max_allowed:
            # They disabled it.
            return -1 # value for tests

        rusage = self._get_process().memory_full_info()
        # uss only documented available on Windows, Linux, and OS X.
        # If not available, fall back to rss as an approximation.
        mem_usage = getattr(rusage, 'uss', 0) or rusage.rss

        event = None # Return value for tests

        if mem_usage > max_allowed:
            if mem_usage > self._memory_exceeded:
                # We're still growing
                event = MemoryUsageThresholdExceeded(
                    mem_usage, max_allowed, rusage)
                notify(event)
            self._memory_exceeded = mem_usage
        else:
            # we're below. Were we above it last time?
            if self._memory_exceeded:
                event = MemoryUsageUnderThreshold(
                    mem_usage, max_allowed, rusage, self._memory_exceeded)
                notify(event)
            self._memory_exceeded = 0

        return event

    def __repr__(self):
        # The thread identifier may not have been stored yet (see the
        # class-level default), and hex(None) would raise; a repr must
        # not fail, especially when it is used to report another error.
        ident = self.monitor_thread_ident
        return '<%s at %s in thread %s greenlet %r for %r>' % (
            self.__class__.__name__,
            hex(id(self)),
            hex(ident) if ident is not None else None,
            getcurrent(),
            self._hub_wref())