aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r--yt_dlp/utils.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index be7cbf9fd..f21d70672 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -2609,6 +2609,16 @@ def get_exe_version(exe, args=['--version'],
return detect_exe_version(out, version_re, unrecognized) if out else False
+def frange(start=0, stop=None, step=1):
+ """Float range"""
+ if stop is None:
+ start, stop = 0, start
+ sign = [-1, 1][step > 0] if step else 0
+ while sign * start < sign * stop:
+ yield start
+ start += step
+
+
class LazyList(collections.abc.Sequence):
"""Lazy immutable list from an iterable
Note that slices of a LazyList are lists and not LazyList"""
@@ -2805,6 +2815,148 @@ class InAdvancePagedList(PagedList):
yield from page_results
+class PlaylistEntries:
+ MissingEntry = object()
+ is_exhausted = False
+
+ def __init__(self, ydl, info_dict):
+ self.ydl, self.info_dict = ydl, info_dict
+
+ PLAYLIST_ITEMS_RE = re.compile(r'''(?x)
+ (?P<start>[+-]?\d+)?
+ (?P<range>[:-]
+ (?P<end>[+-]?\d+|inf(?:inite)?)?
+ (?::(?P<step>[+-]?\d+))?
+ )?''')
+
+ @classmethod
+ def parse_playlist_items(cls, string):
+ for segment in string.split(','):
+ if not segment:
+ raise ValueError('There is two or more consecutive commas')
+ mobj = cls.PLAYLIST_ITEMS_RE.fullmatch(segment)
+ if not mobj:
+ raise ValueError(f'{segment!r} is not a valid specification')
+ start, end, step, has_range = mobj.group('start', 'end', 'step', 'range')
+ if int_or_none(step) == 0:
+ raise ValueError(f'Step in {segment!r} cannot be zero')
+ yield slice(int_or_none(start), float_or_none(end), int_or_none(step)) if has_range else int(start)
+
+ def get_requested_items(self):
+ playlist_items = self.ydl.params.get('playlist_items')
+ playlist_start = self.ydl.params.get('playliststart', 1)
+ playlist_end = self.ydl.params.get('playlistend')
+ # For backwards compatibility, interpret -1 as whole list
+ if playlist_end in (-1, None):
+ playlist_end = ''
+ if not playlist_items:
+ playlist_items = f'{playlist_start}:{playlist_end}'
+ elif playlist_start != 1 or playlist_end:
+ self.ydl.report_warning('Ignoring playliststart and playlistend because playlistitems was given', only_once=True)
+
+ for index in self.parse_playlist_items(playlist_items):
+ for i, entry in self[index]:
+ yield i, entry
+ try:
+ # TODO: Add auto-generated fields
+ self.ydl._match_entry(entry, incomplete=True, silent=True)
+ except (ExistingVideoReached, RejectedVideoReached):
+ return
+
+ @property
+ def full_count(self):
+ if self.info_dict.get('playlist_count'):
+ return self.info_dict['playlist_count']
+ elif self.is_exhausted and not self.is_incomplete:
+ return len(self)
+ elif isinstance(self._entries, InAdvancePagedList):
+ if self._entries._pagesize == 1:
+ return self._entries._pagecount
+
+ @functools.cached_property
+ def _entries(self):
+ entries = self.info_dict.get('entries')
+ if entries is None:
+ raise EntryNotInPlaylist('There are no entries')
+ elif isinstance(entries, list):
+ self.is_exhausted = True
+
+ indices = self.info_dict.get('requested_entries')
+ self.is_incomplete = bool(indices)
+ if self.is_incomplete:
+ assert self.is_exhausted
+ ret = [self.MissingEntry] * max(indices)
+ for i, entry in zip(indices, entries):
+ ret[i - 1] = entry
+ return ret
+
+ if isinstance(entries, (list, PagedList, LazyList)):
+ return entries
+ return LazyList(entries)
+
+ @functools.cached_property
+ def _getter(self):
+ if isinstance(self._entries, list):
+ def get_entry(i):
+ try:
+ entry = self._entries[i]
+ except IndexError:
+ entry = self.MissingEntry
+ if not self.is_incomplete:
+ raise self.IndexError()
+ if entry is self.MissingEntry:
+ raise EntryNotInPlaylist(f'Entry {i} cannot be found')
+ return entry
+ else:
+ def get_entry(i):
+ try:
+ return type(self.ydl)._handle_extraction_exceptions(lambda _, i: self._entries[i])(self.ydl, i)
+ except (LazyList.IndexError, PagedList.IndexError):
+ raise self.IndexError()
+ return get_entry
+
+ def __getitem__(self, idx):
+ if isinstance(idx, int):
+ idx = slice(idx, idx)
+
+ # NB: PlaylistEntries[1:10] => (0, 1, ... 9)
+ step = 1 if idx.step is None else idx.step
+ if idx.start is None:
+ start = 0 if step > 0 else len(self) - 1
+ else:
+ start = idx.start - 1 if idx.start >= 0 else len(self) + idx.start
+
+ # NB: Do not call len(self) when idx == [:]
+ if idx.stop is None:
+ stop = 0 if step < 0 else float('inf')
+ else:
+ stop = idx.stop - 1 if idx.stop >= 0 else len(self) + idx.stop
+ stop += [-1, 1][step > 0]
+
+ for i in frange(start, stop, step):
+ if i < 0:
+ continue
+ try:
+ try:
+ entry = self._getter(i)
+ except self.IndexError:
+ self.is_exhausted = True
+ if step > 0:
+ break
+ continue
+ except IndexError:
+ if self.is_exhausted:
+ break
+ raise
+ yield i + 1, entry
+
+ def __len__(self):
+ return len(tuple(self[:]))
+
+ class IndexError(IndexError):
+ pass
+
+
def uppercase_escape(s):
unicode_escape = codecs.getdecoder('unicode_escape')
return re.sub(