diff options
Diffstat (limited to 'python/gevent/libuv')
| -rw-r--r-- | python/gevent/libuv/__init__.py | 0 | ||||
| -rw-r--r-- | python/gevent/libuv/_corecffi.cp36-win32.pyd | bin | 0 -> 125952 bytes | |||
| -rw-r--r-- | python/gevent/libuv/_corecffi_build.py | 253 | ||||
| -rw-r--r-- | python/gevent/libuv/_corecffi_cdef.c | 393 | ||||
| -rw-r--r-- | python/gevent/libuv/_corecffi_source.c | 181 | ||||
| -rw-r--r-- | python/gevent/libuv/loop.py | 601 | ||||
| -rw-r--r-- | python/gevent/libuv/watcher.py | 732 | 
7 files changed, 2160 insertions, 0 deletions
| diff --git a/python/gevent/libuv/__init__.py b/python/gevent/libuv/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/gevent/libuv/__init__.py diff --git a/python/gevent/libuv/_corecffi.cp36-win32.pyd b/python/gevent/libuv/_corecffi.cp36-win32.pydBinary files differ new file mode 100644 index 0000000..0cc476a --- /dev/null +++ b/python/gevent/libuv/_corecffi.cp36-win32.pyd diff --git a/python/gevent/libuv/_corecffi_build.py b/python/gevent/libuv/_corecffi_build.py new file mode 100644 index 0000000..722157d --- /dev/null +++ b/python/gevent/libuv/_corecffi_build.py @@ -0,0 +1,253 @@ +# pylint: disable=no-member + +# This module is only used to create and compile the gevent._corecffi module; +# nothing should be directly imported from it except `ffi`, which should only be +# used for `ffi.compile()`; programs should import gevent._corecfffi. +# However, because we are using "out-of-line" mode, it is necessary to examine +# this file to know what functions are created and available on the generated +# module. +from __future__ import absolute_import, print_function +import sys +import os +import os.path # pylint:disable=no-name-in-module +import struct + +__all__ = [] + +WIN = sys.platform.startswith('win32') + +def system_bits(): +    return struct.calcsize('P') * 8 + + +def st_nlink_type(): +    if sys.platform == "darwin" or sys.platform.startswith("freebsd"): +        return "short" +    if system_bits() == 32: +        return "unsigned long" +    return "long long" + + +from cffi import FFI +ffi = FFI() + +thisdir = os.path.dirname(os.path.abspath(__file__)) +def read_source(name): +    with open(os.path.join(thisdir, name), 'r') as f: +        return f.read() + +_cdef = read_source('_corecffi_cdef.c') +_source = read_source('_corecffi_source.c') + +_cdef = _cdef.replace('#define GEVENT_ST_NLINK_T int', '') +_cdef = _cdef.replace('#define GEVENT_STRUCT_DONE int', '') +_cdef = _cdef.replace('#define GEVENT_UV_OS_SOCK_T int', '') + +_cdef = _cdef.replace('GEVENT_ST_NLINK_T', st_nlink_type()) +_cdef = _cdef.replace("GEVENT_STRUCT_DONE _;", '...;') +# uv_os_sock_t is int on POSIX and SOCKET on Win32, but socket is +# just another name for handle, which is just another name for 'void*' +# which we will treat as an 'unsigned long' or 'unsigned long long' +# since it comes through 'fileno()' where it has been cast as an int. +# See class watcher.io +_void_pointer_as_integer = 'intptr_t' +_cdef = _cdef.replace("GEVENT_UV_OS_SOCK_T", 'int' if not WIN else _void_pointer_as_integer) + + +setup_py_dir = os.path.abspath(os.path.join(thisdir, '..', '..', '..')) +libuv_dir = os.path.abspath(os.path.join(setup_py_dir, 'deps', 'libuv')) + + +LIBUV_INCLUDE_DIRS = [ +    thisdir, # libev_vfd.h +    os.path.join(libuv_dir, 'include'), +    os.path.join(libuv_dir, 'src'), +] + +# Initially based on https://github.com/saghul/pyuv/blob/v1.x/setup_libuv.py + +def _libuv_source(rel_path): +    # Certain versions of setuptools, notably on windows, are *very* +    # picky about what we feed to sources= "setup() arguments must +    # *always* be /-separated paths relative to the setup.py +    # directory, *never* absolute paths." POSIX doesn't have that issue. +    path = os.path.join('deps', 'libuv', 'src', rel_path) +    return path + +LIBUV_SOURCES = [ +    _libuv_source('fs-poll.c'), +    _libuv_source('inet.c'), +    _libuv_source('threadpool.c'), +    _libuv_source('uv-common.c'), +    _libuv_source('version.c'), +    _libuv_source('uv-data-getter-setters.c'), +    _libuv_source('timer.c'), +] + +if WIN: +    LIBUV_SOURCES += [ +        _libuv_source('win/async.c'), +        _libuv_source('win/core.c'), +        _libuv_source('win/detect-wakeup.c'), +        _libuv_source('win/dl.c'), +        _libuv_source('win/error.c'), +        _libuv_source('win/fs-event.c'), +        _libuv_source('win/fs.c'), +        # getaddrinfo.c refers to ConvertInterfaceIndexToLuid +        # and ConvertInterfaceLuidToNameA, which are supposedly in iphlpapi.h +        # and iphlpapi.lib/dll. But on Windows 10 with Python 3.5 and VC 14 (Visual Studio 2015), +        # I get an undefined warning from the compiler for those functions and +        # a link error from the linker, so this file can't be included. +        # This is possibly because the functions are defined for Windows Vista, and +        # Python 3.5 builds with at earlier SDK? +        # Fortunately we don't use those functions. +        #_libuv_source('win/getaddrinfo.c'), +        # getnameinfo.c refers to uv__getaddrinfo_translate_error from +        # getaddrinfo.c, which we don't have. +        #_libuv_source('win/getnameinfo.c'), +        _libuv_source('win/handle.c'), +        _libuv_source('win/loop-watcher.c'), +        _libuv_source('win/pipe.c'), +        _libuv_source('win/poll.c'), +        _libuv_source('win/process-stdio.c'), +        _libuv_source('win/process.c'), +        _libuv_source('win/req.c'), +        _libuv_source('win/signal.c'), +        _libuv_source('win/snprintf.c'), +        _libuv_source('win/stream.c'), +        _libuv_source('win/tcp.c'), +        _libuv_source('win/thread.c'), +        _libuv_source('win/tty.c'), +        _libuv_source('win/udp.c'), +        _libuv_source('win/util.c'), +        _libuv_source('win/winapi.c'), +        _libuv_source('win/winsock.c'), +    ] +else: +    LIBUV_SOURCES += [ +        _libuv_source('unix/async.c'), +        _libuv_source('unix/core.c'), +        _libuv_source('unix/dl.c'), +        _libuv_source('unix/fs.c'), +        _libuv_source('unix/getaddrinfo.c'), +        _libuv_source('unix/getnameinfo.c'), +        _libuv_source('unix/loop-watcher.c'), +        _libuv_source('unix/loop.c'), +        _libuv_source('unix/pipe.c'), +        _libuv_source('unix/poll.c'), +        _libuv_source('unix/process.c'), +        _libuv_source('unix/signal.c'), +        _libuv_source('unix/stream.c'), +        _libuv_source('unix/tcp.c'), +        _libuv_source('unix/thread.c'), +        _libuv_source('unix/tty.c'), +        _libuv_source('unix/udp.c'), +    ] + + +if sys.platform.startswith('linux'): +    LIBUV_SOURCES += [ +        _libuv_source('unix/linux-core.c'), +        _libuv_source('unix/linux-inotify.c'), +        _libuv_source('unix/linux-syscalls.c'), +        _libuv_source('unix/procfs-exepath.c'), +        _libuv_source('unix/proctitle.c'), +        _libuv_source('unix/sysinfo-loadavg.c'), +        _libuv_source('unix/sysinfo-memory.c'), +    ] +elif sys.platform == 'darwin': +    LIBUV_SOURCES += [ +        _libuv_source('unix/bsd-ifaddrs.c'), +        _libuv_source('unix/darwin.c'), +        _libuv_source('unix/darwin-proctitle.c'), +        _libuv_source('unix/fsevents.c'), +        _libuv_source('unix/kqueue.c'), +        _libuv_source('unix/proctitle.c'), +    ] +elif sys.platform.startswith(('freebsd', 'dragonfly')): +    LIBUV_SOURCES += [ +        _libuv_source('unix/bsd-ifaddrs.c'), +        _libuv_source('unix/freebsd.c'), +        _libuv_source('unix/kqueue.c'), +        _libuv_source('unix/posix-hrtime.c'), +    ] +elif sys.platform.startswith('openbsd'): +    LIBUV_SOURCES += [ +        _libuv_source('unix/bsd-ifaddrs.c'), +        _libuv_source('unix/kqueue.c'), +        _libuv_source('unix/openbsd.c'), +        _libuv_source('unix/posix-hrtime.c'), +    ] +elif sys.platform.startswith('netbsd'): +    LIBUV_SOURCES += [ +        _libuv_source('unix/bsd-ifaddrs.c'), +        _libuv_source('unix/kqueue.c'), +        _libuv_source('unix/netbsd.c'), +        _libuv_source('unix/posix-hrtime.c'), +    ] + +elif sys.platform.startswith('sunos'): +    LIBUV_SOURCES += [ +        _libuv_source('unix/no-proctitle.c'), +        _libuv_source('unix/sunos.c'), +    ] + + +LIBUV_MACROS = [] + +def _define_macro(name, value): +    LIBUV_MACROS.append((name, value)) + +LIBUV_LIBRARIES = [] + +def _add_library(name): +    LIBUV_LIBRARIES.append(name) + +if sys.platform != 'win32': +    _define_macro('_LARGEFILE_SOURCE', 1) +    _define_macro('_FILE_OFFSET_BITS', 64) + +if sys.platform.startswith('linux'): +    _add_library('dl') +    _add_library('rt') +    _define_macro('_GNU_SOURCE', 1) +    _define_macro('_POSIX_C_SOURCE', '200112') +elif sys.platform == 'darwin': +    _define_macro('_DARWIN_USE_64_BIT_INODE', 1) +    _define_macro('_DARWIN_UNLIMITED_SELECT', 1) +elif sys.platform.startswith('netbsd'): +    _add_library('kvm') +elif sys.platform.startswith('sunos'): +    _define_macro('__EXTENSIONS__', 1) +    _define_macro('_XOPEN_SOURCE', 500) +    _add_library('kstat') +    _add_library('nsl') +    _add_library('sendfile') +    _add_library('socket') +elif WIN: +    _define_macro('_GNU_SOURCE', 1) +    _define_macro('WIN32', 1) +    _define_macro('_CRT_SECURE_NO_DEPRECATE', 1) +    _define_macro('_CRT_NONSTDC_NO_DEPRECATE', 1) +    _define_macro('_CRT_SECURE_NO_WARNINGS', 1) +    _define_macro('_WIN32_WINNT', '0x0600') +    _define_macro('WIN32_LEAN_AND_MEAN', 1) +    _add_library('advapi32') +    _add_library('iphlpapi') +    _add_library('psapi') +    _add_library('shell32') +    _add_library('user32') +    _add_library('userenv') +    _add_library('ws2_32') + +ffi.cdef(_cdef) +ffi.set_source('gevent.libuv._corecffi', +               _source, +               sources=LIBUV_SOURCES, +               depends=LIBUV_SOURCES, +               include_dirs=LIBUV_INCLUDE_DIRS, +               libraries=list(LIBUV_LIBRARIES), +               define_macros=list(LIBUV_MACROS)) + +if __name__ == '__main__': +    ffi.compile() diff --git a/python/gevent/libuv/_corecffi_cdef.c b/python/gevent/libuv/_corecffi_cdef.c new file mode 100644 index 0000000..0735aea --- /dev/null +++ b/python/gevent/libuv/_corecffi_cdef.c @@ -0,0 +1,393 @@ +/* markers for the CFFI parser. Replaced when the string is read. */ +#define GEVENT_STRUCT_DONE int +#define GEVENT_ST_NLINK_T int +#define GEVENT_UV_OS_SOCK_T int + +#define UV_EBUSY ... + +#define UV_VERSION_MAJOR ... +#define UV_VERSION_MINOR ... +#define UV_VERSION_PATCH ... + +typedef enum { +	UV_RUN_DEFAULT = 0, +	UV_RUN_ONCE, +	UV_RUN_NOWAIT +} uv_run_mode; + +typedef enum { +  UV_UNKNOWN_HANDLE = 0, +  UV_ASYNC, +  UV_CHECK, +  UV_FS_EVENT, +  UV_FS_POLL, +  UV_HANDLE, +  UV_IDLE, +  UV_NAMED_PIPE, +  UV_POLL, +  UV_PREPARE, +  UV_PROCESS, +  UV_STREAM, +  UV_TCP, +  UV_TIMER, +  UV_TTY, +  UV_UDP, +  UV_SIGNAL, +  UV_FILE, +  UV_HANDLE_TYPE_MAX +} uv_handle_type; + +enum uv_poll_event { +	UV_READABLE = 1, +	UV_WRITABLE = 2, +	/* new in 1.9 */ +	UV_DISCONNECT = 4, +	/* new in 1.14.0 */ +	UV_PRIORITIZED = 8, +}; + +enum uv_fs_event { +	UV_RENAME = 1, +	UV_CHANGE = 2 +}; + +enum uv_fs_event_flags { +	/* +	* By default, if the fs event watcher is given a directory name, we will +	* watch for all events in that directory. This flags overrides this behavior +	* and makes fs_event report only changes to the directory entry itself. This +	* flag does not affect individual files watched. +	* This flag is currently not implemented yet on any backend. +	*/ +	UV_FS_EVENT_WATCH_ENTRY = 1, +	/* +	* By default uv_fs_event will try to use a kernel interface such as inotify +	* or kqueue to detect events. This may not work on remote filesystems such +	* as NFS mounts. This flag makes fs_event fall back to calling stat() on a +	* regular interval. +	* This flag is currently not implemented yet on any backend. +	*/ +	UV_FS_EVENT_STAT = 2, +	/* +	* By default, event watcher, when watching directory, is not registering +	* (is ignoring) changes in it's subdirectories. +	* This flag will override this behaviour on platforms that support it. +	*/ +	UV_FS_EVENT_RECURSIVE = 4 +}; + +const char* uv_strerror(int); +const char* uv_err_name(int); +const char* uv_version_string(void); +const char* uv_handle_type_name(uv_handle_type type); + +// handle structs and types +struct uv_loop_s { +	void* data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_handle_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_idle_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_prepare_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_timer_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_signal_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; +struct uv_poll_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; + +struct uv_check_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; + +struct uv_async_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	void (*async_cb)(struct uv_async_s *); +	GEVENT_STRUCT_DONE _; +}; + +struct uv_fs_event_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; + +struct uv_fs_poll_s { +	struct uv_loop_s* loop; +	uv_handle_type type; +	void *data; +	GEVENT_STRUCT_DONE _; +}; + +typedef struct uv_loop_s uv_loop_t; +typedef struct uv_handle_s uv_handle_t; +typedef struct uv_idle_s uv_idle_t; +typedef struct uv_prepare_s uv_prepare_t; +typedef struct uv_timer_s uv_timer_t; +typedef struct uv_signal_s uv_signal_t; +typedef struct uv_poll_s uv_poll_t; +typedef struct uv_check_s uv_check_t; +typedef struct uv_async_s uv_async_t; +typedef struct uv_fs_event_s uv_fs_event_t; +typedef struct uv_fs_poll_s uv_fs_poll_t; + + +size_t uv_handle_size(uv_handle_type); + +// callbacks with the same signature +typedef void (*uv_close_cb)(uv_handle_t *handle); +typedef void (*uv_idle_cb)(uv_idle_t *handle); +typedef void (*uv_timer_cb)(uv_timer_t *handle); +typedef void (*uv_check_cb)(uv_check_t* handle); +typedef void (*uv_async_cb)(uv_async_t* handle); +typedef void (*uv_prepare_cb)(uv_prepare_t *handle); + +// callbacks with distinct sigs +typedef void (*uv_walk_cb)(uv_handle_t *handle, void *arg); +typedef void (*uv_poll_cb)(uv_poll_t *handle, int status, int events); +typedef void (*uv_signal_cb)(uv_signal_t *handle, int signum); + +// Callback passed to uv_fs_event_start() which will be called +// repeatedly after the handle is started. If the handle was started +// with a directory the filename parameter will be a relative path to +// a file contained in the directory. The events parameter is an ORed +// mask of uv_fs_event elements. +typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, int events, int status); + +typedef struct { +	long tv_sec; +	long tv_nsec; +} uv_timespec_t; + +typedef struct { +	uint64_t st_dev; +	uint64_t st_mode; +	uint64_t st_nlink; +	uint64_t st_uid; +	uint64_t st_gid; +	uint64_t st_rdev; +	uint64_t st_ino; +	uint64_t st_size; +	uint64_t st_blksize; +	uint64_t st_blocks; +	uint64_t st_flags; +	uint64_t st_gen; +	uv_timespec_t st_atim; +	uv_timespec_t st_mtim; +	uv_timespec_t st_ctim; +	uv_timespec_t st_birthtim; +} uv_stat_t; + +typedef void (*uv_fs_poll_cb)(uv_fs_poll_t* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); + +// loop functions +uv_loop_t *uv_default_loop(); +uv_loop_t* uv_loop_new(); // not documented; neither is uv_loop_delete +int uv_loop_init(uv_loop_t* loop); +int uv_loop_fork(uv_loop_t* loop); +int uv_loop_alive(const uv_loop_t *loop); +int uv_loop_close(uv_loop_t* loop); +uint64_t uv_backend_timeout(uv_loop_t* loop); +int uv_run(uv_loop_t *, uv_run_mode mode); +int uv_backend_fd(const uv_loop_t* loop); +// The narrative docs for the two time functions say 'const', +// but the header does not. +void uv_update_time(uv_loop_t* loop); +uint64_t uv_now(uv_loop_t* loop); +void uv_stop(uv_loop_t *); +void uv_walk(uv_loop_t *loop, uv_walk_cb walk_cb, void *arg); + +// handle functions +// uv_handle_t is the base type for all libuv handle types. + +void uv_ref(void *); +void uv_unref(void *); +int uv_has_ref(void *); +void uv_close(void *handle, uv_close_cb close_cb); +int uv_is_active(void *handle); +int uv_is_closing(void *handle); + +// idle functions +// Idle handles will run the given callback once per loop iteration, right +// before the uv_prepare_t handles. Note: The notable difference with prepare +// handles is that when there are active idle handles, the loop will perform a +// zero timeout poll instead of blocking for i/o. Warning: Despite the name, +// idle handles will get their callbacks called on every loop iteration, not +// when the loop is actually "idle". +int uv_idle_init(uv_loop_t *, uv_idle_t *idle); +int uv_idle_start(uv_idle_t *idle, uv_idle_cb cb); +int uv_idle_stop(uv_idle_t *idle); + +// prepare functions +// Prepare handles will run the given callback once per loop iteration, right +// before polling for i/o. +int uv_prepare_init(uv_loop_t *, uv_prepare_t *prepare); +int uv_prepare_start(uv_prepare_t *prepare, uv_prepare_cb cb); +int uv_prepare_stop(uv_prepare_t *prepare); + +// check functions +// Check handles will run the given callback once per loop iteration, right +int uv_check_init(uv_loop_t *, uv_check_t *check); +int uv_check_start(uv_check_t *check, uv_check_cb cb); +int uv_check_stop(uv_check_t *check); + +// async functions +// Async handles allow the user to "wakeup" the event loop and get a callback called from another thread. + +int uv_async_init(uv_loop_t *, uv_async_t*, uv_async_cb); +int uv_async_send(uv_async_t*); + +// timer functions +// Timer handles are used to schedule callbacks to be called in the future. +int uv_timer_init(uv_loop_t *, uv_timer_t *handle); +int uv_timer_start(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat); +int uv_timer_stop(uv_timer_t *handle); +int uv_timer_again(uv_timer_t *handle); +void uv_timer_set_repeat(uv_timer_t *handle, uint64_t repeat); +uint64_t uv_timer_get_repeat(const uv_timer_t *handle); + +// signal functions +// Signal handles implement Unix style signal handling on a per-event loop +// bases. +int uv_signal_init(uv_loop_t *loop, uv_signal_t *handle); +int uv_signal_start(uv_signal_t *handle, uv_signal_cb signal_cb, int signum); +int uv_signal_stop(uv_signal_t *handle); + +// poll functions Poll handles are used to watch file descriptors for +// readability and writability, similar to the purpose of poll(2). It +// is not okay to have multiple active poll handles for the same +// socket, this can cause libuv to busyloop or otherwise malfunction. +// +// The purpose of poll handles is to enable integrating external +// libraries that rely on the event loop to signal it about the socket +// status changes, like c-ares or libssh2. Using uv_poll_t for any +// other purpose is not recommended; uv_tcp_t, uv_udp_t, etc. provide +// an implementation that is faster and more scalable than what can be +// achieved with uv_poll_t, especially on Windows. +// +// Note On windows only sockets can be polled with poll handles. On +// Unix any file descriptor that would be accepted by poll(2) can be +// used. +int uv_poll_init(uv_loop_t *loop, uv_poll_t *handle, int fd); + +// Initialize the handle using a socket descriptor. On Unix this is +// identical to uv_poll_init(). On windows it takes a SOCKET handle; +// SOCKET handles are another name for HANDLE objects in win32, and +// those are defined as PVOID, even though they are not actually +// pointers (they're small integers). CPython and PyPy both return +// the SOCKET (as cast to an int) from the socket.fileno() method. +// libuv uses ``uv_os_sock_t`` for this type, which is defined as an +// int on unix. +int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, GEVENT_UV_OS_SOCK_T socket); +int uv_poll_start(uv_poll_t *handle, int events, uv_poll_cb cb); +int uv_poll_stop(uv_poll_t *handle); + +// FS Event handles allow the user to monitor a given path for +// changes, for example, if the file was renamed or there was a +// generic change in it. This handle uses the best backend for the job +// on each platform. +// +// Thereas also uv_fs_poll_t that uses stat for filesystems where +// the kernel event isn't available. +int uv_fs_event_init(uv_loop_t*, uv_fs_event_t*); +int uv_fs_event_start(uv_fs_event_t*, uv_fs_event_cb, const char* path, unsigned int flags); +int uv_fs_event_stop(uv_fs_event_t*); +int uv_fs_event_getpath(uv_fs_event_t*, char* buffer, size_t* size); + +// FS Poll handles allow the user to monitor a given path for changes. +// Unlike uv_fs_event_t, fs poll handles use stat to detect when a +// file has changed so they can work on file systems where fs event +// handles can't. +// +// This is a closer match to libev. +int uv_fs_poll_init(void*, void*); +int uv_fs_poll_start(void*, uv_fs_poll_cb, const char* path, unsigned int); +int uv_fs_poll_stop(void*); + + +/* Standard library */ +void* memset(void *b, int c, size_t len); + + +/* gevent callbacks */ +// Implemented in Python code as 'def_extern'. In the case of poll callbacks and fs +// callbacks, if *status* is less than 0, it will be passed in the revents +// field. In cases of no extra arguments, revents will be 0. +// These will be created as static functions at the end of the +// _source.c and must be pre-declared at the top of that file if we +// call them +typedef void* GeventWatcherObject; +extern "Python" { +	// Standard gevent._ffi.loop callbacks. +	int python_callback(GeventWatcherObject handle, int revents); +	void python_handle_error(GeventWatcherObject handle, int revents); +	void python_stop(GeventWatcherObject handle); + +	void python_check_callback(uv_check_t* handle); +	void python_prepare_callback(uv_prepare_t* handle); +	void python_timer0_callback(uv_check_t* handle); + +	// libuv specific callback +	void _uv_close_callback(uv_handle_t* handle); +	void python_sigchld_callback(uv_signal_t* handle, int signum); +	void python_queue_callback(uv_handle_t* handle, int revents); +} +// A variable we fill in. +static void (*gevent_noop)(void* handle); + +static void _gevent_signal_callback1(uv_signal_t* handle, int arg); +static void _gevent_async_callback0(uv_async_t* handle); +static void _gevent_prepare_callback0(uv_prepare_t* handle); +static void _gevent_timer_callback0(uv_timer_t* handle); +static void _gevent_check_callback0(uv_check_t* handle); +static void _gevent_idle_callback0(uv_idle_t* handle); +static void _gevent_poll_callback2(uv_poll_t* handle, int status, int events); +static void _gevent_fs_event_callback3(uv_fs_event_t* handle, const char* filename, int events, int status); + +typedef struct _gevent_fs_poll_s { +	uv_fs_poll_t handle; +	uv_stat_t curr; +	uv_stat_t prev; +} gevent_fs_poll_t; + +static void _gevent_fs_poll_callback3(uv_fs_poll_t* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); + +static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg); +static void gevent_close_all_handles(uv_loop_t* loop); +static void gevent_zero_timer(uv_timer_t* handle); +static void gevent_zero_prepare(uv_prepare_t* handle); +static void gevent_zero_check(uv_check_t* handle); +static void gevent_zero_loop(uv_loop_t* handle); diff --git a/python/gevent/libuv/_corecffi_source.c b/python/gevent/libuv/_corecffi_source.c new file mode 100644 index 0000000..83fe82e --- /dev/null +++ b/python/gevent/libuv/_corecffi_source.c @@ -0,0 +1,181 @@ +#include <string.h> +#include "uv.h" + +typedef void* GeventWatcherObject; + +static int python_callback(GeventWatcherObject handle, int revents); +static void python_queue_callback(uv_handle_t* watcher_ptr, int revents); +static void python_handle_error(GeventWatcherObject handle, int revents); +static void python_stop(GeventWatcherObject handle); + +static void _gevent_noop(void* handle) {} + +static void (*gevent_noop)(void* handle) = &_gevent_noop; + +static void _gevent_generic_callback1_unused(uv_handle_t* watcher, int arg) +{ +    // Python code may set this to NULL or even change it +    // out from under us, which would tend to break things. +    GeventWatcherObject handle = watcher->data; +    const int cb_result = python_callback(handle, arg); +    switch(cb_result) { +        case -1: +            // in case of exception, call self.loop.handle_error; +            // this function is also responsible for stopping the watcher +            // and allowing memory to be freed +            python_handle_error(handle, arg); +        break; +        case 1: +            // Code to stop the event IF NEEDED. Note that if python_callback +            // has disposed of the last reference to the handle, +            // `watcher` could now be invalid/disposed memory! +            if (!uv_is_active(watcher)) { +                if (watcher->data != handle) { +                    if (watcher->data) { +                        // If Python set the data to NULL, then they +                        // expected to be stopped. That's fine. +                        // Otherwise, something weird happened. +                        fprintf(stderr, +                                "WARNING: gevent: watcher handle changed in callback " +                                "from %p to %p for watcher at %p of type %d\n", +                                handle, watcher->data, watcher, watcher->type); +                        // There's a very good chance that the object the +                        // handle referred to has been changed and/or the +                        // old handle has been deallocated (most common), so +                        // passing the old handle will crash. Instead we +                        // pass a sigil to let python distinguish this case. +                        python_stop(NULL); +                    } +                } +                else { +                    python_stop(handle); +                } +            } +        break; +        case 2: +            // watcher is already stopped and dead, nothing to do. +        break; +        default: +            fprintf(stderr, +                    "WARNING: gevent: Unexpected return value %d from Python callback " +                    "for watcher %p (of type %d) and handle %p\n", +                    cb_result, +                    watcher, watcher->type, handle); +            // XXX: Possible leaking of resources here? Should we be +            // closing the watcher? +    } +} + + +static void _gevent_generic_callback1(uv_handle_t* watcher, int arg) +{ +	python_queue_callback(watcher, arg); +} + +static void _gevent_generic_callback0(uv_handle_t* handle) +{ +    _gevent_generic_callback1(handle, 0); +} + +static void _gevent_async_callback0(uv_async_t* handle) +{ +    _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_timer_callback0(uv_timer_t* handle) +{ +    _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_prepare_callback0(uv_prepare_t* handle) +{ +    _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_check_callback0(uv_check_t* handle) +{ +    _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_idle_callback0(uv_idle_t* handle) +{ +    _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_signal_callback1(uv_signal_t* handle, int signum) +{ +    _gevent_generic_callback1((uv_handle_t*)handle, signum); +} + + +static void _gevent_poll_callback2(void* handle, int status, int events) +{ +    _gevent_generic_callback1(handle, status < 0 ? status : events); +} + +static void _gevent_fs_event_callback3(void* handle, const char* filename, int events, int status) +{ +    _gevent_generic_callback1(handle, status < 0 ? status : events); +} + + +typedef struct _gevent_fs_poll_s { +    uv_fs_poll_t handle; +    uv_stat_t curr; +    uv_stat_t prev; +} gevent_fs_poll_t; + +static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t* prev, const uv_stat_t* curr) +{ +    // stat pointers are valid for this callback only. +    // if given, copy them into our structure, where they can be reached +    // from python, just like libev's watcher does, before calling +    // the callback. + +    // The callback is invoked with status < 0 if path does not exist +    // or is inaccessible. The watcher is not stopped but your +    // callback is not called again until something changes (e.g. when +    // the file is created or the error reason changes). +    // In that case the fields will be 0 in curr/prev. + + +    gevent_fs_poll_t* handle = (gevent_fs_poll_t*)handlep; +    assert(status == 0); + +    handle->curr = *curr; +    handle->prev = *prev; + +    _gevent_generic_callback1((uv_handle_t*)handle, 0); +} + +static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg) +{ +	if( handle && !uv_is_closing(handle) ) { +		uv_close(handle, NULL); +	} +} + +static void gevent_close_all_handles(uv_loop_t* loop) +{ +	uv_walk(loop, gevent_uv_walk_callback_close, NULL); +} + +static void gevent_zero_timer(uv_timer_t* handle) +{ +	memset(handle, 0, sizeof(uv_timer_t)); +} + +static void gevent_zero_check(uv_check_t* handle) +{ +	memset(handle, 0, sizeof(uv_check_t)); +} + +static void gevent_zero_prepare(uv_prepare_t* handle) +{ +	memset(handle, 0, sizeof(uv_prepare_t)); +} + +static void gevent_zero_loop(uv_loop_t* handle) +{ +	memset(handle, 0, sizeof(uv_loop_t)); +} diff --git a/python/gevent/libuv/loop.py b/python/gevent/libuv/loop.py new file mode 100644 index 0000000..0f317c0 --- /dev/null +++ b/python/gevent/libuv/loop.py @@ -0,0 +1,601 @@ +""" +libuv loop implementation +""" +# pylint: disable=no-member +from __future__ import absolute_import, print_function + +import os +from collections import defaultdict +from collections import namedtuple +from operator import delitem +import signal + +from gevent._ffi import _dbg # pylint: disable=unused-import +from gevent._ffi.loop import AbstractLoop +from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error +from gevent._ffi.loop import assign_standard_callbacks +from gevent._ffi.loop import AbstractCallbacks +from gevent._util import implementer +from gevent._interfaces import ILoop + +ffi = _corecffi.ffi +libuv = _corecffi.lib + +__all__ = [ +] + + +class _Callbacks(AbstractCallbacks): + +    def _find_loop_from_c_watcher(self, watcher_ptr): +        loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data +        return self.from_handle(loop_handle) if loop_handle else None + +    def python_sigchld_callback(self, watcher_ptr, _signum): +        self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback() + +    def python_timer0_callback(self, watcher_ptr): +        return self.python_prepare_callback(watcher_ptr) + +    def python_queue_callback(self, watcher_ptr, revents): +        watcher_handle = watcher_ptr.data +        the_watcher = self.from_handle(watcher_handle) + +        the_watcher.loop._queue_callback(watcher_ptr, revents) + + +_callbacks = assign_standard_callbacks( +    ffi, libuv, _Callbacks, +    [('python_sigchld_callback', None), +     ('python_timer0_callback', None), +     ('python_queue_callback', None)]) + +from gevent._ffi.loop import EVENTS +GEVENT_CORE_EVENTS = EVENTS # export + +from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module + +_events_to_str = _watchers._events_to_str # export + +READ = libuv.UV_READABLE +WRITE = libuv.UV_WRITABLE + +def get_version(): +    uv_bytes = ffi.string(libuv.uv_version_string()) +    if not isinstance(uv_bytes, str): +        # Py3 +        uv_str = uv_bytes.decode("ascii") +    else: +        uv_str = uv_bytes + +    return 'libuv-' + uv_str + +def get_header_version(): +    return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH) + +def supported_backends(): +    return ['default'] + +@implementer(ILoop) +class loop(AbstractLoop): + +    # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't +    # know this in general on libev +    min_sleep_time = 0.001 # 1ms + +    error_handler = None + +    _CHECK_POINTER = 'uv_check_t *' + +    _PREPARE_POINTER = 'uv_prepare_t *' +    _PREPARE_CALLBACK_SIG = "void(*)(void*)" + +    _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer" + +    def __init__(self, flags=None, default=None): +        AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) +        self.__loop_pid = os.getpid() +        self._child_watchers = defaultdict(list) +        self._io_watchers = dict() +        self._fork_watchers = set() +        self._pid = os.getpid() +        self._default = self._ptr == libuv.uv_default_loop() +        self._queued_callbacks = [] + +    def _queue_callback(self, watcher_ptr, revents): +        self._queued_callbacks.append((watcher_ptr, revents)) + +    def _init_loop(self, flags, default): +        if default is None: +            default = True +            # Unlike libev, libuv creates a new default +            # loop automatically if the old default loop was +            # closed. + +        if default: +            # XXX: If the default loop had been destroyed, this +            # will create a new one, but we won't destroy it +            ptr = libuv.uv_default_loop() +        else: +            ptr = libuv.uv_loop_new() + + +        if not ptr: +            raise SystemError("Failed to get loop") + +        # Track whether or not any object has destroyed +        # this loop. See _can_destroy_default_loop +        ptr.data = ptr +        return ptr + +    _signal_idle = None + +    def _init_and_start_check(self): +        libuv.uv_check_init(self._ptr, self._check) +        libuv.uv_check_start(self._check, libuv.python_check_callback) +        libuv.uv_unref(self._check) + +        # We also have to have an idle watcher to be able to handle +        # signals in a timely manner. Without them, libuv won't loop again +        # and call into its check and prepare handlers. +        # Note that this basically forces us into a busy-loop +        # XXX: As predicted, using an idle watcher causes our process +        # to eat 100% CPU time. We instead use a timer with a max of a .3 second +        # delay to notice signals. Note that this timeout also implements fork +        # watchers, effectively. + +        # XXX: Perhaps we could optimize this to notice when there are other +        # timers in the loop and start/stop it then. When we have a callback +        # scheduled, this should also be the same and unnecessary? +        # libev does takes this basic approach on Windows. +        self._signal_idle = ffi.new("uv_timer_t*") +        libuv.uv_timer_init(self._ptr, self._signal_idle) +        self._signal_idle.data = self._handle_to_self +        sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback) +        libuv.uv_timer_start(self._signal_idle, +                             sig_cb, +                             300, +                             300) +        libuv.uv_unref(self._signal_idle) + +    def _run_callbacks(self): +        # Manually handle fork watchers. +        curpid = os.getpid() +        if curpid != self._pid: +            self._pid = curpid +            for watcher in self._fork_watchers: +                watcher._on_fork() + + +        # The contents of queued_callbacks at this point should be timers +        # that expired when the loop began along with any idle watchers. +        # We need to run them so that any manual callbacks they want to schedule +        # get added to the list and ran next before we go on to poll for IO. +        # This is critical for libuv on linux: closing a socket schedules some manual +        # callbacks to actually stop the watcher; if those don't run before +        # we poll for IO, then libuv can abort the process for the closed file descriptor. + +        # XXX: There's still a race condition here because we may not run *all* the manual +        # callbacks. We need a way to prioritize those. + +        # Running these before the manual callbacks lead to some +        # random test failures. In test__event.TestEvent_SetThenClear +        # we would get a LoopExit sometimes. The problem occurred when +        # a timer expired on entering the first loop; we would process +        # it there, and then process the callback that it created +        # below, leaving nothing for the loop to do. Having the +        # self.run() manually process manual callbacks before +        # continuing solves the problem. (But we must still run callbacks +        # here again.) +        self._prepare_ran_callbacks = self.__run_queued_callbacks() + +        super(loop, self)._run_callbacks() + +    def _init_and_start_prepare(self): +        libuv.uv_prepare_init(self._ptr, self._prepare) +        libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback) +        libuv.uv_unref(self._prepare) + +    def _init_callback_timer(self): +        libuv.uv_check_init(self._ptr, self._timer0) + +    def _stop_callback_timer(self): +        libuv.uv_check_stop(self._timer0) + +    def _start_callback_timer(self): +        # The purpose of the callback timer is to ensure that we run +        # callbacks as soon as possible on the next iteration of the event loop. + +        # In libev, we set a 0 duration timer with a no-op callback. +        # This executes immediately *after* the IO poll is done (it +        # actually determines the time that the IO poll will block +        # for), so having the timer present simply spins the loop, and +        # our normal prepare watcher kicks in to run the callbacks. + +        # In libuv, however, timers are run *first*, before prepare +        # callbacks and before polling for IO. So a no-op 0 duration +        # timer actually does *nothing*. (Also note that libev queues all +        # watchers found during IO poll to run at the end (I think), while libuv +        # runs them in uv__io_poll itself.) + +        # From the loop inside uv_run: +        # while True: +        #   uv__update_time(loop); +        #   uv__run_timers(loop); +        #   # we don't use pending watchers. They are how libuv +        #   # implements the pipe/udp/tcp streams. +        #   ran_pending = uv__run_pending(loop); +        #   uv__run_idle(loop); +        #   uv__run_prepare(loop); +        #   ... +        #   uv__io_poll(loop, timeout); # <--- IO watchers run here! +        #   uv__run_check(loop); + +        # libev looks something like this (pseudo code because the real code is +        # hard to read): +        # +        # do { +        #    run_fork_callbacks(); +        #    run_prepare_callbacks(); +        #    timeout = min(time of all timers or normal block time) +        #    io_poll() # <--- Only queues IO callbacks +        #    update_now(); calculate_expired_timers(); +        #    run callbacks in this order: (although specificying priorities changes it) +        #        check +        #        stat +        #        child +        #        signal +        #        timer +        #        io +        # } + +        # So instead of running a no-op and letting the side-effect of spinning +        # the loop run the callbacks, we must explicitly run them here. + +        # If we don't, test__systemerror:TestCallback will be flaky, failing +        # one time out of ~20, depending on timing. + +        # To get them to run immediately after this current loop, +        # we use a check watcher, instead of a 0 duration timer entirely. +        # If we use a 0 duration timer, we can get stuck in a timer loop. +        # Python 3.6 fails in test_ftplib.py + +        # As a final note, if we have not yet entered the loop *at +        # all*, and a timer was created with a duration shorter than +        # the amount of time it took for us to enter the loop in the +        # first place, it may expire and get called before our callback +        # does. This could also lead to test__systemerror:TestCallback +        # appearing to be flaky. + +        # As yet another final note, if we are currently running a +        # timer callback, meaning we're inside uv__run_timers() in C, +        # and the Python starts a new timer, if the Python code then +        # update's the loop's time, it's possible that timer will +        # expire *and be run in the same iteration of the loop*. This +        # is trivial to do: In sequential code, anything after +        # `gevent.sleep(0.1)` is running in a timer callback. Starting +        # a new timer---e.g., another gevent.sleep() call---will +        # update the time, *before* uv__run_timers exits, meaning +        # other timers get a chance to run before our check or prepare +        # watcher callbacks do. Therefore, we do indeed have to have a 0 +        # timer to run callbacks---it gets inserted before any other user +        # timers---ideally, this should be especially careful about how much time +        # it runs for. + +        # AND YET: We can't actually do that. We get timeouts that I haven't fully +        # investigated if we do. Probably stuck in a timer loop. + +        # As a partial remedy to this, unlike libev, our timer watcher +        # class doesn't update the loop time by default. + +        libuv.uv_check_start(self._timer0, libuv.python_timer0_callback) + + +    def _stop_aux_watchers(self): +        assert self._prepare +        assert self._check +        assert self._signal_idle +        libuv.uv_prepare_stop(self._prepare) +        libuv.uv_ref(self._prepare) # Why are we doing this? + +        libuv.uv_check_stop(self._check) +        libuv.uv_ref(self._check) + +        libuv.uv_timer_stop(self._signal_idle) +        libuv.uv_ref(self._signal_idle) + +        libuv.uv_check_stop(self._timer0) + +    def _setup_for_run_callback(self): +        self._start_callback_timer() +        libuv.uv_ref(self._timer0) + + +    def _can_destroy_loop(self, ptr): +        # We're being asked to destroy a loop that's, +        # at the time it was constructed, was the default loop. +        # If loop objects were constructed more than once, +        # it may have already been destroyed, though. +        # We track this in the data member. +        return ptr.data + +    def _destroy_loop(self, ptr): +        ptr.data = ffi.NULL +        libuv.uv_stop(ptr) + +        libuv.gevent_close_all_handles(ptr) + +        closed_failed = libuv.uv_loop_close(ptr) +        if closed_failed: +            assert closed_failed == libuv.UV_EBUSY +            # We already closed all the handles. Run the loop +            # once to let them be cut off from the loop. +            ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) +            if ran_has_more_callbacks: +                libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT) +            closed_failed = libuv.uv_loop_close(ptr) +            assert closed_failed == 0, closed_failed + +        # Destroy the native resources *after* we have closed +        # the loop. If we do it before, walking the handles +        # attached to the loop is likely to segfault. + +        libuv.gevent_zero_check(self._check) +        libuv.gevent_zero_check(self._timer0) +        libuv.gevent_zero_prepare(self._prepare) +        libuv.gevent_zero_timer(self._signal_idle) +        del self._check +        del self._prepare +        del self._signal_idle +        del self._timer0 + +        libuv.gevent_zero_loop(ptr) + +        # Destroy any watchers we're still holding on to. +        del self._io_watchers +        del self._fork_watchers +        del self._child_watchers + + +    def debug(self): +        """ +        Return all the handles that are open and their ref status. +        """ +        handle_state = namedtuple("HandleState", +                                  ['handle', +                                   'type', +                                   'watcher', +                                   'ref', +                                   'active', +                                   'closing']) +        handles = [] + +        # XXX: Convert this to a modern callback. +        def walk(handle, _arg): +            data = handle.data +            if data: +                watcher = ffi.from_handle(data) +            else: +                watcher = None +            handles.append(handle_state(handle, +                                        ffi.string(libuv.uv_handle_type_name(handle.type)), +                                        watcher, +                                        libuv.uv_has_ref(handle), +                                        libuv.uv_is_active(handle), +                                        libuv.uv_is_closing(handle))) + +        libuv.uv_walk(self._ptr, +                      ffi.callback("void(*)(uv_handle_t*,void*)", +                                   walk), +                      ffi.NULL) +        return handles + +    def ref(self): +        pass + +    def unref(self): +        # XXX: Called by _run_callbacks. +        pass + +    def break_(self, how=None): +        libuv.uv_stop(self._ptr) + +    def reinit(self): +        # TODO: How to implement? We probably have to simply +        # re-__init__ this whole class? Does it matter? +        # OR maybe we need to uv_walk() and close all the handles? + +        # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately +        # exec() in the child. There are multiple calls to abort() that +        # will kill the child process: +        # - The OS X poll implementation (kqueue) aborts on an error return +        # value; since kqueue FDs can't be inherited, then the next call +        # to kqueue in the child will fail and get aborted; fork() is likely +        # to be called during the gevent loop, meaning we're deep inside the +        # runloop already, so we can't even close the loop that we're in: +        # it's too late, the next call to kqueue is already scheduled. +        # - The threadpool, should it be in use, also aborts +        # (https://github.com/joyent/libuv/pull/1136) +        # - There global shared state that breaks signal handling +        # and leads to an abort() in the child, EVEN IF the loop in the parent +        # had already been closed +        # (https://github.com/joyent/libuv/issues/1405) + +        # In 1.12, the uv_loop_fork function was added (by gevent!) +        libuv.uv_loop_fork(self._ptr) + +    _prepare_ran_callbacks = False + +    def __run_queued_callbacks(self): +        if not self._queued_callbacks: +            return False + +        cbs = list(self._queued_callbacks) +        self._queued_callbacks = [] + +        for watcher_ptr, arg in cbs: +            handle = watcher_ptr.data +            if not handle: +                # It's been stopped and possibly closed +                assert not libuv.uv_is_active(watcher_ptr) +                continue +            val = _callbacks.python_callback(handle, arg) +            if val == -1: +                _callbacks.python_handle_error(handle, arg) +            elif val == 1: +                if not libuv.uv_is_active(watcher_ptr): +                    if watcher_ptr.data != handle: +                        if watcher_ptr.data: +                            _callbacks.python_stop(None) +                    else: +                        _callbacks.python_stop(handle) +        return True + + +    def run(self, nowait=False, once=False): +        # we can only respect one flag or the other. +        # nowait takes precedence because it can't block +        mode = libuv.UV_RUN_DEFAULT +        if once: +            mode = libuv.UV_RUN_ONCE +        if nowait: +            mode = libuv.UV_RUN_NOWAIT + +        if mode == libuv.UV_RUN_DEFAULT: +            while self._ptr and self._ptr.data: +                # This is here to better preserve order guarantees. See _run_callbacks +                # for details. +                # It may get run again from the prepare watcher, so potentially we +                # could take twice as long as the switch interval. +                self._run_callbacks() +                self._prepare_ran_callbacks = False +                ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE) +                # Note that we run queued callbacks when the prepare watcher runs, +                # thus accounting for timers that expired before polling for IO, +                # and idle watchers. This next call should get IO callbacks and +                # callbacks from timers that expired *after* polling for IO. +                ran_callbacks = self.__run_queued_callbacks() + +                if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks: +                    # A return of 0 means there are no referenced and +                    # active handles. The loop is over. +                    # If we didn't run any callbacks, then we couldn't schedule +                    # anything to switch in the future, so there's no point +                    # running again. +                    return ran_status +            return 0 # Somebody closed the loop + +        result = libuv.uv_run(self._ptr, mode) +        self.__run_queued_callbacks() +        return result + +    def now(self): +        # libuv's now is expressed as an integer number of +        # milliseconds, so to get it compatible with time.time units +        # that this method is supposed to return, we have to divide by 1000.0 +        now = libuv.uv_now(self._ptr) +        return now / 1000.0 + +    def update_now(self): +        libuv.uv_update_time(self._ptr) + +    def fileno(self): +        if self._ptr: +            fd = libuv.uv_backend_fd(self._ptr) +            if fd >= 0: +                return fd + +    _sigchld_watcher = None +    _sigchld_callback_ffi = None + +    def install_sigchld(self): +        if not self.default: +            return + +        if self._sigchld_watcher: +            return + +        self._sigchld_watcher = ffi.new('uv_signal_t*') +        libuv.uv_signal_init(self._ptr, self._sigchld_watcher) +        self._sigchld_watcher.data = self._handle_to_self + +        libuv.uv_signal_start(self._sigchld_watcher, +                              libuv.python_sigchld_callback, +                              signal.SIGCHLD) + +    def reset_sigchld(self): +        if not self.default or not self._sigchld_watcher: +            return + +        libuv.uv_signal_stop(self._sigchld_watcher) +        # Must go through this to manage the memory lifetime +        # correctly. Alternately, we could just stop it and restart +        # it in install_sigchld? +        _watchers.watcher._watcher_ffi_close(self._sigchld_watcher) +        del self._sigchld_watcher + + +    def _sigchld_callback(self): +        # Signals can arrive at (relatively) any time. To eliminate +        # race conditions, and behave more like libev, we "queue" +        # sigchld to run when we run callbacks. +        while True: +            try: +                pid, status, _usage = os.wait3(os.WNOHANG) +            except OSError: +                # Python 3 raises ChildProcessError +                break + +            if pid == 0: +                break +            children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) +            for watcher in children_watchers: +                self.run_callback(watcher._set_waitpid_status, pid, status) + +            # Don't invoke child watchers for 0 more than once +            self._child_watchers[0] = [] + +    def _register_child_watcher(self, watcher): +        self._child_watchers[watcher._pid].append(watcher) + +    def _unregister_child_watcher(self, watcher): +        try: +            # stop() should be idempotent +            self._child_watchers[watcher._pid].remove(watcher) +        except ValueError: +            pass + +        # Now's a good time to clean up any dead lists we don't need +        # anymore +        for pid in list(self._child_watchers): +            if not self._child_watchers[pid]: +                del self._child_watchers[pid] + +    def io(self, fd, events, ref=True, priority=None): +        # We rely on hard references here and explicit calls to +        # close() on the returned object to correctly manage +        # the watcher lifetimes. + +        io_watchers = self._io_watchers +        try: +            io_watcher = io_watchers[fd] +            assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher) +        except KeyError: +            # Start the watcher with just the events that we're interested in. +            # as multiplexers are added, the real event mask will be updated to keep in sync. +            # If we watch for too much, we get spurious wakeups and busy loops. +            io_watcher = self._watchers.io(self, fd, 0) +            io_watchers[fd] = io_watcher +            io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd) + +        return io_watcher.multiplex(events) + +    def prepare(self, ref=True, priority=None): +        # We run arbitrary code in python_prepare_callback. That could switch +        # greenlets. If it does that while also manipulating the active prepare +        # watchers, we could corrupt the process state, since the prepare watcher +        # queue is iterated on the stack (on unix). We could workaround this by implementing +        # prepare watchers in pure Python. +        # See https://github.com/gevent/gevent/issues/1126 +        raise TypeError("prepare watchers are not currently supported in libuv. " +                        "If you need them, please contact the maintainers.") diff --git a/python/gevent/libuv/watcher.py b/python/gevent/libuv/watcher.py new file mode 100644 index 0000000..035cb43 --- /dev/null +++ b/python/gevent/libuv/watcher.py @@ -0,0 +1,732 @@ +# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable +# pylint: disable=no-member +from __future__ import absolute_import, print_function + +import functools +import sys + +from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error + +ffi = _corecffi.ffi +libuv = _corecffi.lib + +from gevent._ffi import watcher as _base +from gevent._ffi import _dbg + +_closing_watchers = set() + +# In debug mode, it would be nice to be able to clear the memory of +# the watcher (its size determined by +# libuv.uv_handle_size(ffi_watcher.type)) using memset so that if we +# are using it after it's supposedly been closed and deleted, we'd +# catch it sooner. BUT doing so breaks test__threadpool. We get errors +# about `pthread_mutex_lock[3]: Invalid argument` (and sometimes we +# crash) suggesting either that we're writing on memory that doesn't +# belong to us, somehow, or that we haven't actually lost all +# references... +_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(_closing_watchers.remove) + + +_events = [(libuv.UV_READABLE, "READ"), +           (libuv.UV_WRITABLE, "WRITE")] + +def _events_to_str(events): # export +    return _base.events_to_str(events, _events) + +class UVFuncallError(ValueError): +    pass + +class libuv_error_wrapper(object): +    # Makes sure that everything stored as a function +    # on the wrapper instances (classes, actually, +    # because this is used by the metaclass) +    # checks its return value and raises an error. +    # This expects that everything we call has an int +    # or void return value and follows the conventions +    # of error handling (that negative values are errors) +    def __init__(self, uv): +        self._libuv = uv + +    def __getattr__(self, name): +        libuv_func = getattr(self._libuv, name) + +        @functools.wraps(libuv_func) +        def wrap(*args, **kwargs): +            if args and isinstance(args[0], watcher): +                args = args[1:] +            res = libuv_func(*args, **kwargs) +            if res is not None and res < 0: +                raise UVFuncallError( +                    str(ffi.string(libuv.uv_err_name(res)).decode('ascii') +                        + ' ' +                        + ffi.string(libuv.uv_strerror(res)).decode('ascii')) +                    + " Args: " + repr(args) + " KWARGS: " + repr(kwargs) +                ) +            return res + +        setattr(self, name, wrap) + +        return wrap + + +class ffi_unwrapper(object): +    # undoes the wrapping of libuv_error_wrapper for +    # the methods used by the metaclass that care + +    def __init__(self, ff): +        self._ffi = ff + +    def __getattr__(self, name): +        return getattr(self._ffi, name) + +    def addressof(self, lib, name): +        assert isinstance(lib, libuv_error_wrapper) +        return self._ffi.addressof(libuv, name) + + +class watcher(_base.watcher): +    _FFI = ffi_unwrapper(ffi) +    _LIB = libuv_error_wrapper(libuv) + +    _watcher_prefix = 'uv' +    _watcher_struct_pattern = '%s_t' + +    @classmethod +    def _watcher_ffi_close(cls, ffi_watcher): +        # Managing the lifetime of _watcher is tricky. +        # They have to be uv_close()'d, but that only +        # queues them to be closed in the *next* loop iteration. +        # The memory must stay valid for at least that long, +        # or assert errors are triggered. We can't use a ffi.gc() +        # pointer to queue the uv_close, because by the time the +        # destructor is called, there's no way to keep the memory alive +        # and it could be re-used. +        # So here we resort to resurrecting the pointer object out +        # of our scope, keeping it alive past this object's lifetime. +        # We then use the uv_close callback to handle removing that +        # reference. There's no context passed to the close callback, +        # so we have to do this globally. + +        # Sadly, doing this causes crashes if there were multiple +        # watchers for a given FD, so we have to take special care +        # about that. See https://github.com/gevent/gevent/issues/790#issuecomment-208076604 + +        # Note that this cannot be a __del__ method, because we store +        # the CFFI handle to self on self, which is a cycle, and +        # objects with a __del__ method cannot be collected on CPython < 3.4 + +        # Instead, this is arranged as a callback to GC when the +        # watcher class dies. Obviously it's important to keep the ffi +        # watcher alive. +        # We can pass in "subclasses" if uv_handle_t that line up at the C level, +        # but that don't in CFFI without a cast. But be careful what we use the cast +        # for, don't pass it back to C. +        ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher) +        if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher): +            # If the type isn't set, we were never properly initialized, +            # and trying to close it results in libuv terminating the process. +            # Sigh. Same thing if it's already in the process of being +            # closed. +            _closing_watchers.add(ffi_watcher) +            libuv.uv_close(ffi_watcher, libuv._uv_close_callback) + +        ffi_handle_watcher.data = ffi.NULL + + +    def _watcher_ffi_set_init_ref(self, ref): +        self.ref = ref + +    def _watcher_ffi_init(self, args): +        # TODO: we could do a better job chokepointing this +        return self._watcher_init(self.loop.ptr, +                                  self._watcher, +                                  *args) + +    def _watcher_ffi_start(self): +        self._watcher_start(self._watcher, self._watcher_callback) + +    def _watcher_ffi_stop(self): +        if self._watcher: +            # The multiplexed io watcher deletes self._watcher +            # when it closes down. If that's in the process of +            # an error handler, AbstractCallbacks.unhandled_onerror +            # will try to close us again. +            self._watcher_stop(self._watcher) + +    @_base.only_if_watcher +    def _watcher_ffi_ref(self): +        libuv.uv_ref(self._watcher) + +    @_base.only_if_watcher +    def _watcher_ffi_unref(self): +        libuv.uv_unref(self._watcher) + +    def _watcher_ffi_start_unref(self): +        pass + +    def _watcher_ffi_stop_ref(self): +        pass + +    def _get_ref(self): +        # Convert 1/0 to True/False +        if self._watcher is None: +            return None +        return True if libuv.uv_has_ref(self._watcher) else False + +    def _set_ref(self, value): +        if value: +            self._watcher_ffi_ref() +        else: +            self._watcher_ffi_unref() + +    ref = property(_get_ref, _set_ref) + +    def feed(self, _revents, _callback, *_args): +        raise Exception("Not implemented") + +class io(_base.IoMixin, watcher): +    _watcher_type = 'poll' +    _watcher_callback_name = '_gevent_poll_callback2' + +    # On Windows is critical to be able to garbage collect these +    # objects in a timely fashion so that they don't get reused +    # for multiplexing completely different sockets. This is because +    # uv_poll_init_socket does a lot of setup for the socket to make +    # polling work. If get reused for another socket that has the same +    # fileno, things break badly. (In theory this could be a problem +    # on posix too, but in practice it isn't). + +    # TODO: We should probably generalize this to all +    # ffi watchers. Avoiding GC cycles as much as possible +    # is a good thing, and potentially allocating new handles +    # as needed gets us better memory locality. + +    # Especially on Windows, we must also account for the case that a +    # reference to this object has leaked (e.g., the socket object is +    # still around), but the fileno has been closed and a new one +    # opened. We must still get a new native watcher at that point. We +    # handle this case by simply making sure that we don't even have +    # a native watcher until the object is started, and we shut it down +    # when the object is stopped. + +    # XXX: I was able to solve at least Windows test_ftplib.py issues +    # with more of a careful use of io objects in socket.py, so +    # delaying this entirely is at least temporarily on hold. Instead +    # sticking with the _watcher_create function override for the +    # moment. + +    # XXX: Note 2: Moving to a deterministic close model, which was necessary +    # for PyPy, also seems to solve the Windows issues. So we're completely taking +    # this object out of the loop's registration; we don't want GC callbacks and +    # uv_close anywhere *near* this object. + +    _watcher_registers_with_loop_on_create = False + +    EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT + +    _multiplex_watchers = () + +    def __init__(self, loop, fd, events, ref=True, priority=None): +        super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,)) +        self._fd = fd +        self._events = events +        self._multiplex_watchers = [] + +    def _get_fd(self): +        return self._fd + +    @_base.not_while_active +    def _set_fd(self, fd): +        self._fd = fd +        self._watcher_ffi_init((fd,)) + +    def _get_events(self): +        return self._events + +    def _set_events(self, events): +        if events == self._events: +            return +        self._events = events +        if self.active: +            # We're running but libuv specifically says we can +            # call start again to change our event mask. +            assert self._handle is not None +            self._watcher_start(self._watcher, self._events, self._watcher_callback) + +    events = property(_get_events, _set_events) + +    def _watcher_ffi_start(self): +        self._watcher_start(self._watcher, self._events, self._watcher_callback) + +    if sys.platform.startswith('win32'): +        # uv_poll can only handle sockets on Windows, but the plain +        # uv_poll_init we call on POSIX assumes that the fileno +        # argument is already a C fileno, as created by +        # _get_osfhandle. C filenos are limited resources, must be +        # closed with _close. So there are lifetime issues with that: +        # calling the C function _close to dispose of the fileno +        # *also* closes the underlying win32 handle, possibly +        # prematurely. (XXX: Maybe could do something with weak +        # references? But to what?) + +        # All libuv wants to do with the fileno in uv_poll_init is +        # turn it back into a Win32 SOCKET handle. + +        # Now, libuv provides uv_poll_init_socket, which instead of +        # taking a C fileno takes the SOCKET, avoiding the need to dance with +        # the C runtime. + +        # It turns out that SOCKET (win32 handles in general) can be +        # represented with `intptr_t`. It further turns out that +        # CPython *directly* exposes the SOCKET handle as the value of +        # fileno (32-bit PyPy does some munging on it, which should +        # rarely matter). So we can pass socket.fileno() through +        # to uv_poll_init_socket. + +        # See _corecffi_build. +        _watcher_init = watcher._LIB.uv_poll_init_socket + + +    class _multiplexwatcher(object): + +        callback = None +        args = () +        pass_events = False +        ref = True + +        def __init__(self, events, watcher): +            self._events = events + +            # References: +            # These objects must keep the original IO object alive; +            # the IO object SHOULD NOT keep these alive to avoid cycles +            # We MUST NOT rely on GC to clean up the IO objects, but the explicit +            # calls to close(); see _multiplex_closed. +            self._watcher_ref = watcher + +        events = property( +            lambda self: self._events, +            _base.not_while_active(lambda self, nv: setattr(self, '_events', nv))) + +        def start(self, callback, *args, **kwargs): +            self.pass_events = kwargs.get("pass_events") +            self.callback = callback +            self.args = args + +            watcher = self._watcher_ref +            if watcher is not None: +                if not watcher.active: +                    watcher._io_start() +                else: +                    # Make sure we're in the event mask +                    watcher._calc_and_update_events() + +        def stop(self): +            self.callback = None +            self.pass_events = None +            self.args = None +            watcher = self._watcher_ref +            if watcher is not None: +                watcher._io_maybe_stop() + +        def close(self): +            if self._watcher_ref is not None: +                self._watcher_ref._multiplex_closed(self) +            self._watcher_ref = None + +        @property +        def active(self): +            return self.callback is not None + +        @property +        def _watcher(self): +            # For testing. +            return self._watcher_ref._watcher + +        # ares.pyx depends on this property, +        # and test__core uses it too +        fd = property(lambda self: getattr(self._watcher_ref, '_fd', -1), +                      lambda self, nv: self._watcher_ref._set_fd(nv)) + +    def _io_maybe_stop(self): +        self._calc_and_update_events() +        for w in self._multiplex_watchers: +            if w.callback is not None: +                # There's still a reference to it, and it's started, +                # so we can't stop. +                return +        # If we get here, nothing was started +        # so we can take ourself out of the polling set +        self.stop() + +    def _io_start(self): +        self._calc_and_update_events() +        self.start(self._io_callback, pass_events=True) + +    def _calc_and_update_events(self): +        events = 0 +        for watcher in self._multiplex_watchers: +            if watcher.callback is not None: +                # Only ask for events that are active. +                events |= watcher.events +        self._set_events(events) + + +    def multiplex(self, events): +        watcher = self._multiplexwatcher(events, self) +        self._multiplex_watchers.append(watcher) +        self._calc_and_update_events() +        return watcher + +    def close(self): +        super(io, self).close() +        del self._multiplex_watchers + +    def _multiplex_closed(self, watcher): +        self._multiplex_watchers.remove(watcher) +        if not self._multiplex_watchers: +            self.stop() # should already be stopped +            self._no_more_watchers() +            # It is absolutely critical that we control when the call +            # to uv_close() gets made. uv_close() of a uv_poll_t +            # handle winds up calling uv__platform_invalidate_fd, +            # which, as the name implies, destroys any outstanding +            # events for the *fd* that haven't been delivered yet, and also removes +            # the *fd* from the poll set. So if this happens later, at some +            # non-deterministic time when (cyclic or otherwise) GC runs, +            # *and* we've opened a new watcher for the fd, that watcher will +            # suddenly and mysteriously stop seeing events. So we do this now; +            # this method is smart enough not to close the handle twice. +            self.close() +        else: +            self._calc_and_update_events() + +    def _no_more_watchers(self): +        # The loop sets this on an individual watcher to delete it from +        # the active list where it keeps hard references. +        pass + +    def _io_callback(self, events): +        if events < 0: +            # actually a status error code +            _dbg("Callback error on", self._fd, +                 ffi.string(libuv.uv_err_name(events)), +                 ffi.string(libuv.uv_strerror(events))) +            # XXX: We've seen one half of a FileObjectPosix pair +            # (the read side of a pipe) report errno 11 'bad file descriptor' +            # after the write side was closed and its watcher removed. But +            # we still need to attempt to read from it to clear out what's in +            # its buffers--if we return with the watcher inactive before proceeding to wake up +            # the reader, we get a LoopExit. So we can't return here and arguably shouldn't print it +            # either. The negative events mask will match the watcher's mask. +            # See test__fileobject.py:Test.test_newlines for an example. + +            # On Windows (at least with PyPy), we can get ENOTSOCK (socket operation on non-socket) +            # if a socket gets closed. If we don't pass the events on, we hang. +            # See test__makefile_ref.TestSSL for examples. +            # return + +        for watcher in self._multiplex_watchers: +            if not watcher.callback: +                # Stopped +                continue +            assert watcher._watcher_ref is self, (self, watcher._watcher_ref) + +            send_event = (events & watcher.events) or events < 0 +            if send_event: +                if not watcher.pass_events: +                    watcher.callback(*watcher.args) +                else: +                    watcher.callback(events, *watcher.args) + +class _SimulatedWithAsyncMixin(object): +    _watcher_skip_ffi = True + +    def __init__(self, loop, *args, **kwargs): +        self._async = loop.async_() +        try: +            super(_SimulatedWithAsyncMixin, self).__init__(loop, *args, **kwargs) +        except: +            self._async.close() +            raise + +    def _watcher_create(self, _args): +        return + +    @property +    def _watcher_handle(self): +        return None + +    def _watcher_ffi_init(self, _args): +        return + +    def _watcher_ffi_set_init_ref(self, ref): +        self._async.ref = ref + +    @property +    def active(self): +        return self._async.active + +    def start(self, cb, *args): +        self._register_loop_callback() +        self.callback = cb +        self.args = args +        self._async.start(cb, *args) +        #watcher.start(self, cb, *args) + +    def stop(self): +        self._unregister_loop_callback() +        self.callback = None +        self.args = None +        self._async.stop() + +    def close(self): +        if self._async is not None: +            a = self._async +            #self._async = None +            a.close() + +    def _register_loop_callback(self): +        # called from start() +        raise NotImplementedError() + +    def _unregister_loop_callback(self): +        # called from stop +        raise NotImplementedError() + +class fork(_SimulatedWithAsyncMixin, +           _base.ForkMixin, +           watcher): +    # We'll have to implement this one completely manually +    # Right now it doesn't matter much since libuv doesn't survive +    # a fork anyway. (That's a work in progress) +    _watcher_skip_ffi = False + +    def _register_loop_callback(self): +        self.loop._fork_watchers.add(self) + +    def _unregister_loop_callback(self): +        try: +            # stop() should be idempotent +            self.loop._fork_watchers.remove(self) +        except KeyError: +            pass + +    def _on_fork(self): +        self._async.send() + + +class child(_SimulatedWithAsyncMixin, +            _base.ChildMixin, +            watcher): +    _watcher_skip_ffi = True +    # We'll have to implement this one completely manually. +    # Our approach is to use a SIGCHLD handler and the original +    # os.waitpid call. + +    # On Unix, libuv's uv_process_t and uv_spawn use SIGCHLD, +    # just like libev does for its child watchers. So +    # we're not adding any new SIGCHLD related issues not already +    # present in libev. + + +    def _register_loop_callback(self): +        self.loop._register_child_watcher(self) + +    def _unregister_loop_callback(self): +        self.loop._unregister_child_watcher(self) + +    def _set_waitpid_status(self, pid, status): +        self._rpid = pid +        self._rstatus = status +        self._async.send() + + +class async_(_base.AsyncMixin, watcher): +    _watcher_callback_name = '_gevent_async_callback0' + +    def _watcher_ffi_init(self, args): +        # It's dangerous to have a raw, non-initted struct +        # around; it will crash in uv_close() when we get GC'd, +        # and send() will also crash. +        # NOTE: uv_async_init is NOT idempotent. Calling it more than +        # once adds the uv_async_t to the internal queue multiple times, +        # and uv_close only cleans up one of them, meaning that we tend to +        # crash. Thus we have to be very careful not to allow that. +        return self._watcher_init(self.loop.ptr, self._watcher, ffi.NULL) + +    def _watcher_ffi_start(self): +        # we're created in a started state, but we didn't provide a +        # callback (because if we did and we don't have a value in our +        # callback attribute, then python_callback would crash.) Note that +        # uv_async_t->async_cb is not technically documented as public. +        self._watcher.async_cb = self._watcher_callback + +    def _watcher_ffi_stop(self): +        self._watcher.async_cb = ffi.NULL +        # We have to unref this because we're setting the cb behind libuv's +        # back, basically: once a async watcher is started, it can't ever be +        # stopped through libuv interfaces, so it would never lose its active +        # status, and thus if it stays reffed it would keep the event loop +        # from exiting. +        self._watcher_ffi_unref() + +    def send(self): +        if libuv.uv_is_closing(self._watcher): +            raise Exception("Closing handle") +        libuv.uv_async_send(self._watcher) + +    @property +    def pending(self): +        return None + +locals()['async'] = async_ + +class timer(_base.TimerMixin, watcher): + +    _watcher_callback_name = '_gevent_timer_callback0' + +    # In libuv, timer callbacks continue running while any timer is +    # expired, including newly added timers. Newly added non-zero +    # timers (especially of small duration) can be seen to be expired +    # if the loop time is updated while we are in a timer callback. +    # This can lead to us being stuck running timers for a terribly +    # long time, which is not good. So default to not updating the +    # time. + +    # Also, newly-added timers of 0 duration can *also* stall the +    # loop, because they'll be seen to be expired immediately. +    # Updating the time can prevent that, *if* there was already a +    # timer for a longer duration scheduled. + +    # To mitigate the above problems, our loop implementation turns +    # zero duration timers into check watchers instead using OneShotCheck. +    # This ensures the loop cycles. Of course, the 'again' method does +    # nothing on them and doesn't exist. In practice that's not an issue. + +    _again = False + +    def _watcher_ffi_init(self, args): +        self._watcher_init(self.loop._ptr, self._watcher) +        self._after, self._repeat = args +        if self._after and self._after < 0.001: +            import warnings +            # XXX: The stack level is hard to determine, could be getting here +            # through a number of different ways. +            warnings.warn("libuv only supports millisecond timer resolution; " +                          "all times less will be set to 1 ms", +                          stacklevel=6) +            # The alternative is to effectively pass in int(0.1) == 0, which +            # means no sleep at all, which leads to excessive wakeups +            self._after = 0.001 +        if self._repeat and self._repeat < 0.001: +            import warnings +            warnings.warn("libuv only supports millisecond timer resolution; " +                          "all times less will be set to 1 ms", +                          stacklevel=6) +            self._repeat = 0.001 + +    def _watcher_ffi_start(self): +        if self._again: +            libuv.uv_timer_again(self._watcher) +        else: +            try: +                self._watcher_start(self._watcher, self._watcher_callback, +                                    int(self._after * 1000), +                                    int(self._repeat * 1000)) +            except ValueError: +                # in case of non-ints in _after/_repeat +                raise TypeError() + +    def again(self, callback, *args, **kw): +        if not self.active: +            # If we've never been started, this is the same as starting us. +            # libuv makes the distinction, libev doesn't. +            self.start(callback, *args, **kw) +            return + +        self._again = True +        try: +            self.start(callback, *args, **kw) +        finally: +            del self._again + + +class stat(_base.StatMixin, watcher): +    _watcher_type = 'fs_poll' +    _watcher_struct_name = 'gevent_fs_poll_t' +    _watcher_callback_name = '_gevent_fs_poll_callback3' + +    def _watcher_set_data(self, the_watcher, data): +        the_watcher.handle.data = data +        return data + +    def _watcher_ffi_init(self, args): +        return self._watcher_init(self.loop._ptr, self._watcher) + +    MIN_STAT_INTERVAL = 0.1074891 # match libev; 0.0 is default + +    def _watcher_ffi_start(self): +        # libev changes this when the watcher is started +        if self._interval < self.MIN_STAT_INTERVAL: +            self._interval = self.MIN_STAT_INTERVAL +        self._watcher_start(self._watcher, self._watcher_callback, +                            self._cpath, +                            int(self._interval * 1000)) + +    @property +    def _watcher_handle(self): +        return self._watcher.handle.data + +    @property +    def attr(self): +        if not self._watcher.curr.st_nlink: +            return +        return self._watcher.curr + +    @property +    def prev(self): +        if not self._watcher.prev.st_nlink: +            return +        return self._watcher.prev + + +class signal(_base.SignalMixin, watcher): +    _watcher_callback_name = '_gevent_signal_callback1' + +    def _watcher_ffi_init(self, args): +        self._watcher_init(self.loop._ptr, self._watcher) +        self.ref = False # libev doesn't ref these by default + + +    def _watcher_ffi_start(self): +        self._watcher_start(self._watcher, self._watcher_callback, +                            self._signalnum) + + +class idle(_base.IdleMixin, watcher): +    # Because libuv doesn't support priorities, idle watchers are +    # potentially quite a bit different than under libev +    _watcher_callback_name = '_gevent_idle_callback0' + + +class check(_base.CheckMixin, watcher): +    _watcher_callback_name = '_gevent_check_callback0' + +class OneShotCheck(check): + +    _watcher_skip_ffi = True + +    def __make_cb(self, func): +        stop = self.stop +        @functools.wraps(func) +        def cb(*args): +            stop() +            return func(*args) +        return cb + +    def start(self, callback, *args): +        return check.start(self, self.__make_cb(callback), *args) + +class prepare(_base.PrepareMixin, watcher): +    _watcher_callback_name = '_gevent_prepare_callback0' | 
