aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/plugins.py
diff options
context:
space:
mode:
Diffstat (limited to 'hypervideo_dl/plugins.py')
-rw-r--r--hypervideo_dl/plugins.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/hypervideo_dl/plugins.py b/hypervideo_dl/plugins.py
new file mode 100644
index 0000000..38e4a2c
--- /dev/null
+++ b/hypervideo_dl/plugins.py
@@ -0,0 +1,173 @@
+import contextlib
+import importlib
+import importlib.abc
+import importlib.machinery
+import importlib.util
+import inspect
+import itertools
+import pkgutil
+import sys
+import traceback
+import zipimport
+from pathlib import Path
+from zipfile import ZipFile
+
+from .compat import functools # isort: split
+from .utils import (
+ get_executable_path,
+ get_system_config_dirs,
+ get_user_config_dirs,
+ orderedSet,
+ write_string,
+)
+
+PACKAGE_NAME = 'hypervideo_dl_plugins'
+COMPAT_PACKAGE_NAME = 'ytdlp_plugins'
+
+
+class PluginLoader(importlib.abc.Loader):
+ """Dummy loader for virtual namespace packages"""
+
+ def exec_module(self, module):
+ return None
+
+
+@functools.cache
+def dirs_in_zip(archive):
+ try:
+ with ZipFile(archive) as zip_:
+ return set(itertools.chain.from_iterable(
+ Path(file).parents for file in zip_.namelist()))
+ except FileNotFoundError:
+ pass
+ except Exception as e:
+ write_string(f'WARNING: Could not read zip file {archive}: {e}\n')
+ return set()
+
+
+class PluginFinder(importlib.abc.MetaPathFinder):
+ """
+ This class provides one or multiple namespace packages.
+ It searches in sys.path and hypervideo config folders for
+ the existing subdirectories from which the modules can be imported
+ """
+
+ def __init__(self, *packages):
+ self._zip_content_cache = {}
+ self.packages = set(itertools.chain.from_iterable(
+ itertools.accumulate(name.split('.'), lambda a, b: '.'.join((a, b)))
+ for name in packages))
+
+ def search_locations(self, fullname):
+ candidate_locations = []
+
+ def _get_package_paths(*root_paths, containing_folder='plugins'):
+ for config_dir in orderedSet(map(Path, root_paths), lazy=True):
+ with contextlib.suppress(OSError):
+ yield from (config_dir / containing_folder).iterdir()
+
+ # Load from hypervideo config folders
+ candidate_locations.extend(_get_package_paths(
+ *get_user_config_dirs('hypervideo'),
+ *get_system_config_dirs('hypervideo'),
+ containing_folder='plugins'))
+
+ # Load from hypervideo-plugins folders
+ candidate_locations.extend(_get_package_paths(
+ get_executable_path(),
+ *get_user_config_dirs(''),
+ *get_system_config_dirs(''),
+ containing_folder='hypervideo-plugins'))
+
+ candidate_locations.extend(map(Path, sys.path)) # PYTHONPATH
+ with contextlib.suppress(ValueError): # Added when running __main__.py directly
+ candidate_locations.remove(Path(__file__).parent)
+
+ parts = Path(*fullname.split('.'))
+ for path in orderedSet(candidate_locations, lazy=True):
+ candidate = path / parts
+ if candidate.is_dir():
+ yield candidate
+ elif path.suffix in ('.zip', '.egg', '.whl') and path.is_file():
+ if parts in dirs_in_zip(path):
+ yield candidate
+
+ def find_spec(self, fullname, path=None, target=None):
+ if fullname not in self.packages:
+ return None
+
+ search_locations = list(map(str, self.search_locations(fullname)))
+ if not search_locations:
+ return None
+
+ spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True)
+ spec.submodule_search_locations = search_locations
+ return spec
+
+ def invalidate_caches(self):
+ dirs_in_zip.cache_clear()
+ for package in self.packages:
+ if package in sys.modules:
+ del sys.modules[package]
+
+
+def directories():
+ spec = importlib.util.find_spec(PACKAGE_NAME)
+ return spec.submodule_search_locations if spec else []
+
+
+def iter_modules(subpackage):
+ fullname = f'{PACKAGE_NAME}.{subpackage}'
+ with contextlib.suppress(ModuleNotFoundError):
+ pkg = importlib.import_module(fullname)
+ yield from pkgutil.iter_modules(path=pkg.__path__, prefix=f'{fullname}.')
+
+
+def load_module(module, module_name, suffix):
+ return inspect.getmembers(module, lambda obj: (
+ inspect.isclass(obj)
+ and obj.__name__.endswith(suffix)
+ and obj.__module__.startswith(module_name)
+ and not obj.__name__.startswith('_')
+ and obj.__name__ in getattr(module, '__all__', [obj.__name__])))
+
+
+def load_plugins(name, suffix):
+ classes = {}
+
+ for finder, module_name, _ in iter_modules(name):
+ if any(x.startswith('_') for x in module_name.split('.')):
+ continue
+ try:
+ if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter):
+ # zipimporter.load_module() is deprecated in 3.10 and removed in 3.12
+ # The exec_module branch below is the replacement for >= 3.10
+ # See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module
+ module = finder.load_module(module_name)
+ else:
+ spec = finder.find_spec(module_name)
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[module_name] = module
+ spec.loader.exec_module(module)
+ except Exception:
+ write_string(f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}')
+ continue
+ classes.update(load_module(module, module_name, suffix))
+
+ # Compat: old plugin system using __init__.py
+ # Note: plugins imported this way do not show up in directories()
+ # nor are considered part of the hypervideo_dl_plugins namespace package
+ with contextlib.suppress(FileNotFoundError):
+ spec = importlib.util.spec_from_file_location(
+ name, Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py'))
+ plugins = importlib.util.module_from_spec(spec)
+ sys.modules[spec.name] = plugins
+ spec.loader.exec_module(plugins)
+ classes.update(load_module(plugins, spec.name, suffix))
+
+ return classes
+
+
+sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.extractor', f'{PACKAGE_NAME}.postprocessor'))
+
+__all__ = ['directories', 'load_plugins', 'PACKAGE_NAME', 'COMPAT_PACKAGE_NAME']