diff options
| author | Astounds <kirito@disroot.org> | 2026-04-25 01:02:17 -0500 |
|---|---|---|
| committer | Astounds <kirito@disroot.org> | 2026-04-25 01:02:17 -0500 |
| commit | 50ad959a8051fec95f26b573f9fe067bdf3fdf6a (patch) | |
| tree | 4d94f63cf9adb951d4200b0f2bb0c762d45297c4 /youtube | |
| parent | a0f315be51ef121618e73d5b450c8616c0d11d21 (diff) | |
| download | yt-local-50ad959a8051fec95f26b573f9fe067bdf3fdf6a.tar.lz yt-local-50ad959a8051fec95f26b573f9fe067bdf3fdf6a.tar.xz yt-local-50ad959a8051fec95f26b573f9fe067bdf3fdf6a.zip | |
refactor: replace string concatenations with f-strings
Diffstat (limited to 'youtube')
| -rw-r--r-- | youtube/__init__.py | 10 | ||||
| -rw-r--r-- | youtube/channel.py | 38 | ||||
| -rw-r--r-- | youtube/comments.py | 37 | ||||
| -rw-r--r-- | youtube/local_playlist.py | 4 | ||||
| -rw-r--r-- | youtube/playlist.py | 14 | ||||
| -rw-r--r-- | youtube/proto.py | 9 | ||||
| -rw-r--r-- | youtube/proto_debug.py | 25 | ||||
| -rw-r--r-- | youtube/search.py | 2 | ||||
| -rw-r--r-- | youtube/subscriptions.py | 41 | ||||
| -rw-r--r-- | youtube/util.py | 39 | ||||
| -rw-r--r-- | youtube/watch.py | 118 | ||||
| -rw-r--r-- | youtube/yt_data_extract/common.py | 6 | ||||
| -rw-r--r-- | youtube/yt_data_extract/everything_else.py | 2 | ||||
| -rw-r--r-- | youtube/yt_data_extract/watch_extraction.py | 22 |
14 files changed, 167 insertions, 200 deletions
diff --git a/youtube/__init__.py b/youtube/__init__.py index b0e7cd3..885cadc 100644 --- a/youtube/__init__.py +++ b/youtube/__init__.py @@ -76,7 +76,7 @@ theme_names = { @yt_app.context_processor def inject_theme_preference(): return { - 'theme_path': '/youtube.com/static/' + theme_names[settings.theme] + '.css', + 'theme_path': f'/youtube.com/static/{theme_names[settings.theme]}.css', 'settings': settings, # Detect version 'current_version': app_version()['version'], @@ -145,9 +145,9 @@ def error_page(e): ' exit node is overutilized. Try getting a new exit node by' ' using the New Identity button in the Tor Browser.') if fetch_err.error_message: - error_message += '\n\n' + fetch_err.error_message + error_message += f'\n\n{fetch_err.error_message}' if fetch_err.ip: - error_message += '\n\nExit node IP address: ' + fetch_err.ip + error_message += f'\n\nExit node IP address: {fetch_err.ip}' return flask.render_template('error.html', error_message=error_message, slim=slim), 502 elif error_code == '429': @@ -157,7 +157,7 @@ def error_page(e): '• Enable Tor routing in Settings for automatic IP rotation\n' '• Use a VPN to change your IP address') if fetch_err.ip: - error_message += '\n\nYour IP: ' + fetch_err.ip + error_message += f'\n\nYour IP: {fetch_err.ip}' return flask.render_template('error.html', error_message=error_message, slim=slim), 429 elif error_code == '502' and ('Failed to resolve' in str(fetch_err) or 'Failed to establish' in str(fetch_err)): @@ -179,7 +179,7 @@ def error_page(e): # Catch-all for any other FetchError (400, etc.) error_message = f'Error communicating with YouTube ({error_code}).' 
if fetch_err.error_message: - error_message += '\n\n' + fetch_err.error_message + error_message += f'\n\n{fetch_err.error_message}' return flask.render_template('error.html', error_message=error_message, slim=slim), 502 return flask.render_template('error.html', traceback=traceback.format_exc(), diff --git a/youtube/channel.py b/youtube/channel.py index 14a565e..8baf588 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -253,7 +253,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, # For now it seems to be constant for the API endpoint, not dependent # on the browsing session or channel key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -285,8 +285,8 @@ def get_number_of_videos_channel(channel_id): return 1000 # Uploads playlist - playlist_id = 'UU' + channel_id[2:] - url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1' + playlist_id = f'UU{channel_id[2:]}' + url = f'https://m.youtube.com/playlist?list={playlist_id}&pbj=1' try: response = util.fetch_url(url, headers_mobile, @@ -328,7 +328,7 @@ def get_channel_id(base_url): # method that gives the smallest possible response at ~4 kb # needs to be as fast as possible base_url = base_url.replace('https://www', 'https://m') # avoid redirect - response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile, + response = util.fetch_url(f'{base_url}/about?pbj=1', headers_mobile, debug_name='get_channel_id', report_text='Got channel id').decode('utf-8') match = channel_id_re.search(response) if match: @@ -372,7 +372,7 @@ def get_channel_search_json(channel_id, query, page): ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = 
f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -414,18 +414,18 @@ def post_process_channel_info(info): def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None): if channel_id: - base_url = 'https://www.youtube.com/channel/' + channel_id + base_url = f'https://www.youtube.com/channel/{channel_id}' # Build URL with sort parameter # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts # Note: 'da' (oldest) was removed by YouTube in January 2026 - url = base_url + '/' + tab + '?pbj=1&view=0' + url = f'{base_url}/{tab}?pbj=1&view=0' if sort: # Map sort values to YouTube's URL parameter values sort_map = {'3': 'dd', '4': 'lad'} - url += '&sort=' + sort_map.get(sort, 'dd') + url += f'&sort={sort_map.get(sort, "dd")}' - return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab) + return util.fetch_url(url, headers_desktop, debug_name=f'gen_channel_{tab}') playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} @@ -462,7 +462,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): if page_number == 1: tasks = ( gevent.spawn(playlist.playlist_first_page, - 'UU' + channel_id[2:], + f'UU{channel_id[2:]}', report_text='Retrieved channel videos'), gevent.spawn(get_metadata, channel_id), ) @@ -477,11 +477,11 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): set_cached_number_of_videos(channel_id, number_of_videos) else: tasks = ( - gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:], + gevent.spawn(playlist.get_videos, f'UU{channel_id[2:]}', page_number, include_shorts=True), gevent.spawn(get_metadata, channel_id), gevent.spawn(get_number_of_videos_channel, channel_id), - gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], + gevent.spawn(playlist.playlist_first_page, f'UU{channel_id[2:]}', report_text='Retrieved channel video count'), ) gevent.joinall(tasks) @@ -567,10 +567,10 @@ def 
get_channel_page_general_url(base_url, tab, request, channel_id=None): elif tab == 'search' and channel_id: polymer_json = get_channel_search_json(channel_id, query, page_number) elif tab == 'search': - url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='') + url = f'{base_url}/search?pbj=1&query={urllib.parse.quote(query, safe="")}' polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search') elif tab != 'videos': - flask.abort(404, 'Unknown channel tab: ' + tab) + flask.abort(404, f'Unknown channel tab: {tab}') if polymer_json is not None and info is None: info = yt_data_extract.extract_channel_info( @@ -583,7 +583,7 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): return flask.render_template('error.html', error_message=info['error']) if channel_id: - info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id + info['channel_url'] = f'https://www.youtube.com/channel/{channel_id}' info['channel_id'] = channel_id else: channel_id = info['channel_id'] @@ -663,22 +663,22 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): @yt_app.route('/channel/<channel_id>/') @yt_app.route('/channel/<channel_id>/<tab>') def get_channel_page(channel_id, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id) + return get_channel_page_general_url(f'https://www.youtube.com/channel/{channel_id}', tab, request, channel_id) @yt_app.route('/user/<username>/') @yt_app.route('/user/<username>/<tab>') def get_user_page(username, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request) + return get_channel_page_general_url(f'https://www.youtube.com/user/{username}', tab, request) @yt_app.route('/c/<custom>/') @yt_app.route('/c/<custom>/<tab>') def get_custom_c_page(custom, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/c/' + 
custom, tab, request) + return get_channel_page_general_url(f'https://www.youtube.com/c/{custom}', tab, request) @yt_app.route('/<custom>') @yt_app.route('/<custom>/<tab>') def get_toplevel_custom_page(custom, tab='videos'): - return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request) + return get_channel_page_general_url(f'https://www.youtube.com/{custom}', tab, request) diff --git a/youtube/comments.py b/youtube/comments.py index 8d03f22..c4e30dd 100644 --- a/youtube/comments.py +++ b/youtube/comments.py @@ -104,20 +104,19 @@ def post_process_comments_info(comments_info): comment['replies_url'] = None comment['replies_url'] = concat_or_none( util.URL_ORIGIN, - '/comments?replies=1&ctoken=' + ctoken) + f'/comments?replies=1&ctoken={ctoken}') if reply_count == 0: comment['view_replies_text'] = 'Reply' elif reply_count == 1: comment['view_replies_text'] = '1 reply' else: - comment['view_replies_text'] = str(reply_count) + ' replies' + comment['view_replies_text'] = f'{reply_count} replies' if comment['approx_like_count'] == '1': comment['likes_text'] = '1 like' else: - comment['likes_text'] = (str(comment['approx_like_count']) - + ' likes') + comment['likes_text'] = f"{comment['approx_like_count']} likes" comments_info['include_avatars'] = settings.enable_comment_avatars if comments_info['ctoken']: @@ -163,14 +162,13 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): comments_info = {'error': None} try: other_sort_url = ( - util.URL_ORIGIN + '/comments?ctoken=' - + make_comment_ctoken(video_id, sort=1 - sort, lc=lc) + f"{util.URL_ORIGIN}/comments?ctoken=" + f"{make_comment_ctoken(video_id, sort=1 - sort, lc=lc)}" ) - other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top') + other_sort_text = f'Sort by {"newest" if sort == 0 else "top"}' - this_sort_url = (util.URL_ORIGIN - + '/comments?ctoken=' - + make_comment_ctoken(video_id, sort=sort, lc=lc)) + this_sort_url = (f"{util.URL_ORIGIN}/comments?ctoken=" + 
f"{make_comment_ctoken(video_id, sort=sort, lc=lc)}") comments_info['comment_links'] = [ (other_sort_text, other_sort_url), @@ -188,17 +186,16 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): if e.code == '429' and settings.route_tor: comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.' if e.error_message: - comments_info['error'] += '\n\n' + e.error_message - comments_info['error'] += '\n\nExit node IP address: %s' % e.ip + comments_info['error'] += f'\n\n{e.error_message}' + comments_info['error'] += f'\n\nExit node IP address: {e.ip}' else: - comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e) + comments_info['error'] = f'YouTube blocked the request. Error: {e}' except Exception as e: - comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e) + comments_info['error'] = f'YouTube blocked the request. Error: {e}' if comments_info.get('error'): - print('Error retrieving comments for ' + str(video_id) + ':\n' + - comments_info['error']) + print(f'Error retrieving comments for {video_id}:\n{comments_info["error"]}') return comments_info @@ -218,12 +215,10 @@ def get_comments_page(): other_sort_url = None else: other_sort_url = ( - util.URL_ORIGIN - + '/comments?ctoken=' - + make_comment_ctoken(comments_info['video_id'], - sort=1-comments_info['sort']) + f'{util.URL_ORIGIN}/comments?ctoken=' + f'{make_comment_ctoken(comments_info["video_id"], sort=1-comments_info["sort"])}' ) - other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top') + other_sort_text = f'Sort by {"newest" if comments_info["sort"] == 0 else "top"}' comments_info['comment_links'] = [(other_sort_text, other_sort_url)] return flask.render_template( diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py index e9b0b20..44207d2 100644 --- a/youtube/local_playlist.py +++ b/youtube/local_playlist.py @@ -92,9 +92,7 @@ def add_extra_info_to_videos(videos, 
playlist_name): util.add_extra_html_info(video) if video['id'] + '.jpg' in thumbnails: video['thumbnail'] = ( - '/https://youtube.com/data/playlist_thumbnails/' - + playlist_name - + '/' + video['id'] + '.jpg') + f'/https://youtube.com/data/playlist_thumbnails/{playlist_name}/{video["id"]}.jpg') else: video['thumbnail'] = util.get_thumbnail_url(video['id']) missing_thumbnails.append(video['id']) diff --git a/youtube/playlist.py b/youtube/playlist.py index 3b784ba..e1e1342 100644 --- a/youtube/playlist.py +++ b/youtube/playlist.py @@ -20,7 +20,7 @@ def playlist_ctoken(playlist_id, offset, include_shorts=True): continuation_info = proto.string(3, proto.percent_b64encode(offset)) - playlist_id = proto.string(2, 'VL' + playlist_id) + playlist_id = proto.string(2, f'VL{playlist_id}') pointless_nest = proto.string(80226972, playlist_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') @@ -30,7 +30,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist", use_mobile=False): # Use innertube API (pbj=1 no longer works for many playlists) key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' data = { 'context': { @@ -41,7 +41,7 @@ def playlist_first_page(playlist_id, report_text="Retrieved playlist", 'clientVersion': '2.20240327.00.00', }, }, - 'browseId': 'VL' + playlist_id, + 'browseId': f'VL{playlist_id}', } content_type_header = (('Content-Type', 'application/json'),) @@ -58,7 +58,7 @@ def get_videos(playlist_id, page, include_shorts=True, use_mobile=False, page_size = 100 key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8' - url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key + url = f'https://www.youtube.com/youtubei/v1/browse?key={key}' ctoken = playlist_ctoken(playlist_id, (int(page)-1)*page_size, include_shorts=include_shorts) @@ -97,7 +97,7 @@ def get_playlist_page(): if 
playlist_id.startswith('RD'): first_video_id = playlist_id[2:] # video ID after 'RD' prefix return flask.redirect( - util.URL_ORIGIN + '/watch?v=' + first_video_id + '&list=' + playlist_id, + f'{util.URL_ORIGIN}/watch?v={first_video_id}&list={playlist_id}', 302 ) @@ -132,9 +132,9 @@ def get_playlist_page(): if 'id' in item and not item.get('thumbnail'): item['thumbnail'] = f"{settings.img_prefix}https://i.ytimg.com/vi/{item['id']}/hqdefault.jpg" - item['url'] += '&list=' + playlist_id + item['url'] += f'&list={playlist_id}' if item['index']: - item['url'] += '&index=' + str(item['index']) + item['url'] += f'&index={item["index"]}' video_count = yt_data_extract.deep_get(info, 'metadata', 'video_count') if video_count is None: diff --git a/youtube/proto.py b/youtube/proto.py index db83a06..72a8a94 100644 --- a/youtube/proto.py +++ b/youtube/proto.py @@ -76,7 +76,7 @@ def read_varint(data): except IndexError: if i == 0: raise EOFError() - raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + raise Exception(f'Unterminated varint starting at {data.tell() - i}') result |= (byte & 127) << 7*i if not byte & 128: break @@ -118,7 +118,7 @@ def read_protobuf(data): elif wire_type == 5: value = data.read(4) else: - raise Exception("Unknown wire type: " + str(wire_type) + " at position " + str(data.tell())) + raise Exception(f"Unknown wire type: {wire_type} at position {data.tell()}") yield (wire_type, field_number, value) @@ -170,8 +170,7 @@ def _make_protobuf(data): elif field[0] == 2: result += string(field[1], _make_protobuf(field[2])) else: - raise NotImplementedError('Wire type ' + str(field[0]) - + ' not implemented') + raise NotImplementedError(f'Wire type {field[0]} not implemented') return result return data @@ -218,4 +217,4 @@ def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) + return 
base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}') diff --git a/youtube/proto_debug.py b/youtube/proto_debug.py index 927b385..bb8da7c 100644 --- a/youtube/proto_debug.py +++ b/youtube/proto_debug.py @@ -179,7 +179,7 @@ def read_varint(data): except IndexError: if i == 0: raise EOFError() - raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + raise Exception(f'Unterminated varint starting at {data.tell() - i}') result |= (byte & 127) << 7*i if not byte & 128: break @@ -235,8 +235,7 @@ def _make_protobuf(data): elif field[0] == 2: result += string(field[1], _make_protobuf(field[2])) else: - raise NotImplementedError('Wire type ' + str(field[0]) - + ' not implemented') + raise NotImplementedError(f'Wire type {field[0]} not implemented') return result return data @@ -286,7 +285,7 @@ def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) + return base64.urlsafe_b64decode(f'{data}={"=" * ((4 - len(data) % 4) % 4)}') # -------------------------------------------------------------------- @@ -344,7 +343,7 @@ fromhex = bytes.fromhex def aligned_ascii(data): - return ' '.join(' ' + chr(n) if n in range(32, 128) else ' _' for n in data) + return ' '.join(f' {chr(n)}' if n in range(32, 128) else ' _' for n in data) def parse_protobuf(data, mutable=False, spec=()): @@ -372,7 +371,7 @@ def parse_protobuf(data, mutable=False, spec=()): elif wire_type == 5: value = data.read(4) else: - raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell())) + raise Exception(f"Unknown wire type: {wire_type}, Tag: {bytes_to_hex(varint_encode(tag))}, at position {data.tell()}") if mutable: yield [wire_type, field_number, value] else: @@ -453,7 +452,7 @@ def b32decode(s, casefold=False, map01=None): if map01 is not None: map01 = 
_bytes_from_decode_data(map01) assert len(map01) == 1, repr(map01) - s = s.translate(bytes.maketrans(b'01', b'O' + map01)) + s = s.translate(bytes.maketrans(b'01', f'O{map01.decode("ascii")}')) if casefold: s = s.upper() # Strip off pad characters from the right. We need to count the pad @@ -494,7 +493,7 @@ def b32decode(s, casefold=False, map01=None): def dec32(data): if isinstance(data, bytes): data = data.decode('ascii') - return b32decode(data + "="*((8 - len(data)%8)%8)) + return b32decode(f'{data}={"=" * ((8 - len(data)%8)%8)}') _patterns = [ @@ -563,9 +562,7 @@ def _pp(obj, indent): # not my best work if len(obj) == 3: # (wire_type, field_number, data) return obj.__repr__() else: # (base64, [...]) - return ('(' + obj[0].__repr__() + ',\n' - + indent_lines(_pp(obj[1], indent), indent) + '\n' - + ')') + return f"({obj[0].__repr__()},\n{indent_lines(_pp(obj[1], indent), indent)}\n)" elif isinstance(obj, list): # [wire_type, field_number, data] if (len(obj) == 3 @@ -577,13 +574,11 @@ def _pp(obj, indent): # not my best work elif (len(obj) == 3 and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) ): - return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' - + indent_lines(_pp(obj[2], indent), indent) + '\n' - + ']') + return f"[{obj[0].__repr__()}, {obj[1].__repr__()},\n{indent_lines(_pp(obj[2], indent), indent)}\n]" else: s = '[\n' for x in obj: - s += indent_lines(_pp(x, indent), indent) + ',\n' + s += f"{indent_lines(_pp(x, indent), indent)},\n" s += ']' return s else: diff --git a/youtube/search.py b/youtube/search.py index 6e62e28..7573595 100644 --- a/youtube/search.py +++ b/youtube/search.py @@ -51,7 +51,7 @@ def get_search_json(query, page, autocorrect, sort, filters): 'X-YouTube-Client-Name': '1', 'X-YouTube-Client-Version': '2.20180418', } - url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D") + url += f"&pbj=1&sp={page_number_to_sp_parameter(page, autocorrect, sort, 
filters).replace('=', '%3D')}" content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results') info = json.loads(content) return info diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index dafea58..7d3efab 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -126,7 +126,7 @@ def delete_thumbnails(to_delete): os.remove(os.path.join(thumbnails_directory, thumbnail)) existing_thumbnails.remove(video_id) except Exception: - print('Failed to delete thumbnail: ' + thumbnail) + print(f'Failed to delete thumbnail: {thumbnail}') traceback.print_exc() @@ -184,7 +184,7 @@ def _get_videos(cursor, number_per_page, offset, tag=None): 'time_published': exact_timestamp(db_video[3]) if db_video[4] else posix_to_dumbed_down(db_video[3]), 'author': db_video[5], 'author_id': db_video[6], - 'author_url': '/https://www.youtube.com/channel/' + db_video[6], + 'author_url': f'/https://www.youtube.com/channel/{db_video[6]}', }) return videos, pseudo_number_of_videos @@ -304,9 +304,9 @@ def posix_to_dumbed_down(posix_time): if delta >= unit_time: quantifier = round(delta/unit_time) if quantifier == 1: - return '1 ' + unit_name + ' ago' + return f'1 {unit_name} ago' else: - return str(quantifier) + ' ' + unit_name + 's ago' + return f'{quantifier} {unit_name}s ago' else: raise Exception() @@ -363,7 +363,7 @@ def autocheck_dispatcher(): time_until_earliest_job = earliest_job['next_check_time'] - time.time() if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow - print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) + print(f'ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: {earliest_job["channel_id"]}, {earliest_job["channel_name"]}, {earliest_job["next_check_time"]}') next_check_time = 
time.time() + 3600*secrets.randbelow(60)/60 with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time) autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time @@ -451,7 +451,7 @@ def check_channels_if_necessary(channel_ids): def _get_atoma_feed(channel_id): - url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id + url = f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}' try: return util.fetch_url(url).decode('utf-8') except util.FetchError as e: @@ -485,16 +485,15 @@ def _get_channel_videos_first_page(channel_id, channel_status_name): return channel_info except util.FetchError as e: if e.code == '429' and settings.route_tor: - error_message = ('Error checking channel ' + channel_status_name - + ': YouTube blocked the request because the' - + ' Tor exit node is overutilized. Try getting a new exit node' - + ' by using the New Identity button in the Tor Browser.') + error_message = (f'Error checking channel {channel_status_name}: ' + f'YouTube blocked the request because the Tor exit node is overutilized. 
' + f'Try getting a new exit node by using the New Identity button in the Tor Browser.') if e.ip: - error_message += ' Exit node IP address: ' + e.ip + error_message += f' Exit node IP address: {e.ip}' print(error_message) return None elif e.code == '502': - print('Error checking channel', channel_status_name + ':', str(e)) + print(f'Error checking channel {channel_status_name}: {e}') return None raise @@ -505,7 +504,7 @@ def _get_upstream_videos(channel_id): except KeyError: channel_status_name = channel_id - print("Checking channel: " + channel_status_name) + print(f"Checking channel: {channel_status_name}") tasks = ( # channel page, need for video duration @@ -550,15 +549,15 @@ def _get_upstream_videos(channel_id): times_published[video_id_element.text] = time_published except ValueError: - print('Failed to read atoma feed for ' + channel_status_name) + print(f'Failed to read atoma feed for {channel_status_name}') traceback.print_exc() except defusedxml.ElementTree.ParseError: - print('Failed to read atoma feed for ' + channel_status_name) + print(f'Failed to read atoma feed for {channel_status_name}') if channel_info is None: # there was an error return if channel_info['error']: - print('Error checking channel ' + channel_status_name + ': ' + channel_info['error']) + print(f'Error checking channel {channel_status_name}: {channel_info["error"]}') return videos = channel_info['items'] @@ -1023,7 +1022,7 @@ def get_subscriptions_page(): tag = request.args.get('tag', None) videos, number_of_videos_in_db = _get_videos(cursor, 60, (page - 1)*60, tag) for video in videos: - video['thumbnail'] = util.URL_ORIGIN + '/data/subscription_thumbnails/' + video['id'] + '.jpg' + video['thumbnail'] = f'{util.URL_ORIGIN}/data/subscription_thumbnails/{video["id"]}.jpg' video['type'] = 'video' video['item_size'] = 'small' util.add_extra_html_info(video) @@ -1033,7 +1032,7 @@ def get_subscriptions_page(): subscription_list = [] for channel_name, channel_id, muted in 
_get_subscribed_channels(cursor): subscription_list.append({ - 'channel_url': util.URL_ORIGIN + '/channel/' + channel_id, + 'channel_url': f'{util.URL_ORIGIN}/channel/{channel_id}', 'channel_name': channel_name, 'channel_id': channel_id, 'muted': muted, @@ -1109,17 +1108,17 @@ def serve_subscription_thumbnail(thumbnail): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): url = f"https://i.ytimg.com/vi/{video_id}/{quality}" try: - image = util.fetch_url(url, report_text="Saved thumbnail: " + video_id) + image = util.fetch_url(url, report_text=f"Saved thumbnail: {video_id}") break except util.FetchError as e: if '404' in str(e): continue - print("Failed to download thumbnail for " + video_id + ": " + str(e)) + print(f"Failed to download thumbnail for {video_id}: {e}") flask.abort(500) except urllib.error.HTTPError as e: if e.code == 404: continue - print("Failed to download thumbnail for " + video_id + ": " + str(e)) + print(f"Failed to download thumbnail for {video_id}: {e}") flask.abort(e.code) if image is None: diff --git a/youtube/util.py b/youtube/util.py index 5e60d1c..7901a89 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -72,7 +72,7 @@ class TorManager: def __init__(self): self.old_tor_connection_pool = None self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( - 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', + f'socks5h://127.0.0.1:{settings.tor_port}/', cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() settings.add_setting_changed_hook( @@ -92,7 +92,7 @@ class TorManager: self.old_tor_connection_pool = self.tor_connection_pool self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( - 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', + f'socks5h://127.0.0.1:{settings.tor_port}/', cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() @@ -198,9 +198,9 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): class FetchError(Exception): def 
__init__(self, code, reason='', ip=None, error_message=None): if error_message: - string = code + ' ' + reason + ': ' + error_message + string = f"{code} {reason}: {error_message}" else: - string = 'HTTP error during request: ' + code + ' ' + reason + string = f"HTTP error during request: {code} {reason}" Exception.__init__(self, string) self.code = code self.reason = reason @@ -294,14 +294,12 @@ def fetch_url_response(url, headers=(), timeout=15, data=None, exception_cause = e.__context__.__context__ if (isinstance(exception_cause, socks.ProxyConnectionError) and settings.route_tor): - msg = ('Failed to connect to Tor. Check that Tor is open and ' - 'that your internet connection is working.\n\n' - + str(e)) + msg = f'Failed to connect to Tor. Check that Tor is open and that your internet connection is working.\n\n{e}' raise FetchError('502', reason='Bad Gateway', error_message=msg) elif isinstance(e.__context__, urllib3.exceptions.NewConnectionError): - msg = 'Failed to establish a connection.\n\n' + str(e) + msg = f'Failed to establish a connection.\n\n{e}' raise FetchError( '502', reason='Bad Gateway', error_message=msg) @@ -391,7 +389,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, if error: raise FetchError( '429', reason=response.reason, ip=ip, - error_message='Automatic circuit change: ' + error) + error_message=f'Automatic circuit change: {error}') continue # retry with new identity # Check for client errors (400, 404) - don't retry these @@ -467,10 +465,7 @@ def head(url, use_tor=False, report_text=None, max_redirects=10): headers = {'User-Agent': 'Python-urllib'} response = pool.request('HEAD', url, headers=headers, retries=retries) if report_text: - print( - report_text, - ' Latency:', - round(time.monotonic() - start_time, 3)) + print(f'{report_text} Latency: {round(time.monotonic() - start_time, 3)}') return response mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, 
like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36' @@ -544,16 +539,16 @@ def download_thumbnail(save_directory, video_id): for quality in ('hq720.jpg', 'sddefault.jpg', 'hqdefault.jpg'): url = f'https://i.ytimg.com/vi/{video_id}/{quality}' try: - thumbnail = fetch_url(url, report_text='Saved thumbnail: ' + video_id) + thumbnail = fetch_url(url, report_text=f'Saved thumbnail: {video_id}') except FetchError as e: if '404' in str(e): continue - print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) + print(f'Failed to download thumbnail for {video_id}: {e}') return False except urllib.error.HTTPError as e: if e.code == 404: continue - print('Failed to download thumbnail for ' + video_id + ': ' + str(e)) + print(f'Failed to download thumbnail for {video_id}: {e}') return False try: with open(save_location, 'wb') as f: @@ -563,7 +558,7 @@ def download_thumbnail(save_directory, video_id): with open(save_location, 'wb') as f: f.write(thumbnail) return True - print('No thumbnail available for ' + video_id) + print(f'No thumbnail available for {video_id}') return False @@ -698,7 +693,7 @@ def prefix_urls(item): def add_extra_html_info(item): if item['type'] == 'video': - item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None + item['url'] = f'{URL_ORIGIN}/watch?v={item["id"]}' if item.get('id') else None video_info = {} for key in ('id', 'title', 'author', 'duration', 'author_id'): @@ -721,7 +716,7 @@ def add_extra_html_info(item): item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id']) if item.get('author_id') and 'author_url' not in item: - item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id'] + item['author_url'] = f'{URL_ORIGIN}/channel/{item["author_id"]}' def check_gevent_exceptions(*tasks): @@ -967,7 +962,7 @@ def call_youtube_api(client, api, data): user_agent = context['client'].get('userAgent') or mobile_user_agent visitor_data = get_visitor_data() - url = 'https://' + host + '/youtubei/v1/' + api 
+ '?key=' + key + url = f'https://{host}/youtubei/v1/{api}?key={key}' if visitor_data: context['client'].update({'visitorData': visitor_data}) data['context'] = context @@ -978,8 +973,8 @@ def call_youtube_api(client, api, data): headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data )) response = fetch_url( url, data=data, headers=headers, - debug_name='youtubei_' + api + '_' + client, - report_text='Fetched ' + client + ' youtubei ' + api + debug_name=f'youtubei_{api}_{client}', + report_text=f'Fetched {client} youtubei {api}' ).decode('utf-8') return response diff --git a/youtube/watch.py b/youtube/watch.py index 7f87215..9d1e442 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -53,7 +53,7 @@ def get_video_sources(info, target_resolution): if fmt['acodec'] and fmt['vcodec']: if fmt.get('audio_track_is_default', True) is False: continue - source = {'type': 'video/' + fmt['ext'], + source = {'type': f"video/{fmt['ext']}", 'quality_string': short_video_quality_string(fmt)} source['quality_string'] += ' (integrated)' source.update(fmt) @@ -70,10 +70,10 @@ def get_video_sources(info, target_resolution): if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']): if fmt['bitrate']: fmt['audio_bitrate'] = int(fmt['bitrate']/1000) - source = {'type': 'audio/' + fmt['ext'], + source = {'type': f"audio/{fmt['ext']}", 'quality_string': audio_quality_string(fmt)} source.update(fmt) - source['mime_codec'] = source['type'] + '; codecs="' + source['acodec'] + '"' + source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\"" tid = fmt.get('audio_track_id') or 'default' if tid not in audio_by_track: audio_by_track[tid] = { @@ -85,11 +85,11 @@ def get_video_sources(info, target_resolution): elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')): if codec_name(fmt['vcodec']) == 'unknown': continue - source = {'type': 'video/' + fmt['ext'], + source = {'type': f"video/{fmt['ext']}", 'quality_string': 
short_video_quality_string(fmt)} source.update(fmt) - source['mime_codec'] = source['type'] + '; codecs="' + source['vcodec'] + '"' - quality = str(fmt['quality']) + 'p' + str(fmt['fps']) + source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\"" + quality = f"{fmt['quality']}p{fmt['fps']}" video_only_sources.setdefault(quality, []).append(source) audio_tracks = [] @@ -141,7 +141,7 @@ def get_video_sources(info, target_resolution): def video_rank(src): ''' Sort by settings preference. Use file size as tiebreaker ''' - setting_name = 'codec_rank_' + codec_name(src['vcodec']) + setting_name = f'codec_rank_{codec_name(src["vcodec"])}' return (settings.current_settings_dict[setting_name], src['file_size']) pair_info['videos'].sort(key=video_rank) @@ -183,7 +183,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None): if auto: label += ' (Automatic)' if trans_lang: - label += ' -> ' + trans_lang + label += f' -> {trans_lang}' # Try to use Android caption URL directly (no PO Token needed) caption_url = None @@ -204,7 +204,7 @@ def make_caption_src(info, lang, auto=False, trans_lang=None): else: caption_url += '&fmt=vtt' if trans_lang: - caption_url += '&tlang=' + trans_lang + caption_url += f'&tlang={trans_lang}' url = util.prefix_url(caption_url) else: # Fallback to old method @@ -357,10 +357,10 @@ def decrypt_signatures(info, video_id): player_name = info['player_name'] if player_name in decrypt_cache: - print('Using cached decryption function for: ' + player_name) + print(f'Using cached decryption function for: {player_name}') info['decryption_function'] = decrypt_cache[player_name] else: - base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) + base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}') base_js = base_js.decode('utf-8') err = yt_data_extract.extract_decryption_function(info, base_js) if err: @@ -387,11 +387,11 @@ def 
fetch_player_response(client, video_id): def fetch_watch_page_info(video_id, playlist_id, index): # bpctr=9999999999 will bypass are-you-sure dialogs for controversial # videos - url = 'https://m.youtube.com/embed/' + video_id + '?bpctr=9999999999' + url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999' if playlist_id: - url += '&list=' + playlist_id + url += f'&list={playlist_id}' if index: - url += '&index=' + index + url += f'&index={index}' headers = ( ('Accept', '*/*'), @@ -493,7 +493,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): # Register HLS audio tracks for proxy access added = 0 for lang, track in info['hls_audio_tracks'].items(): - ck = video_id + '_' + lang + ck = f"{video_id}_{lang}" from youtube.hls_cache import register_track register_track(ck, track['hls_url'], video_id=video_id, track_id=lang) @@ -502,7 +502,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): 'audio_track_id': lang, 'audio_track_name': track['name'], 'audio_track_is_default': track['is_default'], - 'itag': 'hls_' + lang, + 'itag': f'hls_{lang}', 'ext': 'mp4', 'audio_bitrate': 128, 'bitrate': 128000, @@ -516,7 +516,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): 'fps': None, 'init_range': {'start': 0, 'end': 0}, 'index_range': {'start': 0, 'end': 0}, - 'url': '/ytl-api/audio-track?id=' + urllib.parse.quote(ck), + 'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}', 's': None, 'sp': None, 'quality': None, @@ -538,11 +538,11 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): # Register HLS manifest for proxying if info['hls_manifest_url']: - ck = video_id + '_video' + ck = f"{video_id}_video" from youtube.hls_cache import register_track register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video') # Use proxy URL instead of direct Google Video URL - info['hls_manifest_url'] = '/ytl-api/hls-manifest?id=' + urllib.parse.quote(ck) + 
info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}' # Fallback to 'ios' if no valid URLs are found if not info.get('formats') or info.get('player_urls_missing'): @@ -566,7 +566,7 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): if info.get('formats'): decryption_error = decrypt_signatures(info, video_id) if decryption_error: - info['playability_error'] = 'Error decrypting url signatures: ' + decryption_error + info['playability_error'] = f'Error decrypting url signatures: {decryption_error}' # check if urls ready (non-live format) in former livestream # urls not ready if all of them have no filesize @@ -623,9 +623,9 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): def video_quality_string(format): if format['vcodec']: - result = str(format['width'] or '?') + 'x' + str(format['height'] or '?') + result = f"{format['width'] or '?'}x{format['height'] or '?'}" if format['fps']: - result += ' ' + str(format['fps']) + 'fps' + result += f" {format['fps']}fps" return result elif format['acodec']: return 'audio only' @@ -634,7 +634,7 @@ def video_quality_string(format): def short_video_quality_string(fmt): - result = str(fmt['quality'] or '?') + 'p' + result = f"{fmt['quality'] or '?'}p" if fmt['fps']: result += str(fmt['fps']) if fmt['vcodec'].startswith('av01'): @@ -642,18 +642,18 @@ def short_video_quality_string(fmt): elif fmt['vcodec'].startswith('avc'): result += ' h264' else: - result += ' ' + fmt['vcodec'] + result += f" {fmt['vcodec']}" return result def audio_quality_string(fmt): if fmt['acodec']: if fmt['audio_bitrate']: - result = '%d' % fmt['audio_bitrate'] + 'k' + result = f"{fmt['audio_bitrate']}k" else: result = '?k' if fmt['audio_sample_rate']: - result += ' ' + '%.3G' % (fmt['audio_sample_rate']/1000) + 'kHz' + result += f" {'%.3G' % (fmt['audio_sample_rate']/1000)}kHz" return result elif fmt['vcodec']: return 'video only' @@ -737,9 +737,9 @@ def get_audio_track(): seg = line 
if line.startswith('http') else urljoin(playlist_base, line) # Always use &seg= parameter, never &url= for segments playlist_lines.append( - base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(seg, safe='') + f'{base_url}/ytl-api/audio-track?id=' + f'{urllib.parse.quote(cache_key)}' + f'&seg={urllib.parse.quote(seg, safe="")}' ) playlist = '\n'.join(playlist_lines) @@ -797,9 +797,7 @@ def get_audio_track(): return url if not url.startswith('http://') and not url.startswith('https://'): url = urljoin(playlist_base, url) - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(url, safe='')) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' playlist_lines = [] for line in playlist.split('\n'): @@ -812,7 +810,7 @@ def get_audio_track(): if line.startswith('#') and 'URI=' in line: def rewrite_uri_attr(match): uri = match.group(1) - return 'URI="' + proxy_url(uri) + '"' + return f'URI="{proxy_url(uri)}"' line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line) playlist_lines.append(line) elif line.startswith('#'): @@ -883,9 +881,7 @@ def get_audio_track(): if segment_url.startswith('/ytl-api/audio-track'): return segment_url base_url = request.url_root.rstrip('/') - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(segment_url)) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}' playlist_lines = [] for line in playlist.split('\n'): @@ -949,14 +945,10 @@ def get_hls_manifest(): if is_audio_track: # Audio track playlist - proxy through audio-track endpoint - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&url=' + urllib.parse.quote(url, safe='')) + return 
f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}' else: # Video segment or variant playlist - proxy through audio-track endpoint - return (base_url + '/ytl-api/audio-track?id=' - + urllib.parse.quote(cache_key) - + '&seg=' + urllib.parse.quote(url, safe='')) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' # Parse and rewrite the manifest manifest_lines = [] @@ -974,7 +966,7 @@ def get_hls_manifest(): nonlocal rewritten_count uri = match.group(1) rewritten_count += 1 - return 'URI="' + rewrite_url(uri, is_audio_track=True) + '"' + return f'URI="{rewrite_url(uri, is_audio_track=True)}"' line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line) manifest_lines.append(line) elif line.startswith('#'): @@ -1053,7 +1045,7 @@ def get_storyboard_vtt(): ts = 0 # current timestamp for i in range(storyboard.storyboard_count): - url = '/' + storyboard.url.replace("$M", str(i)) + url = f'/{storyboard.url.replace("$M", str(i))}' interval = storyboard.interval w, h = storyboard.width, storyboard.height w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt @@ -1078,7 +1070,7 @@ def get_watch_page(video_id=None): if not video_id: return flask.render_template('error.html', error_message='Missing video id'), 404 if len(video_id) < 11: - return flask.render_template('error.html', error_message='Incomplete video id (too short): ' + video_id), 404 + return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404 time_start_str = request.args.get('t', '0s') time_start = 0 @@ -1141,9 +1133,9 @@ def get_watch_page(video_id=None): util.prefix_urls(item) util.add_extra_html_info(item) if playlist_id: - item['url'] += '&list=' + playlist_id + item['url'] += f'&list={playlist_id}' if item['index']: - item['url'] += '&index=' + str(item['index']) + item['url'] += f'&index={item["index"]}' info['playlist']['author_url'] = 
util.prefix_url( info['playlist']['author_url']) if settings.img_prefix: @@ -1159,16 +1151,16 @@ filename = title ext = fmt.get('ext') if ext: - filename += '.' + ext + filename += f'.{ext}' fmt['url'] = fmt['url'].replace( '/videoplayback', - '/videoplayback/name/' + filename) + f'/videoplayback/name/{filename}') download_formats = [] for format in (info['formats'] + info['hls_formats']): if format['acodec'] and format['vcodec']: - codecs_string = format['acodec'] + ', ' + format['vcodec'] + codecs_string = f"{format['acodec']}, {format['vcodec']}" else: codecs_string = format['acodec'] or format['vcodec'] or '?' download_formats.append({ @@ -1247,12 +1239,9 @@ for source in subtitle_sources: best_caption_parse = urllib.parse.urlparse( source['url'].lstrip('/')) - transcript_url = (util.URL_ORIGIN - + '/watch/transcript' - + best_caption_parse.path - + '?' + best_caption_parse.query) + transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}' other_downloads.append({ - 'label': 'Video Transcript: ' + source['label'], + 'label': f'Video Transcript: {source["label"]}', 'ext': 'txt', 'url': transcript_url }) @@ -1263,7 +1252,7 @@ template_name = 'watch.html' return flask.render_template(template_name, header_playlist_names = local_playlist.get_playlist_names(), - uploader_channel_url = ('/' + info['author_url']) if info['author_url'] else '', + uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '', time_published = info['time_published'], view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)), like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)), @@ -1305,10 +1294,10 @@ ip_address = info['ip_address'] if settings.route_tor else None, invidious_used = info['invidious_used'], 
invidious_reload_button = info['invidious_reload_button'], - video_url = util.URL_ORIGIN + '/watch?v=' + video_id, + video_url = f'{util.URL_ORIGIN}/watch?v={video_id}', video_id = video_id, - storyboard_url = (util.URL_ORIGIN + '/ytl-api/storyboard.vtt?' + - urlencode([('spec_url', info['storyboard_spec_url'])]) + storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?' + f'{urlencode([("spec_url", info["storyboard_spec_url"])])}' if info['storyboard_spec_url'] else None), js_data = { @@ -1335,7 +1324,7 @@ def get_watch_page(video_id=None): @yt_app.route('/api/<path:dummy>') def get_captions(dummy): - url = 'https://www.youtube.com' + request.full_path + url = f'https://www.youtube.com{request.full_path}' try: result = util.fetch_url(url, headers=util.mobile_ua) result = result.replace(b"align:start position:0%", b"") @@ -1350,12 +1339,9 @@ inner_timestamp_removal_reg = re.compile(r'<[^>]+>') @yt_app.route('/watch/transcript/<path:caption_path>') def get_transcript(caption_path): try: - captions = util.fetch_url('https://www.youtube.com/' - + caption_path - + '?' + request.environ['QUERY_STRING']).decode('utf-8') + captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8') except util.FetchError as e: - msg = ('Error retrieving captions: ' + str(e) + '\n\n' - + 'The caption url may have expired.') + msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.' 
print(msg) return flask.Response( msg, @@ -1403,7 +1389,7 @@ def get_transcript(caption_path): result = '' for seg in segments: if seg['text'] != ' ': - result += seg['begin'] + ' ' + seg['text'] + '\r\n' + result += f"{seg['begin']} {seg['text']}\r\n" return flask.Response(result.encode('utf-8'), mimetype='text/plain;charset=UTF-8') diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py index dce1d30..f91a467 100644 --- a/youtube/yt_data_extract/common.py +++ b/youtube/yt_data_extract/common.py @@ -212,7 +212,7 @@ def extract_date(date_text): month, day, year = parts[-3:] month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None): - return year + '-' + month + '-' + day + return f'{year}-{month}-{day}' return None def check_missing_keys(object, *key_sequences): @@ -222,7 +222,7 @@ def check_missing_keys(object, *key_sequences): for key in key_sequence: _object = _object[key] except (KeyError, IndexError, TypeError): - return 'Could not find ' + key + return f'Could not find {key}' return None @@ -467,7 +467,7 @@ def extract_item_info(item, additional_info={}): ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] )) - info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None + info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None info['description'] = extract_formatted_text(multi_deep_get( item, ['descriptionText'], ['descriptionSnippet'], diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py index 5930111..b7379a5 100644 --- a/youtube/yt_data_extract/everything_else.py +++ b/youtube/yt_data_extract/everything_else.py @@ -305,7 +305,7 @@ 
def extract_playlist_metadata(polymer_json): metadata['description'] = desc if metadata['author_id']: - metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id'] + metadata['author_url'] = f'https://www.youtube.com/channel/{metadata["author_id"]}' if metadata['first_video_id'] is None: metadata['thumbnail'] = None diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py index de87a6a..2a60741 100644 --- a/youtube/yt_data_extract/watch_extraction.py +++ b/youtube/yt_data_extract/watch_extraction.py @@ -650,9 +650,9 @@ def _extract_playability_error(info, player_response, error_prefix=''): ) if playability_status not in (None, 'OK'): - info['playability_error'] = error_prefix + playability_reason + info['playability_error'] = f'{error_prefix}{playability_reason}' elif not info['playability_error']: # do not override - info['playability_error'] = error_prefix + 'Unknown playability error' + info['playability_error'] = f'{error_prefix}Unknown playability error' SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt') def extract_watch_info(polymer_json): @@ -726,7 +726,7 @@ def extract_watch_info(polymer_json): # Store the full URL from the player response (includes valid tokens) if base_url: normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url - info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized + info['_caption_track_urls'][f'{lang_code}{"_asr" if caption_track.get("kind") == "asr" else ""}'] = normalized lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0) if lang_name: info['_manual_caption_language_names'][lang_code] = lang_name @@ -806,7 +806,7 @@ def extract_watch_info(polymer_json): info['allowed_countries'] = mf.get('availableCountries', []) # other stuff - info['author_url'] = 'https://www.youtube.com/channel/' + 
info['author_id'] if info['author_id'] else None + info['author_url'] = f'https://www.youtube.com/channel/{info["author_id"]}' if info['author_id'] else None info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec') return info @@ -912,12 +912,12 @@ def get_caption_url(info, language, format, automatic=False, translation_languag url = info['_captions_base_url'] if not url: return None - url += '&lang=' + language - url += '&fmt=' + format + url += f'&lang={language}' + url += f'&fmt={format}' if automatic: url += '&kind=asr' elif language in info['_manual_caption_language_names']: - url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='') + url += f'&name={urllib.parse.quote(info["_manual_caption_language_names"][language], safe="")}' if translation_language: url += '&tlang=' + translation_language @@ -964,7 +964,7 @@ def extract_decryption_function(info, base_js): return 'Could not find var_name' var_name = var_with_operation_match.group(1) - var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) + var_body_match = re.search(rf'var {re.escape(var_name)}=\{{(.*?)\}};', base_js, flags=re.DOTALL) if var_body_match is None: return 'Could not find var_body' @@ -988,7 +988,7 @@ def extract_decryption_function(info, base_js): elif op_body.startswith('var c=a[0]'): operation_definitions[op_name] = 2 else: - return 'Unknown op_body: ' + op_body + return f'Unknown op_body: {op_body}' decryption_function = [] for op_with_arg in function_body: @@ -997,7 +997,7 @@ def extract_decryption_function(info, base_js): return 'Could not parse operation with arg' op_name = match.group(2).strip('[].') if op_name not in operation_definitions: - return 'Unknown op_name: ' + str(op_name) + return f'Unknown op_name: {op_name}' op_argument = match.group(3) decryption_function.append([operation_definitions[op_name], int(op_argument)]) @@ -1028,5 +1028,5 @@ 
def decrypt_signatures(info): _operation_2(a, argument) signature = ''.join(a) - format['url'] += '&' + format['sp'] + '=' + signature + format['url'] += f'&{format["sp"]}={signature}' return False |
