diff options
| -rw-r--r-- | yt_dlp/YoutubeDL.py | 95 | ||||
| -rw-r--r-- | yt_dlp/extractor/common.py | 2 | ||||
| -rw-r--r-- | yt_dlp/minicurses.py | 78 | ||||
| -rw-r--r-- | yt_dlp/utils.py | 33 | 
4 files changed, 161 insertions, 47 deletions
| diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index 0ac1f1c61..a3fb3faeb 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -28,6 +28,7 @@ import traceback  import random  import unicodedata +from enum import Enum  from string import ascii_letters  from .compat import ( @@ -81,6 +82,7 @@ from .utils import (      make_HTTPS_handler,      MaxDownloadsReached,      network_exceptions, +    number_of_digits,      orderedSet,      OUTTMPL_TYPES,      PagedList, @@ -107,7 +109,6 @@ from .utils import (      strftime_or_none,      subtitles_filename,      supports_terminal_sequences, -    TERMINAL_SEQUENCES,      ThrottledDownload,      to_high_limit_path,      traverse_obj, @@ -123,6 +124,7 @@ from .utils import (      YoutubeDLRedirectHandler,  )  from .cache import Cache +from .minicurses import format_text  from .extractor import (      gen_extractor_classes,      get_info_extractor, @@ -524,7 +526,10 @@ class YoutubeDL(object):          windows_enable_vt_mode()          # FIXME: This will break if we ever print color to stdout -        self.params['no_color'] = self.params.get('no_color') or not supports_terminal_sequences(self._err_file) +        self._allow_colors = { +            'screen': not self.params.get('no_color') and supports_terminal_sequences(self._screen_file), +            'err': not self.params.get('no_color') and supports_terminal_sequences(self._err_file), +        }          if sys.version_info < (3, 6):              self.report_warning( @@ -532,10 +537,10 @@ class YoutubeDL(object):          if self.params.get('allow_unplayable_formats'):              self.report_warning( -                f'You have asked for {self._color_text("unplayable formats", "blue")} to be listed/downloaded. ' +                f'You have asked for {self._format_err("UNPLAYABLE", self.Styles.EMPHASIS)} formats to be listed/downloaded. '                  'This is a developer option intended for debugging. \n'                  '         If you experience any issues while using this option, ' -                f'{self._color_text("DO NOT", "red")} open a bug report') +                f'{self._format_err("DO NOT", self.Styles.ERROR)} open a bug report')          def check_deprecated(param, option, suggestion):              if self.params.get(param) is not None: @@ -554,6 +559,9 @@ class YoutubeDL(object):          for msg in self.params.get('_warnings', []):              self.report_warning(msg) +        if 'list-formats' in self.params.get('compat_opts', []): +            self.params['listformats_table'] = False +          if 'overwrites' not in self.params and self.params.get('nooverwrites') is not None:              # nooverwrites was unnecessarily changed to overwrites              # in 0c3d0f51778b153f65c21906031c2e091fcfb641 @@ -826,10 +834,32 @@ class YoutubeDL(object):          self.to_stdout(              message, skip_eol, quiet=self.params.get('quiet', False)) -    def _color_text(self, text, color): -        if self.params.get('no_color'): -            return text -        return f'{TERMINAL_SEQUENCES[color.upper()]}{text}{TERMINAL_SEQUENCES["RESET_STYLE"]}' +    class Styles(Enum): +        HEADERS = 'yellow' +        EMPHASIS = 'blue' +        ID = 'green' +        DELIM = 'blue' +        ERROR = 'red' +        WARNING = 'yellow' + +    def __format_text(self, out, text, f, fallback=None, *, test_encoding=False): +        assert out in ('screen', 'err') +        if test_encoding: +            original_text = text +            handle = self._screen_file if out == 'screen' else self._err_file +            encoding = self.params.get('encoding') or getattr(handle, 'encoding', 'ascii') +            text = text.encode(encoding, 'ignore').decode(encoding) +            if fallback is not None and text != original_text: +                text = fallback +        if isinstance(f, self.Styles): +            f = f._value_ +        return format_text(text, f) if self._allow_colors[out] else text if fallback is None else fallback + +    def _format_screen(self, *args, **kwargs): +        return self.__format_text('screen', *args, **kwargs) + +    def _format_err(self, *args, **kwargs): +        return self.__format_text('err', *args, **kwargs)      def report_warning(self, message, only_once=False):          ''' @@ -841,14 +871,14 @@ class YoutubeDL(object):          else:              if self.params.get('no_warnings'):                  return -            self.to_stderr(f'{self._color_text("WARNING:", "yellow")} {message}', only_once) +            self.to_stderr(f'{self._format_err("WARNING:", self.Styles.WARNING)} {message}', only_once)      def report_error(self, message, tb=None):          '''          Do the same as trouble, but prefixes the message with 'ERROR:', colored          in red if stderr is a tty file.          ''' -        self.trouble(f'{self._color_text("ERROR:", "red")} {message}', tb) +        self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', tb)      def write_debug(self, message, only_once=False):          '''Log debug message or Print message to stderr''' @@ -977,8 +1007,8 @@ class YoutubeDL(object):          # For fields playlist_index, playlist_autonumber and autonumber convert all occurrences          # of %(field)s to %(field)0Nd for backward compatibility          field_size_compat_map = { -            'playlist_index': len(str(info_dict.get('_last_playlist_index') or '')), -            'playlist_autonumber': len(str(info_dict.get('n_entries') or '')), +            'playlist_index': number_of_digits(info_dict.get('_last_playlist_index') or 0), +            'playlist_autonumber': number_of_digits(info_dict.get('n_entries') or 0),              'autonumber': self.params.get('autonumber_size') or 5,          } @@ -3167,38 +3197,46 @@ class YoutubeDL(object):              res += '~' + format_bytes(fdict['filesize_approx'])          return res +    def _list_format_headers(self, *headers): +        if self.params.get('listformats_table', True) is not False: +            return [self._format_screen(header, self.Styles.HEADERS) for header in headers] +        return headers +      def list_formats(self, info_dict):          formats = info_dict.get('formats', [info_dict]) -        new_format = ( -            'list-formats' not in self.params.get('compat_opts', []) -            and self.params.get('listformats_table', True) is not False) +        new_format = self.params.get('listformats_table', True) is not False          if new_format: +            tbr_digits = number_of_digits(max(f.get('tbr') or 0 for f in formats)) +            vbr_digits = number_of_digits(max(f.get('vbr') or 0 for f in formats)) +            abr_digits = number_of_digits(max(f.get('abr') or 0 for f in formats)) +            delim = self._format_screen('\u2502', self.Styles.DELIM, '|', test_encoding=True)              table = [                  [ -                    format_field(f, 'format_id'), +                    self._format_screen(format_field(f, 'format_id'), self.Styles.ID),                      format_field(f, 'ext'),                      self.format_resolution(f),                      format_field(f, 'fps', '%d'),                      format_field(f, 'dynamic_range', '%s', ignore=(None, 'SDR')).replace('HDR', ''), -                    '|', +                    delim,                      format_field(f, 'filesize', ' %s', func=format_bytes) + format_field(f, 'filesize_approx', '~%s', func=format_bytes), -                    format_field(f, 'tbr', '%4dk'), +                    format_field(f, 'tbr', f'%{tbr_digits}dk'),                      shorten_protocol_name(f.get('protocol', '').replace("native", "n")), -                    '|', +                    delim,                      format_field(f, 'vcodec', default='unknown').replace('none', ''), -                    format_field(f, 'vbr', '%4dk'), +                    format_field(f, 'vbr', f'%{vbr_digits}dk'),                      format_field(f, 'acodec', default='unknown').replace('none', ''), -                    format_field(f, 'abr', '%3dk'), +                    format_field(f, 'abr', f'%{abr_digits}dk'),                      format_field(f, 'asr', '%5dHz'),                      ', '.join(filter(None, ( -                        'UNSUPPORTED' if f.get('ext') in ('f4f', 'f4m') else '', +                        self._format_screen('UNSUPPORTED', 'light red') if f.get('ext') in ('f4f', 'f4m') else '',                          format_field(f, 'language', '[%s]'),                          format_field(f, 'format_note'),                          format_field(f, 'container', ignore=(None, f.get('ext'))),                      ))),                  ] for f in formats if f.get('preference') is None or f['preference'] >= -1000] -            header_line = ['ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', '|', ' FILESIZE', '  TBR', 'PROTO', -                           '|', 'VCODEC', '  VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO'] +            header_line = self._list_format_headers( +                'ID', 'EXT', 'RESOLUTION', 'FPS', 'HDR', delim, ' FILESIZE', '  TBR', 'PROTO', +                delim, 'VCODEC', '  VBR', 'ACODEC', ' ABR', ' ASR', 'MORE INFO')          else:              table = [                  [ @@ -3213,7 +3251,10 @@ class YoutubeDL(object):          self.to_screen(              '[info] Available formats for %s:' % info_dict['id'])          self.to_stdout(render_table( -            header_line, table, delim=new_format, extraGap=(0 if new_format else 1), hideEmpty=new_format)) +            header_line, table, +            extraGap=(0 if new_format else 1), +            hideEmpty=new_format, +            delim=new_format and self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)))      def list_thumbnails(self, info_dict):          thumbnails = list(info_dict.get('thumbnails')) @@ -3224,7 +3265,7 @@ class YoutubeDL(object):          self.to_screen(              '[info] Thumbnails for %s:' % info_dict['id'])          self.to_stdout(render_table( -            ['ID', 'width', 'height', 'URL'], +            self._list_format_headers('ID', 'Width', 'Height', 'URL'),              [[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]))      def list_subtitles(self, video_id, subtitles, name='subtitles'): @@ -3241,7 +3282,7 @@ class YoutubeDL(object):              return [lang, ', '.join(names), ', '.join(exts)]          self.to_stdout(render_table( -            ['Language', 'Name', 'Formats'], +            self._list_format_headers('Language', 'Name', 'Formats'),              [_row(lang, formats) for lang, formats in subtitles.items()],              hideEmpty=True)) diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py index 22b1ed69a..d1d1b46fc 100644 --- a/yt_dlp/extractor/common.py +++ b/yt_dlp/extractor/common.py @@ -1139,7 +1139,7 @@ class InfoExtractor(object):                  if mobj:                      break -        _name = self._downloader._color_text(name, 'blue') +        _name = self._downloader._format_err(name, self._downloader.Styles.EMPHASIS)          if mobj:              if group is None: diff --git a/yt_dlp/minicurses.py b/yt_dlp/minicurses.py index a6e159a14..38fdb5bc6 100644 --- a/yt_dlp/minicurses.py +++ b/yt_dlp/minicurses.py @@ -1,6 +1,72 @@  import functools  from threading import Lock -from .utils import supports_terminal_sequences, TERMINAL_SEQUENCES, write_string +from .utils import supports_terminal_sequences, write_string + + +CONTROL_SEQUENCES = { +    'DOWN': '\n', +    'UP': '\033[A', +    'ERASE_LINE': '\033[K', +    'RESET': '\033[0m', +} + + +_COLORS = { +    'BLACK': '0', +    'RED': '1', +    'GREEN': '2', +    'YELLOW': '3', +    'BLUE': '4', +    'PURPLE': '5', +    'CYAN': '6', +    'WHITE': '7', +} + + +_TEXT_STYLES = { +    'NORMAL': '0', +    'BOLD': '1', +    'UNDERLINED': '4', +} + + +def format_text(text, f): +    f = f.upper() +    tokens = f.strip().split() + +    bg_color = '' +    if 'ON' in tokens: +        if tokens[-1] == 'ON': +            raise SyntaxError(f'Empty background format specified in {f!r}') +        if tokens[-1] not in _COLORS: +            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') +        bg_color = f'4{_COLORS[tokens.pop()]}' +        if tokens[-1] == 'LIGHT': +            bg_color = f'0;10{bg_color[1:]}' +            tokens.pop() +        if tokens[-1] != 'ON': +            raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}') +        bg_color = f'\033[{bg_color}m' +        tokens.pop() + +    if not tokens: +        fg_color = '' +    elif tokens[-1] not in _COLORS: +        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color') +    else: +        fg_color = f'3{_COLORS[tokens.pop()]}' +        if tokens and tokens[-1] == 'LIGHT': +            fg_color = f'9{fg_color[1:]}' +            tokens.pop() +        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL' +        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m' +        if tokens: +            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}') + +    if fg_color or bg_color: +        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}' +    else: +        return text  class MultilinePrinterBase: @@ -67,15 +133,15 @@ class MultilinePrinter(MultilinePrinterBase):          yield '\r'          distance = dest - current          if distance < 0: -            yield TERMINAL_SEQUENCES['UP'] * -distance +            yield CONTROL_SEQUENCES['UP'] * -distance          elif distance > 0: -            yield TERMINAL_SEQUENCES['DOWN'] * distance +            yield CONTROL_SEQUENCES['DOWN'] * distance          self._lastline = dest      @lock      def print_at_line(self, text, pos):          if self._HAVE_FULLCAP: -            self.write(*self._move_cursor(pos), TERMINAL_SEQUENCES['ERASE_LINE'], text) +            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)          text = self._add_line_number(text, pos)          textlen = len(text) @@ -103,7 +169,7 @@ class MultilinePrinter(MultilinePrinterBase):          if self._HAVE_FULLCAP:              self.write( -                *text, TERMINAL_SEQUENCES['ERASE_LINE'], -                f'{TERMINAL_SEQUENCES["UP"]}{TERMINAL_SEQUENCES["ERASE_LINE"]}' * self.maximum) +                *text, CONTROL_SEQUENCES['ERASE_LINE'], +                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)          else:              self.write(*text, ' ' * self._lastlength) diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index e05677d08..08f9a5dc9 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -4748,9 +4748,11 @@ def determine_protocol(info_dict):  def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False):      """ Render a list of rows, each as a list of values """ +    def width(string): +        return len(remove_terminal_sequences(string))      def get_max_lens(table): -        return [max(len(compat_str(v)) for v in col) for col in zip(*table)] +        return [max(width(str(v)) for v in col) for col in zip(*table)]      def filter_using_list(row, filterArray):          return [col for (take, col) in zip(filterArray, row) if take] @@ -4762,10 +4764,15 @@ def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False):      table = [header_row] + data      max_lens = get_max_lens(table) +    extraGap += 1      if delim: -        table = [header_row] + [['-' * ml for ml in max_lens]] + data -    format_str = ' '.join('%-' + compat_str(ml + extraGap) + 's' for ml in max_lens[:-1]) + ' %s' -    return '\n'.join(format_str % tuple(row) for row in table) +        table = [header_row] + [[delim * (ml + extraGap) for ml in max_lens]] + data +    max_lens[-1] = 0 +    for row in table: +        for pos, text in enumerate(map(str, row)): +            row[pos] = text + (' ' * (max_lens[pos] - width(text) + extraGap)) +    ret = '\n'.join(''.join(row) for row in table) +    return ret  def _match_one(filter_part, dct, incomplete): @@ -6498,12 +6505,12 @@ def supports_terminal_sequences(stream):          return False -TERMINAL_SEQUENCES = { -    'DOWN': '\n', -    'UP': '\x1b[A', -    'ERASE_LINE': '\x1b[K', -    'RED': '\033[0;31m', -    'YELLOW': '\033[0;33m', -    'BLUE': '\033[0;34m', -    'RESET_STYLE': '\033[0m', -} +_terminal_sequences_re = re.compile('\033\\[[^m]+m') + + +def remove_terminal_sequences(string): +    return _terminal_sequences_re.sub('', string) + + +def number_of_digits(number): +    return len('%d' % number) | 
