diff options
Diffstat (limited to 'python/gevent/libuv')
-rw-r--r-- | python/gevent/libuv/__init__.py | 0 | ||||
-rw-r--r-- | python/gevent/libuv/_corecffi.cp36-win32.pyd | bin | 0 -> 125952 bytes | |||
-rw-r--r-- | python/gevent/libuv/_corecffi_build.py | 253 | ||||
-rw-r--r-- | python/gevent/libuv/_corecffi_cdef.c | 393 | ||||
-rw-r--r-- | python/gevent/libuv/_corecffi_source.c | 181 | ||||
-rw-r--r-- | python/gevent/libuv/loop.py | 601 | ||||
-rw-r--r-- | python/gevent/libuv/watcher.py | 732 |
7 files changed, 2160 insertions, 0 deletions
diff --git a/python/gevent/libuv/__init__.py b/python/gevent/libuv/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/gevent/libuv/__init__.py diff --git a/python/gevent/libuv/_corecffi.cp36-win32.pyd b/python/gevent/libuv/_corecffi.cp36-win32.pyd Binary files differnew file mode 100644 index 0000000..0cc476a --- /dev/null +++ b/python/gevent/libuv/_corecffi.cp36-win32.pyd diff --git a/python/gevent/libuv/_corecffi_build.py b/python/gevent/libuv/_corecffi_build.py new file mode 100644 index 0000000..722157d --- /dev/null +++ b/python/gevent/libuv/_corecffi_build.py @@ -0,0 +1,253 @@ +# pylint: disable=no-member + +# This module is only used to create and compile the gevent._corecffi module; +# nothing should be directly imported from it except `ffi`, which should only be +# used for `ffi.compile()`; programs should import gevent._corecfffi. +# However, because we are using "out-of-line" mode, it is necessary to examine +# this file to know what functions are created and available on the generated +# module. +from __future__ import absolute_import, print_function +import sys +import os +import os.path # pylint:disable=no-name-in-module +import struct + +__all__ = [] + +WIN = sys.platform.startswith('win32') + +def system_bits(): + return struct.calcsize('P') * 8 + + +def st_nlink_type(): + if sys.platform == "darwin" or sys.platform.startswith("freebsd"): + return "short" + if system_bits() == 32: + return "unsigned long" + return "long long" + + +from cffi import FFI +ffi = FFI() + +thisdir = os.path.dirname(os.path.abspath(__file__)) +def read_source(name): + with open(os.path.join(thisdir, name), 'r') as f: + return f.read() + +_cdef = read_source('_corecffi_cdef.c') +_source = read_source('_corecffi_source.c') + +_cdef = _cdef.replace('#define GEVENT_ST_NLINK_T int', '') +_cdef = _cdef.replace('#define GEVENT_STRUCT_DONE int', '') +_cdef = _cdef.replace('#define GEVENT_UV_OS_SOCK_T int', '') + +_cdef = _cdef.replace('GEVENT_ST_NLINK_T', st_nlink_type()) +_cdef = _cdef.replace("GEVENT_STRUCT_DONE _;", '...;') +# uv_os_sock_t is int on POSIX and SOCKET on Win32, but socket is +# just another name for handle, which is just another name for 'void*' +# which we will treat as an 'unsigned long' or 'unsigned long long' +# since it comes through 'fileno()' where it has been cast as an int. +# See class watcher.io +_void_pointer_as_integer = 'intptr_t' +_cdef = _cdef.replace("GEVENT_UV_OS_SOCK_T", 'int' if not WIN else _void_pointer_as_integer) + + +setup_py_dir = os.path.abspath(os.path.join(thisdir, '..', '..', '..')) +libuv_dir = os.path.abspath(os.path.join(setup_py_dir, 'deps', 'libuv')) + + +LIBUV_INCLUDE_DIRS = [ + thisdir, # libev_vfd.h + os.path.join(libuv_dir, 'include'), + os.path.join(libuv_dir, 'src'), +] + +# Initially based on https://github.com/saghul/pyuv/blob/v1.x/setup_libuv.py + +def _libuv_source(rel_path): + # Certain versions of setuptools, notably on windows, are *very* + # picky about what we feed to sources= "setup() arguments must + # *always* be /-separated paths relative to the setup.py + # directory, *never* absolute paths." POSIX doesn't have that issue. + path = os.path.join('deps', 'libuv', 'src', rel_path) + return path + +LIBUV_SOURCES = [ + _libuv_source('fs-poll.c'), + _libuv_source('inet.c'), + _libuv_source('threadpool.c'), + _libuv_source('uv-common.c'), + _libuv_source('version.c'), + _libuv_source('uv-data-getter-setters.c'), + _libuv_source('timer.c'), +] + +if WIN: + LIBUV_SOURCES += [ + _libuv_source('win/async.c'), + _libuv_source('win/core.c'), + _libuv_source('win/detect-wakeup.c'), + _libuv_source('win/dl.c'), + _libuv_source('win/error.c'), + _libuv_source('win/fs-event.c'), + _libuv_source('win/fs.c'), + # getaddrinfo.c refers to ConvertInterfaceIndexToLuid + # and ConvertInterfaceLuidToNameA, which are supposedly in iphlpapi.h + # and iphlpapi.lib/dll. But on Windows 10 with Python 3.5 and VC 14 (Visual Studio 2015), + # I get an undefined warning from the compiler for those functions and + # a link error from the linker, so this file can't be included. + # This is possibly because the functions are defined for Windows Vista, and + # Python 3.5 builds with at earlier SDK? + # Fortunately we don't use those functions. + #_libuv_source('win/getaddrinfo.c'), + # getnameinfo.c refers to uv__getaddrinfo_translate_error from + # getaddrinfo.c, which we don't have. + #_libuv_source('win/getnameinfo.c'), + _libuv_source('win/handle.c'), + _libuv_source('win/loop-watcher.c'), + _libuv_source('win/pipe.c'), + _libuv_source('win/poll.c'), + _libuv_source('win/process-stdio.c'), + _libuv_source('win/process.c'), + _libuv_source('win/req.c'), + _libuv_source('win/signal.c'), + _libuv_source('win/snprintf.c'), + _libuv_source('win/stream.c'), + _libuv_source('win/tcp.c'), + _libuv_source('win/thread.c'), + _libuv_source('win/tty.c'), + _libuv_source('win/udp.c'), + _libuv_source('win/util.c'), + _libuv_source('win/winapi.c'), + _libuv_source('win/winsock.c'), + ] +else: + LIBUV_SOURCES += [ + _libuv_source('unix/async.c'), + _libuv_source('unix/core.c'), + _libuv_source('unix/dl.c'), + _libuv_source('unix/fs.c'), + _libuv_source('unix/getaddrinfo.c'), + _libuv_source('unix/getnameinfo.c'), + _libuv_source('unix/loop-watcher.c'), + _libuv_source('unix/loop.c'), + _libuv_source('unix/pipe.c'), + _libuv_source('unix/poll.c'), + _libuv_source('unix/process.c'), + _libuv_source('unix/signal.c'), + _libuv_source('unix/stream.c'), + _libuv_source('unix/tcp.c'), + _libuv_source('unix/thread.c'), + _libuv_source('unix/tty.c'), + _libuv_source('unix/udp.c'), + ] + + +if sys.platform.startswith('linux'): + LIBUV_SOURCES += [ + _libuv_source('unix/linux-core.c'), + _libuv_source('unix/linux-inotify.c'), + _libuv_source('unix/linux-syscalls.c'), + _libuv_source('unix/procfs-exepath.c'), + _libuv_source('unix/proctitle.c'), + _libuv_source('unix/sysinfo-loadavg.c'), + _libuv_source('unix/sysinfo-memory.c'), + ] +elif sys.platform == 'darwin': + LIBUV_SOURCES += [ + _libuv_source('unix/bsd-ifaddrs.c'), + _libuv_source('unix/darwin.c'), + _libuv_source('unix/darwin-proctitle.c'), + _libuv_source('unix/fsevents.c'), + _libuv_source('unix/kqueue.c'), + _libuv_source('unix/proctitle.c'), + ] +elif sys.platform.startswith(('freebsd', 'dragonfly')): + LIBUV_SOURCES += [ + _libuv_source('unix/bsd-ifaddrs.c'), + _libuv_source('unix/freebsd.c'), + _libuv_source('unix/kqueue.c'), + _libuv_source('unix/posix-hrtime.c'), + ] +elif sys.platform.startswith('openbsd'): + LIBUV_SOURCES += [ + _libuv_source('unix/bsd-ifaddrs.c'), + _libuv_source('unix/kqueue.c'), + _libuv_source('unix/openbsd.c'), + _libuv_source('unix/posix-hrtime.c'), + ] +elif sys.platform.startswith('netbsd'): + LIBUV_SOURCES += [ + _libuv_source('unix/bsd-ifaddrs.c'), + _libuv_source('unix/kqueue.c'), + _libuv_source('unix/netbsd.c'), + _libuv_source('unix/posix-hrtime.c'), + ] + +elif sys.platform.startswith('sunos'): + LIBUV_SOURCES += [ + _libuv_source('unix/no-proctitle.c'), + _libuv_source('unix/sunos.c'), + ] + + +LIBUV_MACROS = [] + +def _define_macro(name, value): + LIBUV_MACROS.append((name, value)) + +LIBUV_LIBRARIES = [] + +def _add_library(name): + LIBUV_LIBRARIES.append(name) + +if sys.platform != 'win32': + _define_macro('_LARGEFILE_SOURCE', 1) + _define_macro('_FILE_OFFSET_BITS', 64) + +if sys.platform.startswith('linux'): + _add_library('dl') + _add_library('rt') + _define_macro('_GNU_SOURCE', 1) + _define_macro('_POSIX_C_SOURCE', '200112') +elif sys.platform == 'darwin': + _define_macro('_DARWIN_USE_64_BIT_INODE', 1) + _define_macro('_DARWIN_UNLIMITED_SELECT', 1) +elif sys.platform.startswith('netbsd'): + _add_library('kvm') +elif sys.platform.startswith('sunos'): + _define_macro('__EXTENSIONS__', 1) + _define_macro('_XOPEN_SOURCE', 500) + _add_library('kstat') + _add_library('nsl') + _add_library('sendfile') + _add_library('socket') +elif WIN: + _define_macro('_GNU_SOURCE', 1) + _define_macro('WIN32', 1) + _define_macro('_CRT_SECURE_NO_DEPRECATE', 1) + _define_macro('_CRT_NONSTDC_NO_DEPRECATE', 1) + _define_macro('_CRT_SECURE_NO_WARNINGS', 1) + _define_macro('_WIN32_WINNT', '0x0600') + _define_macro('WIN32_LEAN_AND_MEAN', 1) + _add_library('advapi32') + _add_library('iphlpapi') + _add_library('psapi') + _add_library('shell32') + _add_library('user32') + _add_library('userenv') + _add_library('ws2_32') + +ffi.cdef(_cdef) +ffi.set_source('gevent.libuv._corecffi', + _source, + sources=LIBUV_SOURCES, + depends=LIBUV_SOURCES, + include_dirs=LIBUV_INCLUDE_DIRS, + libraries=list(LIBUV_LIBRARIES), + define_macros=list(LIBUV_MACROS)) + +if __name__ == '__main__': + ffi.compile() diff --git a/python/gevent/libuv/_corecffi_cdef.c b/python/gevent/libuv/_corecffi_cdef.c new file mode 100644 index 0000000..0735aea --- /dev/null +++ b/python/gevent/libuv/_corecffi_cdef.c @@ -0,0 +1,393 @@ +/* markers for the CFFI parser. Replaced when the string is read. */ +#define GEVENT_STRUCT_DONE int +#define GEVENT_ST_NLINK_T int +#define GEVENT_UV_OS_SOCK_T int + +#define UV_EBUSY ... + +#define UV_VERSION_MAJOR ... +#define UV_VERSION_MINOR ... +#define UV_VERSION_PATCH ... + +typedef enum { + UV_RUN_DEFAULT = 0, + UV_RUN_ONCE, + UV_RUN_NOWAIT +} uv_run_mode; + +typedef enum { + UV_UNKNOWN_HANDLE = 0, + UV_ASYNC, + UV_CHECK, + UV_FS_EVENT, + UV_FS_POLL, + UV_HANDLE, + UV_IDLE, + UV_NAMED_PIPE, + UV_POLL, + UV_PREPARE, + UV_PROCESS, + UV_STREAM, + UV_TCP, + UV_TIMER, + UV_TTY, + UV_UDP, + UV_SIGNAL, + UV_FILE, + UV_HANDLE_TYPE_MAX +} uv_handle_type; + +enum uv_poll_event { + UV_READABLE = 1, + UV_WRITABLE = 2, + /* new in 1.9 */ + UV_DISCONNECT = 4, + /* new in 1.14.0 */ + UV_PRIORITIZED = 8, +}; + +enum uv_fs_event { + UV_RENAME = 1, + UV_CHANGE = 2 +}; + +enum uv_fs_event_flags { + /* + * By default, if the fs event watcher is given a directory name, we will + * watch for all events in that directory. This flags overrides this behavior + * and makes fs_event report only changes to the directory entry itself. This + * flag does not affect individual files watched. + * This flag is currently not implemented yet on any backend. + */ + UV_FS_EVENT_WATCH_ENTRY = 1, + /* + * By default uv_fs_event will try to use a kernel interface such as inotify + * or kqueue to detect events. This may not work on remote filesystems such + * as NFS mounts. This flag makes fs_event fall back to calling stat() on a + * regular interval. + * This flag is currently not implemented yet on any backend. + */ + UV_FS_EVENT_STAT = 2, + /* + * By default, event watcher, when watching directory, is not registering + * (is ignoring) changes in it's subdirectories. + * This flag will override this behaviour on platforms that support it. + */ + UV_FS_EVENT_RECURSIVE = 4 +}; + +const char* uv_strerror(int); +const char* uv_err_name(int); +const char* uv_version_string(void); +const char* uv_handle_type_name(uv_handle_type type); + +// handle structs and types +struct uv_loop_s { + void* data; + GEVENT_STRUCT_DONE _; +}; +struct uv_handle_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; +struct uv_idle_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; +struct uv_prepare_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; +struct uv_timer_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; +struct uv_signal_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; +struct uv_poll_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; + +struct uv_check_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; + +struct uv_async_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + void (*async_cb)(struct uv_async_s *); + GEVENT_STRUCT_DONE _; +}; + +struct uv_fs_event_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; + +struct uv_fs_poll_s { + struct uv_loop_s* loop; + uv_handle_type type; + void *data; + GEVENT_STRUCT_DONE _; +}; + +typedef struct uv_loop_s uv_loop_t; +typedef struct uv_handle_s uv_handle_t; +typedef struct uv_idle_s uv_idle_t; +typedef struct uv_prepare_s uv_prepare_t; +typedef struct uv_timer_s uv_timer_t; +typedef struct uv_signal_s uv_signal_t; +typedef struct uv_poll_s uv_poll_t; +typedef struct uv_check_s uv_check_t; +typedef struct uv_async_s uv_async_t; +typedef struct uv_fs_event_s uv_fs_event_t; +typedef struct uv_fs_poll_s uv_fs_poll_t; + + +size_t uv_handle_size(uv_handle_type); + +// callbacks with the same signature +typedef void (*uv_close_cb)(uv_handle_t *handle); +typedef void (*uv_idle_cb)(uv_idle_t *handle); +typedef void (*uv_timer_cb)(uv_timer_t *handle); +typedef void (*uv_check_cb)(uv_check_t* handle); +typedef void (*uv_async_cb)(uv_async_t* handle); +typedef void (*uv_prepare_cb)(uv_prepare_t *handle); + +// callbacks with distinct sigs +typedef void (*uv_walk_cb)(uv_handle_t *handle, void *arg); +typedef void (*uv_poll_cb)(uv_poll_t *handle, int status, int events); +typedef void (*uv_signal_cb)(uv_signal_t *handle, int signum); + +// Callback passed to uv_fs_event_start() which will be called +// repeatedly after the handle is started. If the handle was started +// with a directory the filename parameter will be a relative path to +// a file contained in the directory. The events parameter is an ORed +// mask of uv_fs_event elements. +typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, int events, int status); + +typedef struct { + long tv_sec; + long tv_nsec; +} uv_timespec_t; + +typedef struct { + uint64_t st_dev; + uint64_t st_mode; + uint64_t st_nlink; + uint64_t st_uid; + uint64_t st_gid; + uint64_t st_rdev; + uint64_t st_ino; + uint64_t st_size; + uint64_t st_blksize; + uint64_t st_blocks; + uint64_t st_flags; + uint64_t st_gen; + uv_timespec_t st_atim; + uv_timespec_t st_mtim; + uv_timespec_t st_ctim; + uv_timespec_t st_birthtim; +} uv_stat_t; + +typedef void (*uv_fs_poll_cb)(uv_fs_poll_t* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); + +// loop functions +uv_loop_t *uv_default_loop(); +uv_loop_t* uv_loop_new(); // not documented; neither is uv_loop_delete +int uv_loop_init(uv_loop_t* loop); +int uv_loop_fork(uv_loop_t* loop); +int uv_loop_alive(const uv_loop_t *loop); +int uv_loop_close(uv_loop_t* loop); +uint64_t uv_backend_timeout(uv_loop_t* loop); +int uv_run(uv_loop_t *, uv_run_mode mode); +int uv_backend_fd(const uv_loop_t* loop); +// The narrative docs for the two time functions say 'const', +// but the header does not. +void uv_update_time(uv_loop_t* loop); +uint64_t uv_now(uv_loop_t* loop); +void uv_stop(uv_loop_t *); +void uv_walk(uv_loop_t *loop, uv_walk_cb walk_cb, void *arg); + +// handle functions +// uv_handle_t is the base type for all libuv handle types. + +void uv_ref(void *); +void uv_unref(void *); +int uv_has_ref(void *); +void uv_close(void *handle, uv_close_cb close_cb); +int uv_is_active(void *handle); +int uv_is_closing(void *handle); + +// idle functions +// Idle handles will run the given callback once per loop iteration, right +// before the uv_prepare_t handles. Note: The notable difference with prepare +// handles is that when there are active idle handles, the loop will perform a +// zero timeout poll instead of blocking for i/o. Warning: Despite the name, +// idle handles will get their callbacks called on every loop iteration, not +// when the loop is actually "idle". +int uv_idle_init(uv_loop_t *, uv_idle_t *idle); +int uv_idle_start(uv_idle_t *idle, uv_idle_cb cb); +int uv_idle_stop(uv_idle_t *idle); + +// prepare functions +// Prepare handles will run the given callback once per loop iteration, right +// before polling for i/o. +int uv_prepare_init(uv_loop_t *, uv_prepare_t *prepare); +int uv_prepare_start(uv_prepare_t *prepare, uv_prepare_cb cb); +int uv_prepare_stop(uv_prepare_t *prepare); + +// check functions +// Check handles will run the given callback once per loop iteration, right +int uv_check_init(uv_loop_t *, uv_check_t *check); +int uv_check_start(uv_check_t *check, uv_check_cb cb); +int uv_check_stop(uv_check_t *check); + +// async functions +// Async handles allow the user to "wakeup" the event loop and get a callback called from another thread. + +int uv_async_init(uv_loop_t *, uv_async_t*, uv_async_cb); +int uv_async_send(uv_async_t*); + +// timer functions +// Timer handles are used to schedule callbacks to be called in the future. +int uv_timer_init(uv_loop_t *, uv_timer_t *handle); +int uv_timer_start(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat); +int uv_timer_stop(uv_timer_t *handle); +int uv_timer_again(uv_timer_t *handle); +void uv_timer_set_repeat(uv_timer_t *handle, uint64_t repeat); +uint64_t uv_timer_get_repeat(const uv_timer_t *handle); + +// signal functions +// Signal handles implement Unix style signal handling on a per-event loop +// bases. +int uv_signal_init(uv_loop_t *loop, uv_signal_t *handle); +int uv_signal_start(uv_signal_t *handle, uv_signal_cb signal_cb, int signum); +int uv_signal_stop(uv_signal_t *handle); + +// poll functions Poll handles are used to watch file descriptors for +// readability and writability, similar to the purpose of poll(2). It +// is not okay to have multiple active poll handles for the same +// socket, this can cause libuv to busyloop or otherwise malfunction. +// +// The purpose of poll handles is to enable integrating external +// libraries that rely on the event loop to signal it about the socket +// status changes, like c-ares or libssh2. Using uv_poll_t for any +// other purpose is not recommended; uv_tcp_t, uv_udp_t, etc. provide +// an implementation that is faster and more scalable than what can be +// achieved with uv_poll_t, especially on Windows. +// +// Note On windows only sockets can be polled with poll handles. On +// Unix any file descriptor that would be accepted by poll(2) can be +// used. +int uv_poll_init(uv_loop_t *loop, uv_poll_t *handle, int fd); + +// Initialize the handle using a socket descriptor. On Unix this is +// identical to uv_poll_init(). On windows it takes a SOCKET handle; +// SOCKET handles are another name for HANDLE objects in win32, and +// those are defined as PVOID, even though they are not actually +// pointers (they're small integers). CPython and PyPy both return +// the SOCKET (as cast to an int) from the socket.fileno() method. +// libuv uses ``uv_os_sock_t`` for this type, which is defined as an +// int on unix. +int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, GEVENT_UV_OS_SOCK_T socket); +int uv_poll_start(uv_poll_t *handle, int events, uv_poll_cb cb); +int uv_poll_stop(uv_poll_t *handle); + +// FS Event handles allow the user to monitor a given path for +// changes, for example, if the file was renamed or there was a +// generic change in it. This handle uses the best backend for the job +// on each platform. +// +// Thereas also uv_fs_poll_t that uses stat for filesystems where +// the kernel event isn't available. +int uv_fs_event_init(uv_loop_t*, uv_fs_event_t*); +int uv_fs_event_start(uv_fs_event_t*, uv_fs_event_cb, const char* path, unsigned int flags); +int uv_fs_event_stop(uv_fs_event_t*); +int uv_fs_event_getpath(uv_fs_event_t*, char* buffer, size_t* size); + +// FS Poll handles allow the user to monitor a given path for changes. +// Unlike uv_fs_event_t, fs poll handles use stat to detect when a +// file has changed so they can work on file systems where fs event +// handles can't. +// +// This is a closer match to libev. +int uv_fs_poll_init(void*, void*); +int uv_fs_poll_start(void*, uv_fs_poll_cb, const char* path, unsigned int); +int uv_fs_poll_stop(void*); + + +/* Standard library */ +void* memset(void *b, int c, size_t len); + + +/* gevent callbacks */ +// Implemented in Python code as 'def_extern'. In the case of poll callbacks and fs +// callbacks, if *status* is less than 0, it will be passed in the revents +// field. In cases of no extra arguments, revents will be 0. +// These will be created as static functions at the end of the +// _source.c and must be pre-declared at the top of that file if we +// call them +typedef void* GeventWatcherObject; +extern "Python" { + // Standard gevent._ffi.loop callbacks. + int python_callback(GeventWatcherObject handle, int revents); + void python_handle_error(GeventWatcherObject handle, int revents); + void python_stop(GeventWatcherObject handle); + + void python_check_callback(uv_check_t* handle); + void python_prepare_callback(uv_prepare_t* handle); + void python_timer0_callback(uv_check_t* handle); + + // libuv specific callback + void _uv_close_callback(uv_handle_t* handle); + void python_sigchld_callback(uv_signal_t* handle, int signum); + void python_queue_callback(uv_handle_t* handle, int revents); +} +// A variable we fill in. +static void (*gevent_noop)(void* handle); + +static void _gevent_signal_callback1(uv_signal_t* handle, int arg); +static void _gevent_async_callback0(uv_async_t* handle); +static void _gevent_prepare_callback0(uv_prepare_t* handle); +static void _gevent_timer_callback0(uv_timer_t* handle); +static void _gevent_check_callback0(uv_check_t* handle); +static void _gevent_idle_callback0(uv_idle_t* handle); +static void _gevent_poll_callback2(uv_poll_t* handle, int status, int events); +static void _gevent_fs_event_callback3(uv_fs_event_t* handle, const char* filename, int events, int status); + +typedef struct _gevent_fs_poll_s { + uv_fs_poll_t handle; + uv_stat_t curr; + uv_stat_t prev; +} gevent_fs_poll_t; + +static void _gevent_fs_poll_callback3(uv_fs_poll_t* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); + +static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg); +static void gevent_close_all_handles(uv_loop_t* loop); +static void gevent_zero_timer(uv_timer_t* handle); +static void gevent_zero_prepare(uv_prepare_t* handle); +static void gevent_zero_check(uv_check_t* handle); +static void gevent_zero_loop(uv_loop_t* handle); diff --git a/python/gevent/libuv/_corecffi_source.c b/python/gevent/libuv/_corecffi_source.c new file mode 100644 index 0000000..83fe82e --- /dev/null +++ b/python/gevent/libuv/_corecffi_source.c @@ -0,0 +1,181 @@ +#include <string.h> +#include "uv.h" + +typedef void* GeventWatcherObject; + +static int python_callback(GeventWatcherObject handle, int revents); +static void python_queue_callback(uv_handle_t* watcher_ptr, int revents); +static void python_handle_error(GeventWatcherObject handle, int revents); +static void python_stop(GeventWatcherObject handle); + +static void _gevent_noop(void* handle) {} + +static void (*gevent_noop)(void* handle) = &_gevent_noop; + +static void _gevent_generic_callback1_unused(uv_handle_t* watcher, int arg) +{ + // Python code may set this to NULL or even change it + // out from under us, which would tend to break things. + GeventWatcherObject handle = watcher->data; + const int cb_result = python_callback(handle, arg); + switch(cb_result) { + case -1: + // in case of exception, call self.loop.handle_error; + // this function is also responsible for stopping the watcher + // and allowing memory to be freed + python_handle_error(handle, arg); + break; + case 1: + // Code to stop the event IF NEEDED. Note that if python_callback + // has disposed of the last reference to the handle, + // `watcher` could now be invalid/disposed memory! + if (!uv_is_active(watcher)) { + if (watcher->data != handle) { + if (watcher->data) { + // If Python set the data to NULL, then they + // expected to be stopped. That's fine. + // Otherwise, something weird happened. + fprintf(stderr, + "WARNING: gevent: watcher handle changed in callback " + "from %p to %p for watcher at %p of type %d\n", + handle, watcher->data, watcher, watcher->type); + // There's a very good chance that the object the + // handle referred to has been changed and/or the + // old handle has been deallocated (most common), so + // passing the old handle will crash. Instead we + // pass a sigil to let python distinguish this case. + python_stop(NULL); + } + } + else { + python_stop(handle); + } + } + break; + case 2: + // watcher is already stopped and dead, nothing to do. + break; + default: + fprintf(stderr, + "WARNING: gevent: Unexpected return value %d from Python callback " + "for watcher %p (of type %d) and handle %p\n", + cb_result, + watcher, watcher->type, handle); + // XXX: Possible leaking of resources here? Should we be + // closing the watcher? + } +} + + +static void _gevent_generic_callback1(uv_handle_t* watcher, int arg) +{ + python_queue_callback(watcher, arg); +} + +static void _gevent_generic_callback0(uv_handle_t* handle) +{ + _gevent_generic_callback1(handle, 0); +} + +static void _gevent_async_callback0(uv_async_t* handle) +{ + _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_timer_callback0(uv_timer_t* handle) +{ + _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_prepare_callback0(uv_prepare_t* handle) +{ + _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_check_callback0(uv_check_t* handle) +{ + _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_idle_callback0(uv_idle_t* handle) +{ + _gevent_generic_callback0((uv_handle_t*)handle); +} + +static void _gevent_signal_callback1(uv_signal_t* handle, int signum) +{ + _gevent_generic_callback1((uv_handle_t*)handle, signum); +} + + +static void _gevent_poll_callback2(void* handle, int status, int events) +{ + _gevent_generic_callback1(handle, status < 0 ? status : events); +} + +static void _gevent_fs_event_callback3(void* handle, const char* filename, int events, int status) +{ + _gevent_generic_callback1(handle, status < 0 ? status : events); +} + + +typedef struct _gevent_fs_poll_s { + uv_fs_poll_t handle; + uv_stat_t curr; + uv_stat_t prev; +} gevent_fs_poll_t; + +static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t* prev, const uv_stat_t* curr) +{ + // stat pointers are valid for this callback only. + // if given, copy them into our structure, where they can be reached + // from python, just like libev's watcher does, before calling + // the callback. + + // The callback is invoked with status < 0 if path does not exist + // or is inaccessible. The watcher is not stopped but your + // callback is not called again until something changes (e.g. when + // the file is created or the error reason changes). + // In that case the fields will be 0 in curr/prev. + + + gevent_fs_poll_t* handle = (gevent_fs_poll_t*)handlep; + assert(status == 0); + + handle->curr = *curr; + handle->prev = *prev; + + _gevent_generic_callback1((uv_handle_t*)handle, 0); +} + +static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg) +{ + if( handle && !uv_is_closing(handle) ) { + uv_close(handle, NULL); + } +} + +static void gevent_close_all_handles(uv_loop_t* loop) +{ + uv_walk(loop, gevent_uv_walk_callback_close, NULL); +} + +static void gevent_zero_timer(uv_timer_t* handle) +{ + memset(handle, 0, sizeof(uv_timer_t)); +} + +static void gevent_zero_check(uv_check_t* handle) +{ + memset(handle, 0, sizeof(uv_check_t)); +} + +static void gevent_zero_prepare(uv_prepare_t* handle) +{ + memset(handle, 0, sizeof(uv_prepare_t)); +} + +static void gevent_zero_loop(uv_loop_t* handle) +{ + memset(handle, 0, sizeof(uv_loop_t)); +} diff --git a/python/gevent/libuv/loop.py b/python/gevent/libuv/loop.py new file mode 100644 index 0000000..0f317c0 --- /dev/null +++ b/python/gevent/libuv/loop.py @@ -0,0 +1,601 @@ +""" +libuv loop implementation +""" +# pylint: disable=no-member +from __future__ import absolute_import, print_function + +import os +from collections import defaultdict +from collections import namedtuple +from operator import delitem +import signal + +from gevent._ffi import _dbg # pylint: disable=unused-import +from gevent._ffi.loop import AbstractLoop +from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error +from gevent._ffi.loop import assign_standard_callbacks +from gevent._ffi.loop import AbstractCallbacks +from gevent._util import implementer +from gevent._interfaces import ILoop + +ffi = _corecffi.ffi +libuv = _corecffi.lib + +__all__ = [ +] + + +class _Callbacks(AbstractCallbacks): + + def _find_loop_from_c_watcher(self, watcher_ptr): + loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data + return self.from_handle(loop_handle) if loop_handle else None + + def python_sigchld_callback(self, watcher_ptr, _signum): + self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback() + + def python_timer0_callback(self, watcher_ptr): + return self.python_prepare_callback(watcher_ptr) + + def python_queue_callback(self, watcher_ptr, revents): + watcher_handle = watcher_ptr.data + the_watcher = self.from_handle(watcher_handle) + + the_watcher.loop._queue_callback(watcher_ptr, revents) + + +_callbacks = assign_standard_callbacks( + ffi, libuv, _Callbacks, + [('python_sigchld_callback', None), + ('python_timer0_callback', None), + ('python_queue_callback', None)]) + +from gevent._ffi.loop import EVENTS +GEVENT_CORE_EVENTS = EVENTS # export + +from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module + +_events_to_str = _watchers._events_to_str # export + +READ = libuv.UV_READABLE +WRITE = libuv.UV_WRITABLE + +def get_version(): + uv_bytes = ffi.string(libuv.uv_version_string()) + if not isinstance(uv_bytes, str): + # Py3 + uv_str = uv_bytes.decode("ascii") + else: + uv_str = uv_bytes + + return 'libuv-' + uv_str + +def get_header_version(): + return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH) + +def supported_backends(): + return ['default'] + +@implementer(ILoop) +class loop(AbstractLoop): + + # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't + # know this in general on libev + min_sleep_time = 0.001 # 1ms + + error_handler = None + + _CHECK_POINTER = 'uv_check_t *' + + _PREPARE_POINTER = 'uv_prepare_t *' + _PREPARE_CALLBACK_SIG = "void(*)(void*)" + + _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer" + + def __init__(self, flags=None, default=None): + AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) + self.__loop_pid = os.getpid() + self._child_watchers = defaultdict(list) + self._io_watchers = dict() + self._fork_watchers = set() + self._pid = os.getpid() + self._default = self._ptr == libuv.uv_default_loop() + self._queued_callbacks = [] + + def _queue_callback(self, watcher_ptr, revents): + self._queued_callbacks.append((watcher_ptr, revents)) + + def _init_loop(self, flags, default): + if default is None: + default = True + # Unlike libev, libuv creates a new default + # loop automatically if the old default loop was + # closed. + + if default: + # XXX: If the default loop had been destroyed, this + # will create a new one, but we won't destroy it + ptr = libuv.uv_default_loop() + else: + ptr = libuv.uv_loop_new() + + + if not ptr: + raise SystemError("Failed to get loop") + + # Track whether or not any object has destroyed + # this loop. See _can_destroy_default_loop + ptr.data = ptr + return ptr + + _signal_idle = None + + def _init_and_start_check(self): + libuv.uv_check_init(self._ptr, self._check) + libuv.uv_check_start(self._check, libuv.python_check_callback) + libuv.uv_unref(self._check) + + # We also have to have an idle watcher to be able to handle + # signals in a timely manner. Without them, libuv won't loop again + # and call into its check and prepare handlers. + # Note that this basically forces us into a busy-loop + # XXX: As predicted, using an idle watcher causes our process + # to eat 100% CPU time. We instead use a timer with a max of a .3 second + # delay to notice signals. Note that this timeout also implements fork + # watchers, effectively. + + # XXX: Perhaps we could optimize this to notice when there are other + # timers in the loop and start/stop it then. When we have a callback + # scheduled, this should also be the same and unnecessary? + # libev does takes this basic approach on Windows. + self._signal_idle = ffi.new("uv_timer_t*") + libuv.uv_timer_init(self._ptr, self._signal_idle) + self._signal_idle.data = self._handle_to_self + sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback) + libuv.uv_timer_start(self._signal_idle, + sig_cb, + 300, + 300) + libuv.uv_unref(self._signal_idle) + + def _run_callbacks(self): + # Manually handle fork watchers. + curpid = os.getpid() + if curpid != self._pid: + self._pid = curpid + for watcher in self._fork_watchers: + watcher._on_fork() + + + # The contents of queued_callbacks at this point should be timers + # that expired when the loop began along with any idle watchers. + # We need to run them so that any manual callbacks they want to schedule + # get added to the list and ran next before we go on to poll for IO. + # This is critical for libuv on linux: closing a socket schedules some manual + # callbacks to actually stop the watcher; if those don't run before + # we poll for IO, then libuv can abort the process for the closed file descriptor. + + # XXX: There's still a race condition here because we may not run *all* the manual + # callbacks. We need a way to prioritize those. + + # Running these before the manual callbacks lead to some + # random test failures. In test__event.TestEvent_SetThenClear + # we would get a LoopExit sometimes. The problem occurred when + # a timer expired on entering the first loop; we would process + # it there, and then process the callback that it created + # below, leaving nothing for the loop to do. Having the + # self.run() manually process manual callbacks before + # continuing solves the problem. (But we must still run callbacks + # here again.) + self._prepare_ran_callbacks = self.__run_queued_callbacks() + + super(loop, self)._run_callbacks() + + def _init_and_start_prepare(self): + libuv.uv_prepare_init(self._ptr, self._prepare) + libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback) + libuv.uv_unref(self._prepare) + + def _init_callback_timer(self): + libuv.uv_check_init(self._ptr, self._timer0) + + def _stop_callback_timer(self): + libuv.uv_check_stop(self._timer0) + + def _start_callback_timer(self): + # The purpose of the callback timer is to ensure that we run + # callbacks as soon as possible on the next iteration of the event loop. + + # In libev, we set a 0 duration timer with a no-op callback. + # This executes immediately *after* the IO poll is done (it + # actually determines the time that the IO poll will block + # for), so having the timer present simply spins the loop, and + # our normal prepare watcher kicks in to run the callbacks. + + # In libuv, however, timers are run *first*, before prepare + # callbacks and before polling for IO. So a no-op 0 duration + # timer actually does *nothing*. (Also note that libev queues all + # watchers found during IO poll to run at the end (I think), while libuv + # runs them in uv__io_poll itself.) + + # From the loop inside uv_run: + # while True: + # uv__update_time(loop); + # uv__run_timers(loop); + # # we don't use pending watchers. They are how libuv + # # implements the pipe/udp/tcp streams. + # ran_pending = uv__run_pending(loop); + # uv__run_idle(loop); + # uv__run_prepare(loop); + # ... + # uv__io_poll(loop, timeout); # <--- IO watchers run here! + # uv__run_check(loop); + + # libev looks something like this (pseudo code because the real code is + # hard to read): + # + # do { + # run_fork_callbacks(); + # run_prepare_callbacks(); + # timeout = min(time of all timers or normal block time) + # io_poll() # <--- Only queues IO callbacks + # update_now(); calculate_expired_timers(); + # run callbacks in this order: (although specificying priorities changes it) + # check + # stat + # child + # signal + # timer + # io + # } + + # So instead of running a no-op and letting the side-effect of spinning + # the loop run the callbacks, we must explicitly run them here. + + # If we don't, test__systemerror:TestCallback will be flaky, failing + # one time out of ~20, depending on timing. + + # To get them to run immediately after this current loop, + # we use a check watcher, instead of a 0 duration timer entirely. + # If we use a 0 duration timer, we can get stuck in a timer loop. + # Python 3.6 fails in test_ftplib.py + + # As a final note, if we have not yet entered the loop *at + # all*, and a timer was created with a duration shorter than + # the amount of time it took for us to enter the loop in the + # first place, it may expire and get called before our callback + # does. This could also lead to test__systemerror:TestCallback + # appearing to be flaky. + + # As yet another final note, if we are currently running a + # timer callback, meaning we're inside uv__run_timers() in C, + # and the Python starts a new timer, if the Python code then + # update's the loop's time, it's possible that timer will + # expire *and be run in the same iteration of the loop*. This + # is trivial to do: In sequential code, anything after + # `gevent.sleep(0.1)` is running in a timer callback. Starting + # a new timer---e.g., another gevent.sleep() call---will + # update the time, *before* uv__run_timers exits, meaning + # other timers get a chance to run before our check or prepare + # watcher callbacks do. Therefore, we do indeed have to have a 0 + # timer to run callbacks---it gets inserted before any other user + # timers---ideally, this should be especially careful about how much time + # it runs for. + + # AND YET: We can't actually do that. We get timeouts that I haven't fully + # investigated if we do. Probably stuck in a timer loop. + + # As a partial remedy to this, unlike libev, our timer watcher + # class doesn't update the loop time by default. + + libuv.uv_check_start(self._timer0, libuv.python_timer0_callback) + + + def _stop_aux_watchers(self): + assert self._prepare + assert self._check + assert self._signal_idle + libuv.uv_prepare_stop(self._prepare) + libuv.uv_ref(self._prepare) # Why are we doing this? + + libuv.uv_check_stop(self._check) + libuv.uv_ref(self._check) + + libuv.uv_timer_stop(self._signal_idle) + libuv.uv_ref(self._signal_idle) + + libuv.uv_check_stop(self._timer0) + + def _setup_for_run_callback(self): + self._start_callback_timer() + libuv.uv_ref(self._timer0) + + + def _can_destroy_loop(self, ptr): + # We're being asked to destroy a loop that's, + # at the time it was constructed, was the default loop. + # If loop objects were constructed more than once, + # it may have already been destroyed, though. + # We track this in the data member. + return ptr.data + + def _destroy_loop(self, ptr): + ptr.data = ffi.NULL + libuv.uv_stop(ptr) + + libuv.gevent_close_all_handles(ptr) + + closed_failed = libuv.uv_loop_close(ptr) + if closed_failed: + assert closed_failed == libuv.UV_EBUSY + # We already closed all the handles. Run the loop + # once to let them be cut off from the loop. + ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) + if ran_has_more_callbacks: + libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT) + closed_failed = libuv.uv_loop_close(ptr) + assert closed_failed == 0, closed_failed + + # Destroy the native resources *after* we have closed + # the loop. If we do it before, walking the handles + # attached to the loop is likely to segfault. + + libuv.gevent_zero_check(self._check) + libuv.gevent_zero_check(self._timer0) + libuv.gevent_zero_prepare(self._prepare) + libuv.gevent_zero_timer(self._signal_idle) + del self._check + del self._prepare + del self._signal_idle + del self._timer0 + + libuv.gevent_zero_loop(ptr) + + # Destroy any watchers we're still holding on to. + del self._io_watchers + del self._fork_watchers + del self._child_watchers + + + def debug(self): + """ + Return all the handles that are open and their ref status. + """ + handle_state = namedtuple("HandleState", + ['handle', + 'type', + 'watcher', + 'ref', + 'active', + 'closing']) + handles = [] + + # XXX: Convert this to a modern callback. + def walk(handle, _arg): + data = handle.data + if data: + watcher = ffi.from_handle(data) + else: + watcher = None + handles.append(handle_state(handle, + ffi.string(libuv.uv_handle_type_name(handle.type)), + watcher, + libuv.uv_has_ref(handle), + libuv.uv_is_active(handle), + libuv.uv_is_closing(handle))) + + libuv.uv_walk(self._ptr, + ffi.callback("void(*)(uv_handle_t*,void*)", + walk), + ffi.NULL) + return handles + + def ref(self): + pass + + def unref(self): + # XXX: Called by _run_callbacks. + pass + + def break_(self, how=None): + libuv.uv_stop(self._ptr) + + def reinit(self): + # TODO: How to implement? We probably have to simply + # re-__init__ this whole class? Does it matter? + # OR maybe we need to uv_walk() and close all the handles? + + # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately + # exec() in the child. There are multiple calls to abort() that + # will kill the child process: + # - The OS X poll implementation (kqueue) aborts on an error return + # value; since kqueue FDs can't be inherited, then the next call + # to kqueue in the child will fail and get aborted; fork() is likely + # to be called during the gevent loop, meaning we're deep inside the + # runloop already, so we can't even close the loop that we're in: + # it's too late, the next call to kqueue is already scheduled. + # - The threadpool, should it be in use, also aborts + # (https://github.com/joyent/libuv/pull/1136) + # - There global shared state that breaks signal handling + # and leads to an abort() in the child, EVEN IF the loop in the parent + # had already been closed + # (https://github.com/joyent/libuv/issues/1405) + + # In 1.12, the uv_loop_fork function was added (by gevent!) + libuv.uv_loop_fork(self._ptr) + + _prepare_ran_callbacks = False + + def __run_queued_callbacks(self): + if not self._queued_callbacks: + return False + + cbs = list(self._queued_callbacks) + self._queued_callbacks = [] + + for watcher_ptr, arg in cbs: + handle = watcher_ptr.data + if not handle: + # It's been stopped and possibly closed + assert not libuv.uv_is_active(watcher_ptr) + continue + val = _callbacks.python_callback(handle, arg) + if val == -1: + _callbacks.python_handle_error(handle, arg) + elif val == 1: + if not libuv.uv_is_active(watcher_ptr): + if watcher_ptr.data != handle: + if watcher_ptr.data: + _callbacks.python_stop(None) + else: + _callbacks.python_stop(handle) + return True + + + def run(self, nowait=False, once=False): + # we can only respect one flag or the other. + # nowait takes precedence because it can't block + mode = libuv.UV_RUN_DEFAULT + if once: + mode = libuv.UV_RUN_ONCE + if nowait: + mode = libuv.UV_RUN_NOWAIT + + if mode == libuv.UV_RUN_DEFAULT: + while self._ptr and self._ptr.data: + # This is here to better preserve order guarantees. See _run_callbacks + # for details. + # It may get run again from the prepare watcher, so potentially we + # could take twice as long as the switch interval. + self._run_callbacks() + self._prepare_ran_callbacks = False + ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE) + # Note that we run queued callbacks when the prepare watcher runs, + # thus accounting for timers that expired before polling for IO, + # and idle watchers. This next call should get IO callbacks and + # callbacks from timers that expired *after* polling for IO. + ran_callbacks = self.__run_queued_callbacks() + + if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks: + # A return of 0 means there are no referenced and + # active handles. The loop is over. + # If we didn't run any callbacks, then we couldn't schedule + # anything to switch in the future, so there's no point + # running again. + return ran_status + return 0 # Somebody closed the loop + + result = libuv.uv_run(self._ptr, mode) + self.__run_queued_callbacks() + return result + + def now(self): + # libuv's now is expressed as an integer number of + # milliseconds, so to get it compatible with time.time units + # that this method is supposed to return, we have to divide by 1000.0 + now = libuv.uv_now(self._ptr) + return now / 1000.0 + + def update_now(self): + libuv.uv_update_time(self._ptr) + + def fileno(self): + if self._ptr: + fd = libuv.uv_backend_fd(self._ptr) + if fd >= 0: + return fd + + _sigchld_watcher = None + _sigchld_callback_ffi = None + + def install_sigchld(self): + if not self.default: + return + + if self._sigchld_watcher: + return + + self._sigchld_watcher = ffi.new('uv_signal_t*') + libuv.uv_signal_init(self._ptr, self._sigchld_watcher) + self._sigchld_watcher.data = self._handle_to_self + + libuv.uv_signal_start(self._sigchld_watcher, + libuv.python_sigchld_callback, + signal.SIGCHLD) + + def reset_sigchld(self): + if not self.default or not self._sigchld_watcher: + return + + libuv.uv_signal_stop(self._sigchld_watcher) + # Must go through this to manage the memory lifetime + # correctly. Alternately, we could just stop it and restart + # it in install_sigchld? + _watchers.watcher._watcher_ffi_close(self._sigchld_watcher) + del self._sigchld_watcher + + + def _sigchld_callback(self): + # Signals can arrive at (relatively) any time. To eliminate + # race conditions, and behave more like libev, we "queue" + # sigchld to run when we run callbacks. + while True: + try: + pid, status, _usage = os.wait3(os.WNOHANG) + except OSError: + # Python 3 raises ChildProcessError + break + + if pid == 0: + break + children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) + for watcher in children_watchers: + self.run_callback(watcher._set_waitpid_status, pid, status) + + # Don't invoke child watchers for 0 more than once + self._child_watchers[0] = [] + + def _register_child_watcher(self, watcher): + self._child_watchers[watcher._pid].append(watcher) + + def _unregister_child_watcher(self, watcher): + try: + # stop() should be idempotent + self._child_watchers[watcher._pid].remove(watcher) + except ValueError: + pass + + # Now's a good time to clean up any dead lists we don't need + # anymore + for pid in list(self._child_watchers): + if not self._child_watchers[pid]: + del self._child_watchers[pid] + + def io(self, fd, events, ref=True, priority=None): + # We rely on hard references here and explicit calls to + # close() on the returned object to correctly manage + # the watcher lifetimes. + + io_watchers = self._io_watchers + try: + io_watcher = io_watchers[fd] + assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher) + except KeyError: + # Start the watcher with just the events that we're interested in. + # as multiplexers are added, the real event mask will be updated to keep in sync. + # If we watch for too much, we get spurious wakeups and busy loops. + io_watcher = self._watchers.io(self, fd, 0) + io_watchers[fd] = io_watcher + io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd) + + return io_watcher.multiplex(events) + + def prepare(self, ref=True, priority=None): + # We run arbitrary code in python_prepare_callback. That could switch + # greenlets. If it does that while also manipulating the active prepare + # watchers, we could corrupt the process state, since the prepare watcher + # queue is iterated on the stack (on unix). We could workaround this by implementing + # prepare watchers in pure Python. + # See https://github.com/gevent/gevent/issues/1126 + raise TypeError("prepare watchers are not currently supported in libuv. " + "If you need them, please contact the maintainers.") diff --git a/python/gevent/libuv/watcher.py b/python/gevent/libuv/watcher.py new file mode 100644 index 0000000..035cb43 --- /dev/null +++ b/python/gevent/libuv/watcher.py @@ -0,0 +1,732 @@ +# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable +# pylint: disable=no-member +from __future__ import absolute_import, print_function + +import functools +import sys + +from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error + +ffi = _corecffi.ffi +libuv = _corecffi.lib + +from gevent._ffi import watcher as _base +from gevent._ffi import _dbg + +_closing_watchers = set() + +# In debug mode, it would be nice to be able to clear the memory of +# the watcher (its size determined by +# libuv.uv_handle_size(ffi_watcher.type)) using memset so that if we +# are using it after it's supposedly been closed and deleted, we'd +# catch it sooner. BUT doing so breaks test__threadpool. We get errors +# about `pthread_mutex_lock[3]: Invalid argument` (and sometimes we +# crash) suggesting either that we're writing on memory that doesn't +# belong to us, somehow, or that we haven't actually lost all +# references... +_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(_closing_watchers.remove) + + +_events = [(libuv.UV_READABLE, "READ"), + (libuv.UV_WRITABLE, "WRITE")] + +def _events_to_str(events): # export + return _base.events_to_str(events, _events) + +class UVFuncallError(ValueError): + pass + +class libuv_error_wrapper(object): + # Makes sure that everything stored as a function + # on the wrapper instances (classes, actually, + # because this is used by the metaclass) + # checks its return value and raises an error. + # This expects that everything we call has an int + # or void return value and follows the conventions + # of error handling (that negative values are errors) + def __init__(self, uv): + self._libuv = uv + + def __getattr__(self, name): + libuv_func = getattr(self._libuv, name) + + @functools.wraps(libuv_func) + def wrap(*args, **kwargs): + if args and isinstance(args[0], watcher): + args = args[1:] + res = libuv_func(*args, **kwargs) + if res is not None and res < 0: + raise UVFuncallError( + str(ffi.string(libuv.uv_err_name(res)).decode('ascii') + + ' ' + + ffi.string(libuv.uv_strerror(res)).decode('ascii')) + + " Args: " + repr(args) + " KWARGS: " + repr(kwargs) + ) + return res + + setattr(self, name, wrap) + + return wrap + + +class ffi_unwrapper(object): + # undoes the wrapping of libuv_error_wrapper for + # the methods used by the metaclass that care + + def __init__(self, ff): + self._ffi = ff + + def __getattr__(self, name): + return getattr(self._ffi, name) + + def addressof(self, lib, name): + assert isinstance(lib, libuv_error_wrapper) + return self._ffi.addressof(libuv, name) + + +class watcher(_base.watcher): + _FFI = ffi_unwrapper(ffi) + _LIB = libuv_error_wrapper(libuv) + + _watcher_prefix = 'uv' + _watcher_struct_pattern = '%s_t' + + @classmethod + def _watcher_ffi_close(cls, ffi_watcher): + # Managing the lifetime of _watcher is tricky. + # They have to be uv_close()'d, but that only + # queues them to be closed in the *next* loop iteration. + # The memory must stay valid for at least that long, + # or assert errors are triggered. We can't use a ffi.gc() + # pointer to queue the uv_close, because by the time the + # destructor is called, there's no way to keep the memory alive + # and it could be re-used. + # So here we resort to resurrecting the pointer object out + # of our scope, keeping it alive past this object's lifetime. + # We then use the uv_close callback to handle removing that + # reference. There's no context passed to the close callback, + # so we have to do this globally. + + # Sadly, doing this causes crashes if there were multiple + # watchers for a given FD, so we have to take special care + # about that. See https://github.com/gevent/gevent/issues/790#issuecomment-208076604 + + # Note that this cannot be a __del__ method, because we store + # the CFFI handle to self on self, which is a cycle, and + # objects with a __del__ method cannot be collected on CPython < 3.4 + + # Instead, this is arranged as a callback to GC when the + # watcher class dies. Obviously it's important to keep the ffi + # watcher alive. + # We can pass in "subclasses" if uv_handle_t that line up at the C level, + # but that don't in CFFI without a cast. But be careful what we use the cast + # for, don't pass it back to C. + ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher) + if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher): + # If the type isn't set, we were never properly initialized, + # and trying to close it results in libuv terminating the process. + # Sigh. Same thing if it's already in the process of being + # closed. + _closing_watchers.add(ffi_watcher) + libuv.uv_close(ffi_watcher, libuv._uv_close_callback) + + ffi_handle_watcher.data = ffi.NULL + + + def _watcher_ffi_set_init_ref(self, ref): + self.ref = ref + + def _watcher_ffi_init(self, args): + # TODO: we could do a better job chokepointing this + return self._watcher_init(self.loop.ptr, + self._watcher, + *args) + + def _watcher_ffi_start(self): + self._watcher_start(self._watcher, self._watcher_callback) + + def _watcher_ffi_stop(self): + if self._watcher: + # The multiplexed io watcher deletes self._watcher + # when it closes down. If that's in the process of + # an error handler, AbstractCallbacks.unhandled_onerror + # will try to close us again. + self._watcher_stop(self._watcher) + + @_base.only_if_watcher + def _watcher_ffi_ref(self): + libuv.uv_ref(self._watcher) + + @_base.only_if_watcher + def _watcher_ffi_unref(self): + libuv.uv_unref(self._watcher) + + def _watcher_ffi_start_unref(self): + pass + + def _watcher_ffi_stop_ref(self): + pass + + def _get_ref(self): + # Convert 1/0 to True/False + if self._watcher is None: + return None + return True if libuv.uv_has_ref(self._watcher) else False + + def _set_ref(self, value): + if value: + self._watcher_ffi_ref() + else: + self._watcher_ffi_unref() + + ref = property(_get_ref, _set_ref) + + def feed(self, _revents, _callback, *_args): + raise Exception("Not implemented") + +class io(_base.IoMixin, watcher): + _watcher_type = 'poll' + _watcher_callback_name = '_gevent_poll_callback2' + + # On Windows is critical to be able to garbage collect these + # objects in a timely fashion so that they don't get reused + # for multiplexing completely different sockets. This is because + # uv_poll_init_socket does a lot of setup for the socket to make + # polling work. If get reused for another socket that has the same + # fileno, things break badly. (In theory this could be a problem + # on posix too, but in practice it isn't). + + # TODO: We should probably generalize this to all + # ffi watchers. Avoiding GC cycles as much as possible + # is a good thing, and potentially allocating new handles + # as needed gets us better memory locality. + + # Especially on Windows, we must also account for the case that a + # reference to this object has leaked (e.g., the socket object is + # still around), but the fileno has been closed and a new one + # opened. We must still get a new native watcher at that point. We + # handle this case by simply making sure that we don't even have + # a native watcher until the object is started, and we shut it down + # when the object is stopped. + + # XXX: I was able to solve at least Windows test_ftplib.py issues + # with more of a careful use of io objects in socket.py, so + # delaying this entirely is at least temporarily on hold. Instead + # sticking with the _watcher_create function override for the + # moment. + + # XXX: Note 2: Moving to a deterministic close model, which was necessary + # for PyPy, also seems to solve the Windows issues. So we're completely taking + # this object out of the loop's registration; we don't want GC callbacks and + # uv_close anywhere *near* this object. + + _watcher_registers_with_loop_on_create = False + + EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT + + _multiplex_watchers = () + + def __init__(self, loop, fd, events, ref=True, priority=None): + super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,)) + self._fd = fd + self._events = events + self._multiplex_watchers = [] + + def _get_fd(self): + return self._fd + + @_base.not_while_active + def _set_fd(self, fd): + self._fd = fd + self._watcher_ffi_init((fd,)) + + def _get_events(self): + return self._events + + def _set_events(self, events): + if events == self._events: + return + self._events = events + if self.active: + # We're running but libuv specifically says we can + # call start again to change our event mask. + assert self._handle is not None + self._watcher_start(self._watcher, self._events, self._watcher_callback) + + events = property(_get_events, _set_events) + + def _watcher_ffi_start(self): + self._watcher_start(self._watcher, self._events, self._watcher_callback) + + if sys.platform.startswith('win32'): + # uv_poll can only handle sockets on Windows, but the plain + # uv_poll_init we call on POSIX assumes that the fileno + # argument is already a C fileno, as created by + # _get_osfhandle. C filenos are limited resources, must be + # closed with _close. So there are lifetime issues with that: + # calling the C function _close to dispose of the fileno + # *also* closes the underlying win32 handle, possibly + # prematurely. (XXX: Maybe could do something with weak + # references? But to what?) + + # All libuv wants to do with the fileno in uv_poll_init is + # turn it back into a Win32 SOCKET handle. + + # Now, libuv provides uv_poll_init_socket, which instead of + # taking a C fileno takes the SOCKET, avoiding the need to dance with + # the C runtime. + + # It turns out that SOCKET (win32 handles in general) can be + # represented with `intptr_t`. It further turns out that + # CPython *directly* exposes the SOCKET handle as the value of + # fileno (32-bit PyPy does some munging on it, which should + # rarely matter). So we can pass socket.fileno() through + # to uv_poll_init_socket. + + # See _corecffi_build. + _watcher_init = watcher._LIB.uv_poll_init_socket + + + class _multiplexwatcher(object): + + callback = None + args = () + pass_events = False + ref = True + + def __init__(self, events, watcher): + self._events = events + + # References: + # These objects must keep the original IO object alive; + # the IO object SHOULD NOT keep these alive to avoid cycles + # We MUST NOT rely on GC to clean up the IO objects, but the explicit + # calls to close(); see _multiplex_closed. + self._watcher_ref = watcher + + events = property( + lambda self: self._events, + _base.not_while_active(lambda self, nv: setattr(self, '_events', nv))) + + def start(self, callback, *args, **kwargs): + self.pass_events = kwargs.get("pass_events") + self.callback = callback + self.args = args + + watcher = self._watcher_ref + if watcher is not None: + if not watcher.active: + watcher._io_start() + else: + # Make sure we're in the event mask + watcher._calc_and_update_events() + + def stop(self): + self.callback = None + self.pass_events = None + self.args = None + watcher = self._watcher_ref + if watcher is not None: + watcher._io_maybe_stop() + + def close(self): + if self._watcher_ref is not None: + self._watcher_ref._multiplex_closed(self) + self._watcher_ref = None + + @property + def active(self): + return self.callback is not None + + @property + def _watcher(self): + # For testing. + return self._watcher_ref._watcher + + # ares.pyx depends on this property, + # and test__core uses it too + fd = property(lambda self: getattr(self._watcher_ref, '_fd', -1), + lambda self, nv: self._watcher_ref._set_fd(nv)) + + def _io_maybe_stop(self): + self._calc_and_update_events() + for w in self._multiplex_watchers: + if w.callback is not None: + # There's still a reference to it, and it's started, + # so we can't stop. + return + # If we get here, nothing was started + # so we can take ourself out of the polling set + self.stop() + + def _io_start(self): + self._calc_and_update_events() + self.start(self._io_callback, pass_events=True) + + def _calc_and_update_events(self): + events = 0 + for watcher in self._multiplex_watchers: + if watcher.callback is not None: + # Only ask for events that are active. + events |= watcher.events + self._set_events(events) + + + def multiplex(self, events): + watcher = self._multiplexwatcher(events, self) + self._multiplex_watchers.append(watcher) + self._calc_and_update_events() + return watcher + + def close(self): + super(io, self).close() + del self._multiplex_watchers + + def _multiplex_closed(self, watcher): + self._multiplex_watchers.remove(watcher) + if not self._multiplex_watchers: + self.stop() # should already be stopped + self._no_more_watchers() + # It is absolutely critical that we control when the call + # to uv_close() gets made. uv_close() of a uv_poll_t + # handle winds up calling uv__platform_invalidate_fd, + # which, as the name implies, destroys any outstanding + # events for the *fd* that haven't been delivered yet, and also removes + # the *fd* from the poll set. So if this happens later, at some + # non-deterministic time when (cyclic or otherwise) GC runs, + # *and* we've opened a new watcher for the fd, that watcher will + # suddenly and mysteriously stop seeing events. So we do this now; + # this method is smart enough not to close the handle twice. + self.close() + else: + self._calc_and_update_events() + + def _no_more_watchers(self): + # The loop sets this on an individual watcher to delete it from + # the active list where it keeps hard references. + pass + + def _io_callback(self, events): + if events < 0: + # actually a status error code + _dbg("Callback error on", self._fd, + ffi.string(libuv.uv_err_name(events)), + ffi.string(libuv.uv_strerror(events))) + # XXX: We've seen one half of a FileObjectPosix pair + # (the read side of a pipe) report errno 11 'bad file descriptor' + # after the write side was closed and its watcher removed. But + # we still need to attempt to read from it to clear out what's in + # its buffers--if we return with the watcher inactive before proceeding to wake up + # the reader, we get a LoopExit. So we can't return here and arguably shouldn't print it + # either. The negative events mask will match the watcher's mask. + # See test__fileobject.py:Test.test_newlines for an example. + + # On Windows (at least with PyPy), we can get ENOTSOCK (socket operation on non-socket) + # if a socket gets closed. If we don't pass the events on, we hang. + # See test__makefile_ref.TestSSL for examples. + # return + + for watcher in self._multiplex_watchers: + if not watcher.callback: + # Stopped + continue + assert watcher._watcher_ref is self, (self, watcher._watcher_ref) + + send_event = (events & watcher.events) or events < 0 + if send_event: + if not watcher.pass_events: + watcher.callback(*watcher.args) + else: + watcher.callback(events, *watcher.args) + +class _SimulatedWithAsyncMixin(object): + _watcher_skip_ffi = True + + def __init__(self, loop, *args, **kwargs): + self._async = loop.async_() + try: + super(_SimulatedWithAsyncMixin, self).__init__(loop, *args, **kwargs) + except: + self._async.close() + raise + + def _watcher_create(self, _args): + return + + @property + def _watcher_handle(self): + return None + + def _watcher_ffi_init(self, _args): + return + + def _watcher_ffi_set_init_ref(self, ref): + self._async.ref = ref + + @property + def active(self): + return self._async.active + + def start(self, cb, *args): + self._register_loop_callback() + self.callback = cb + self.args = args + self._async.start(cb, *args) + #watcher.start(self, cb, *args) + + def stop(self): + self._unregister_loop_callback() + self.callback = None + self.args = None + self._async.stop() + + def close(self): + if self._async is not None: + a = self._async + #self._async = None + a.close() + + def _register_loop_callback(self): + # called from start() + raise NotImplementedError() + + def _unregister_loop_callback(self): + # called from stop + raise NotImplementedError() + +class fork(_SimulatedWithAsyncMixin, + _base.ForkMixin, + watcher): + # We'll have to implement this one completely manually + # Right now it doesn't matter much since libuv doesn't survive + # a fork anyway. (That's a work in progress) + _watcher_skip_ffi = False + + def _register_loop_callback(self): + self.loop._fork_watchers.add(self) + + def _unregister_loop_callback(self): + try: + # stop() should be idempotent + self.loop._fork_watchers.remove(self) + except KeyError: + pass + + def _on_fork(self): + self._async.send() + + +class child(_SimulatedWithAsyncMixin, + _base.ChildMixin, + watcher): + _watcher_skip_ffi = True + # We'll have to implement this one completely manually. + # Our approach is to use a SIGCHLD handler and the original + # os.waitpid call. + + # On Unix, libuv's uv_process_t and uv_spawn use SIGCHLD, + # just like libev does for its child watchers. So + # we're not adding any new SIGCHLD related issues not already + # present in libev. + + + def _register_loop_callback(self): + self.loop._register_child_watcher(self) + + def _unregister_loop_callback(self): + self.loop._unregister_child_watcher(self) + + def _set_waitpid_status(self, pid, status): + self._rpid = pid + self._rstatus = status + self._async.send() + + +class async_(_base.AsyncMixin, watcher): + _watcher_callback_name = '_gevent_async_callback0' + + def _watcher_ffi_init(self, args): + # It's dangerous to have a raw, non-initted struct + # around; it will crash in uv_close() when we get GC'd, + # and send() will also crash. + # NOTE: uv_async_init is NOT idempotent. Calling it more than + # once adds the uv_async_t to the internal queue multiple times, + # and uv_close only cleans up one of them, meaning that we tend to + # crash. Thus we have to be very careful not to allow that. + return self._watcher_init(self.loop.ptr, self._watcher, ffi.NULL) + + def _watcher_ffi_start(self): + # we're created in a started state, but we didn't provide a + # callback (because if we did and we don't have a value in our + # callback attribute, then python_callback would crash.) Note that + # uv_async_t->async_cb is not technically documented as public. + self._watcher.async_cb = self._watcher_callback + + def _watcher_ffi_stop(self): + self._watcher.async_cb = ffi.NULL + # We have to unref this because we're setting the cb behind libuv's + # back, basically: once a async watcher is started, it can't ever be + # stopped through libuv interfaces, so it would never lose its active + # status, and thus if it stays reffed it would keep the event loop + # from exiting. + self._watcher_ffi_unref() + + def send(self): + if libuv.uv_is_closing(self._watcher): + raise Exception("Closing handle") + libuv.uv_async_send(self._watcher) + + @property + def pending(self): + return None + +locals()['async'] = async_ + +class timer(_base.TimerMixin, watcher): + + _watcher_callback_name = '_gevent_timer_callback0' + + # In libuv, timer callbacks continue running while any timer is + # expired, including newly added timers. Newly added non-zero + # timers (especially of small duration) can be seen to be expired + # if the loop time is updated while we are in a timer callback. + # This can lead to us being stuck running timers for a terribly + # long time, which is not good. So default to not updating the + # time. + + # Also, newly-added timers of 0 duration can *also* stall the + # loop, because they'll be seen to be expired immediately. + # Updating the time can prevent that, *if* there was already a + # timer for a longer duration scheduled. + + # To mitigate the above problems, our loop implementation turns + # zero duration timers into check watchers instead using OneShotCheck. + # This ensures the loop cycles. Of course, the 'again' method does + # nothing on them and doesn't exist. In practice that's not an issue. + + _again = False + + def _watcher_ffi_init(self, args): + self._watcher_init(self.loop._ptr, self._watcher) + self._after, self._repeat = args + if self._after and self._after < 0.001: + import warnings + # XXX: The stack level is hard to determine, could be getting here + # through a number of different ways. + warnings.warn("libuv only supports millisecond timer resolution; " + "all times less will be set to 1 ms", + stacklevel=6) + # The alternative is to effectively pass in int(0.1) == 0, which + # means no sleep at all, which leads to excessive wakeups + self._after = 0.001 + if self._repeat and self._repeat < 0.001: + import warnings + warnings.warn("libuv only supports millisecond timer resolution; " + "all times less will be set to 1 ms", + stacklevel=6) + self._repeat = 0.001 + + def _watcher_ffi_start(self): + if self._again: + libuv.uv_timer_again(self._watcher) + else: + try: + self._watcher_start(self._watcher, self._watcher_callback, + int(self._after * 1000), + int(self._repeat * 1000)) + except ValueError: + # in case of non-ints in _after/_repeat + raise TypeError() + + def again(self, callback, *args, **kw): + if not self.active: + # If we've never been started, this is the same as starting us. + # libuv makes the distinction, libev doesn't. + self.start(callback, *args, **kw) + return + + self._again = True + try: + self.start(callback, *args, **kw) + finally: + del self._again + + +class stat(_base.StatMixin, watcher): + _watcher_type = 'fs_poll' + _watcher_struct_name = 'gevent_fs_poll_t' + _watcher_callback_name = '_gevent_fs_poll_callback3' + + def _watcher_set_data(self, the_watcher, data): + the_watcher.handle.data = data + return data + + def _watcher_ffi_init(self, args): + return self._watcher_init(self.loop._ptr, self._watcher) + + MIN_STAT_INTERVAL = 0.1074891 # match libev; 0.0 is default + + def _watcher_ffi_start(self): + # libev changes this when the watcher is started + if self._interval < self.MIN_STAT_INTERVAL: + self._interval = self.MIN_STAT_INTERVAL + self._watcher_start(self._watcher, self._watcher_callback, + self._cpath, + int(self._interval * 1000)) + + @property + def _watcher_handle(self): + return self._watcher.handle.data + + @property + def attr(self): + if not self._watcher.curr.st_nlink: + return + return self._watcher.curr + + @property + def prev(self): + if not self._watcher.prev.st_nlink: + return + return self._watcher.prev + + +class signal(_base.SignalMixin, watcher): + _watcher_callback_name = '_gevent_signal_callback1' + + def _watcher_ffi_init(self, args): + self._watcher_init(self.loop._ptr, self._watcher) + self.ref = False # libev doesn't ref these by default + + + def _watcher_ffi_start(self): + self._watcher_start(self._watcher, self._watcher_callback, + self._signalnum) + + +class idle(_base.IdleMixin, watcher): + # Because libuv doesn't support priorities, idle watchers are + # potentially quite a bit different than under libev + _watcher_callback_name = '_gevent_idle_callback0' + + +class check(_base.CheckMixin, watcher): + _watcher_callback_name = '_gevent_check_callback0' + +class OneShotCheck(check): + + _watcher_skip_ffi = True + + def __make_cb(self, func): + stop = self.stop + @functools.wraps(func) + def cb(*args): + stop() + return func(*args) + return cb + + def start(self, callback, *args): + return check.start(self, self.__make_cb(callback), *args) + +class prepare(_base.PrepareMixin, watcher): + _watcher_callback_name = '_gevent_prepare_callback0' |