diff options
author | Jesús <heckyel@hyperbola.info> | 2020-12-15 21:52:04 -0500 |
---|---|---|
committer | Jesús <heckyel@hyperbola.info> | 2020-12-15 21:52:04 -0500 |
commit | b9a3082e7c654d97cecc5410c086e13a7b046909 (patch) | |
tree | dc093ccb2246e7df0beaca5deb8b7467620b74a2 /youtube | |
parent | f4b36a220d085080a881dbe2f63e51b2fb28a003 (diff) | |
download | yt-local-b9a3082e7c654d97cecc5410c086e13a7b046909.tar.lz yt-local-b9a3082e7c654d97cecc5410c086e13a7b046909.tar.xz yt-local-b9a3082e7c654d97cecc5410c086e13a7b046909.zip |
pep8
Diffstat (limited to 'youtube')
-rw-r--r-- | youtube/__init__.py | 17 | ||||
-rw-r--r-- | youtube/channel.py | 65 | ||||
-rw-r--r-- | youtube/comments.py | 57 | ||||
-rw-r--r-- | youtube/local_playlist.py | 23 | ||||
-rw-r--r-- | youtube/playlist.py | 27 | ||||
-rw-r--r-- | youtube/proto.py | 18 | ||||
-rw-r--r-- | youtube/search.py | 20 | ||||
-rw-r--r-- | youtube/subscriptions.py | 121 | ||||
-rw-r--r-- | youtube/util.py | 66 |
9 files changed, 248 insertions, 166 deletions
diff --git a/youtube/__init__.py b/youtube/__init__.py index a8ca227..d9edbc6 100644 --- a/youtube/__init__.py +++ b/youtube/__init__.py @@ -12,10 +12,9 @@ yt_app.url_map.strict_slashes = False # yt_app.jinja_env.lstrip_blocks = True - - yt_app.add_url_rule('/settings', 'settings_page', settings.settings_page, methods=['POST', 'GET']) + @yt_app.route('/') def homepage(): return flask.render_template('home.html', title="Youtube local") @@ -27,6 +26,7 @@ theme_names = { 2: 'dark_theme', } + @yt_app.context_processor def inject_theme_preference(): return { @@ -34,6 +34,7 @@ def inject_theme_preference(): 'settings': settings, } + @yt_app.template_filter('commatize') def commatize(num): if num is None: @@ -42,6 +43,7 @@ def commatize(num): num = int(num) return '{:,}'.format(num) + def timestamp_replacement(match): time_seconds = 0 for part in match.group(0).split(':'): @@ -53,11 +55,15 @@ def timestamp_replacement(match): + '</a>' ) + TIMESTAMP_RE = re.compile(r'\b(\d?\d:)?\d?\d:\d\d\b') + + @yt_app.template_filter('timestamps') def timestamps(text): return TIMESTAMP_RE.sub(timestamp_replacement, text) + @yt_app.errorhandler(500) def error_page(e): slim = request.args.get('slim', False) # whether it was an ajax request @@ -75,6 +81,7 @@ def error_page(e): return flask.render_template('error.html', error_message=error_message, slim=slim), 502 return flask.render_template('error.html', traceback=traceback.format_exc(), slim=slim), 500 + font_choices = { 0: 'initial', 1: 'arial, "liberation sans", sans-serif', @@ -83,11 +90,13 @@ font_choices = { 4: 'tahoma, sans-serif', } + @yt_app.route('/shared.css') def get_css(): return flask.Response( - flask.render_template('shared.css', - font_family = font_choices[settings.font] + flask.render_template( + 'shared.css', + font_family=font_choices[settings.font] ), mimetype='text/css', ) diff --git a/youtube/channel.py b/youtube/channel.py index e9cc87b..6f40965 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -51,7 +51,7 @@ def channel_ctoken_v3(channel_id, page, sort, tab, view=1): proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset))) )) - tab = proto.string(2, tab ) + tab = proto.string(2, tab) sort = proto.uint(3, int(sort)) shelf_view = proto.uint(4, 0) @@ -60,11 +60,12 @@ def channel_ctoken_v3(channel_id, page, sort, tab, view=1): proto.percent_b64encode(tab + sort + shelf_view + view + page_token) ) - channel_id = proto.string(2, channel_id ) + channel_id = proto.string(2, channel_id) pointless_nest = proto.string(80226972, channel_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') + def channel_ctoken_v2(channel_id, page, sort, tab, view=1): # see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646 # page > 1 doesn't work when sorting by oldest @@ -74,41 +75,44 @@ def channel_ctoken_v2(channel_id, page, sort, tab, view=1): 2: 17254859483345278706, 1: 16570086088270825023, }[int(sort)] - page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1, - proto.uint(1, schema_number) + proto.string(2, - proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset))) - ) - ))) + page_token = proto.string(61, proto.unpadded_b64encode( + proto.string(1, proto.uint(1, schema_number) + proto.string( + 2, + proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset))) + )))) - tab = proto.string(2, tab ) + tab = proto.string(2, tab) sort = proto.uint(3, int(sort)) - #page = proto.string(15, str(page) ) + # page = proto.string(15, str(page) ) shelf_view = proto.uint(4, 0) view = proto.uint(6, int(view)) - continuation_info = proto.string(3, + continuation_info = proto.string( + 3, proto.percent_b64encode(tab + sort + shelf_view + view + page_token) ) - channel_id = proto.string(2, channel_id ) + channel_id = proto.string(2, channel_id) pointless_nest = proto.string(80226972, channel_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') + def channel_ctoken_v1(channel_id, page, sort, tab, view=1): - tab = proto.string(2, tab ) + tab = proto.string(2, tab) sort = proto.uint(3, int(sort)) - page = proto.string(15, str(page) ) + page = proto.string(15, str(page)) # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos shelf_view = proto.uint(4, 0) view = proto.uint(6, int(view)) continuation_info = proto.string(3, proto.percent_b64encode(tab + view + sort + shelf_view + page + proto.uint(23, 0)) ) - channel_id = proto.string(2, channel_id ) + channel_id = proto.string(2, channel_id) pointless_nest = proto.string(80226972, channel_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') + def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True): message = 'Got channel tab' if print_status else None @@ -118,18 +122,21 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_st url = ('https://www.youtube.com/channel/' + channel_id + '/' + tab + '?action_continuation=1&continuation=' + ctoken + '&pbj=1') - content = util.fetch_url(url, headers_desktop + real_cookie, + content = util.fetch_url( + url, headers_desktop + real_cookie, debug_name='channel_tab', report_text=message) else: ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view) ctoken = ctoken.replace('=', '%3D') url = 'https://www.youtube.com/browse_ajax?ctoken=' + ctoken - content = util.fetch_url(url, + content = util.fetch_url( + url, headers_desktop + generic_cookie, debug_name='channel_tab', report_text=message) return content + # cache entries expire after 30 minutes @cachetools.func.ttl_cache(maxsize=128, ttl=30*60) def get_number_of_videos_channel(channel_id): @@ -157,22 +164,28 @@ def get_number_of_videos_channel(channel_id): else: return 0 + channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"') + + @cachetools.func.lru_cache(maxsize=128) def get_channel_id(base_url): # method that gives the smallest possible response at ~4 kb # needs to be as fast as possible base_url = base_url.replace('https://www', 'https://m') # avoid redirect - response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile, + response = util.fetch_url( + base_url + '/about?pbj=1', headers_mobile, debug_name='get_channel_id', report_text='Got channel id').decode('utf-8') match = channel_id_re.search(response) if match: return match.group(1) return None + def get_number_of_videos_general(base_url): return get_number_of_videos_channel(get_channel_id(base_url)) + def get_channel_search_json(channel_id, query, page): params = proto.string(2, 'search') + proto.string(15, str(page)) params = proto.percent_b64encode(params) @@ -192,15 +205,14 @@ def post_process_channel_info(info): util.add_extra_html_info(item) - - - playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} # youtube.com/[channel_id]/[tab] # youtube.com/user/[username]/[tab] # youtube.com/c/[custom]/[tab] # youtube.com/[custom]/[tab] + + def get_channel_page_general_url(base_url, tab, request, channel_id=None): page_number = int(request.args.get('page', 1)) @@ -236,10 +248,9 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): else: flask.abort(404, 'Unknown channel tab: ' + tab) - info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab) if info['error'] is not None: - return flask.render_template('error.html', error_message = info['error']) + return flask.render_template('error.html', error_message=info['error']) post_process_channel_info(info) if tab == 'videos': @@ -254,28 +265,32 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): info['page_number'] = page_number info['subscribed'] = subscriptions.is_subscribed(info['channel_id']) - return flask.render_template('channel.html', - parameters_dictionary = request.args, + return flask.render_template( + 'channel.html', + parameters_dictionary=request.args, **info ) + @yt_app.route('/channel/<channel_id>/') @yt_app.route('/channel/<channel_id>/<tab>') def get_channel_page(channel_id, tab='videos'): return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id) + @yt_app.route('/user/<username>/') @yt_app.route('/user/<username>/<tab>') def get_user_page(username, tab='videos'): return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request) + @yt_app.route('/c/<custom>/') @yt_app.route('/c/<custom>/<tab>') def get_custom_c_page(custom, tab='videos'): return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request) + @yt_app.route('/<custom>') @yt_app.route('/<custom>/<tab>') def get_toplevel_custom_page(custom, tab='videos'): return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request) - diff --git a/youtube/comments.py b/youtube/comments.py index 2fb1fa2..fc353f9 100644 --- a/youtube/comments.py +++ b/youtube/comments.py @@ -25,12 +25,13 @@ from flask import request # *Old ASJN's continue to work, and start at the same comment even if new comments have been posted since # *The ASJN has no relation with any of the data in the response it came from + def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''): video_id = proto.as_bytes(video_id) secret_key = proto.as_bytes(secret_key) + page_info = proto.string(4, video_id) + proto.uint(6, sort) - page_info = proto.string(4,video_id) + proto.uint(6, sort) offset_information = proto.nested(4, page_info) + proto.uint(5, offset) if secret_key: offset_information = proto.string(1, secret_key) + offset_information @@ -39,19 +40,19 @@ def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''): if lc: page_params += proto.string(6, proto.percent_b64encode(proto.string(15, lc))) - result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information) + result = proto.nested(2, page_params) + proto.uint(3, 6) + proto.nested(6, offset_information) return base64.urlsafe_b64encode(result).decode('ascii') + def comment_replies_ctoken(video_id, comment_id, max_results=500): params = proto.string(2, comment_id) + proto.uint(9, max_results) params = proto.nested(3, params) - result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params) + result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3, 6) + proto.nested(6, params) return base64.urlsafe_b64encode(result).decode('ascii') - mobile_headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1', 'Accept': '*/*', @@ -59,6 +60,8 @@ mobile_headers = { 'X-YouTube-Client-Name': '2', 'X-YouTube-Client-Version': '2.20180823', } + + def request_comments(ctoken, replies=False): if replies: # let's make it use different urls for no reason despite all the data being encoded base_url = "https://m.youtube.com/watch_comment?action_get_comment_replies=1&ctoken=" @@ -66,7 +69,7 @@ def request_comments(ctoken, replies=False): base_url = "https://m.youtube.com/watch_comment?action_get_comments=1&ctoken=" url = base_url + ctoken.replace("=", "%3D") + "&pbj=1" - for i in range(0,8): # don't retry more than 8 times + for i in range(0, 8): # don't retry more than 8 times content = util.fetch_url(url, headers=mobile_headers, report_text="Retrieved comments", debug_name='request_comments') if content[0:4] == b")]}'": # random closing characters included at beginning of response for some reason content = content[4:] @@ -81,13 +84,13 @@ def request_comments(ctoken, replies=False): def single_comment_ctoken(video_id, comment_id): - page_params = proto.string(2, video_id) + proto.string(6, proto.percent_b64encode(proto.string(15, comment_id))) + page_params = proto.string(2, video_id) + proto.string( + 6, proto.percent_b64encode(proto.string(15, comment_id))) - result = proto.nested(2, page_params) + proto.uint(3,6) + result = proto.nested(2, page_params) + proto.uint(3, 6) return base64.urlsafe_b64encode(result).decode('ascii') - def post_process_comments_info(comments_info): for comment in comments_info['comments']: comment['author_url'] = concat_or_none( @@ -95,15 +98,17 @@ def post_process_comments_info(comments_info): comment['author_avatar'] = concat_or_none( settings.img_prefix, comment['author_avatar']) - comment['permalink'] = concat_or_none(util.URL_ORIGIN, '/watch?v=', + comment['permalink'] = concat_or_none( + util.URL_ORIGIN, '/watch?v=', comments_info['video_id'], '&lc=', comment['id']) - reply_count = comment['reply_count'] + if reply_count == 0: comment['replies_url'] = None else: - comment['replies_url'] = concat_or_none(util.URL_ORIGIN, + comment['replies_url'] = concat_or_none( + util.URL_ORIGIN, '/comments?parent_id=', comment['id'], '&video_id=', comments_info['video_id']) @@ -122,18 +127,25 @@ def post_process_comments_info(comments_info): comments_info['include_avatars'] = settings.enable_comment_avatars if comments_info['ctoken']: - comments_info['more_comments_url'] = concat_or_none(util.URL_ORIGIN, - '/comments?ctoken=', comments_info['ctoken']) + comments_info['more_comments_url'] = concat_or_none( + util.URL_ORIGIN, + '/comments?ctoken=', + comments_info['ctoken'] + ) comments_info['page_number'] = page_number = str(int(comments_info['offset']/20) + 1) if not comments_info['is_replies']: comments_info['sort_text'] = 'top' if comments_info['sort'] == 0 else 'newest' + comments_info['video_url'] = concat_or_none( + util.URL_ORIGIN, + '/watch?v=', + comments_info['video_id'] + ) - comments_info['video_url'] = concat_or_none(util.URL_ORIGIN, - '/watch?v=', comments_info['video_id']) - comments_info['video_thumbnail'] = concat_or_none(settings.img_prefix, 'https://i.ytimg.com/vi/', + comments_info['video_thumbnail'] = concat_or_none( + settings.img_prefix, 'https://i.ytimg.com/vi/', comments_info['video_id'], '/mqdefault.jpg') @@ -183,7 +195,6 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): return comments_info - @yt_app.route('/comments') def get_comments_page(): ctoken = request.args.get('ctoken', '') @@ -195,7 +206,9 @@ def get_comments_page(): ctoken = comment_replies_ctoken(video_id, parent_id) replies = True - comments_info = yt_data_extract.extract_comments_info(request_comments(ctoken, replies)) + comments_info = yt_data_extract.extract_comments_info( + request_comments(ctoken, replies)) + post_process_comments_info(comments_info) if not replies: @@ -203,8 +216,8 @@ def get_comments_page(): other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top') comments_info['comment_links'] = [(other_sort_text, other_sort_url)] - return flask.render_template('comments_page.html', - comments_info = comments_info, - slim = request.args.get('slim', False) + return flask.render_template( + 'comments_page.html', + comments_info=comments_info, + slim=request.args.get('slim', False) ) - diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py index 891bb76..776e992 100644 --- a/youtube/local_playlist.py +++ b/youtube/local_playlist.py @@ -15,6 +15,7 @@ from flask import request playlists_directory = os.path.join(settings.data_dir, "playlists") thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails") + def video_ids_in_playlist(name): try: with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file: @@ -23,6 +24,7 @@ def video_ids_in_playlist(name): except FileNotFoundError: return set() + def add_to_playlist(name, video_info_list): if not os.path.exists(playlists_directory): os.makedirs(playlists_directory) @@ -65,6 +67,7 @@ def get_local_playlist_videos(name, offset=0, amount=50): gevent.spawn(util.download_thumbnails, os.path.join(thumbnails_directory, name), missing_thumbnails) return videos[offset:offset+amount], len(videos) + def get_playlist_names(): try: items = os.listdir(playlists_directory) @@ -75,6 +78,7 @@ def get_playlist_names(): if ext == '.txt': yield name + def remove_from_playlist(name, video_info_list): ids = [json.loads(video)['id'] for video in video_info_list] with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file: @@ -109,14 +113,16 @@ def get_local_playlist_page(playlist_name=None): page = int(request.args.get('page', 1)) offset = 50*(page - 1) videos, num_videos = get_local_playlist_videos(playlist_name, offset=offset, amount=50) - return flask.render_template('local_playlist.html', - header_playlist_names = get_playlist_names(), - playlist_name = playlist_name, - videos = videos, - num_pages = math.ceil(num_videos/50), - parameters_dictionary = request.args, + return flask.render_template( + 'local_playlist.html', + header_playlist_names=get_playlist_names(), + playlist_name=playlist_name, + videos=videos, + num_pages=math.ceil(num_videos/50), + parameters_dictionary=request.args, ) + @yt_app.route('/playlists/<playlist_name>', methods=['POST']) def path_edit_playlist(playlist_name): '''Called when making changes to the playlist from that playlist's page''' @@ -128,6 +134,7 @@ def path_edit_playlist(playlist_name): else: flask.abort(400) + @yt_app.route('/edit_playlist', methods=['POST']) def edit_playlist(): '''Called when adding videos to a playlist from elsewhere''' @@ -137,7 +144,9 @@ def edit_playlist(): else: flask.abort(400) + @yt_app.route('/data/playlist_thumbnails/<playlist_name>/<thumbnail>') def serve_thumbnail(playlist_name, thumbnail): # .. is necessary because flask always uses the application directory at ./youtube, not the working directory - return flask.send_from_directory(os.path.join('..', thumbnails_directory, playlist_name), thumbnail) + return flask.send_from_directory( + os.path.join('..', thumbnails_directory, playlist_name), thumbnail) diff --git a/youtube/playlist.py b/youtube/playlist.py index e596eae..64c717e 100644 --- a/youtube/playlist.py +++ b/youtube/playlist.py @@ -12,9 +12,6 @@ from flask import request import flask - - - def playlist_ctoken(playlist_id, offset): offset = proto.uint(1, offset) @@ -22,9 +19,9 @@ def playlist_ctoken(playlist_id, offset): offset = b'PT:' + proto.unpadded_b64encode(offset) offset = proto.string(15, offset) - continuation_info = proto.string( 3, proto.percent_b64encode(offset) ) + continuation_info = proto.string(3, proto.percent_b64encode(offset)) - playlist_id = proto.string(2, 'VL' + playlist_id ) + playlist_id = proto.string(2, 'VL' + playlist_id) pointless_nest = proto.string(80226972, playlist_id + continuation_info) return base64.urlsafe_b64encode(pointless_nest).decode('ascii') @@ -46,7 +43,8 @@ headers_1 = ( ('X-YouTube-Client-Version', '2.20180614'), ) -def playlist_first_page(playlist_id, report_text = "Retrieved playlist"): + +def playlist_first_page(playlist_id, report_text="Retrieved playlist"): url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1' content = util.fetch_url(url, util.mobile_ua + headers_1, report_text=report_text, debug_name='playlist_first_page') content = json.loads(util.uppercase_escape(content.decode('utf-8'))) @@ -66,7 +64,9 @@ def get_videos(playlist_id, page): 'X-YouTube-Client-Version': '2.20180508', } - content = util.fetch_url(url, headers, report_text="Retrieved playlist", debug_name='playlist_videos') + content = util.fetch_url( + url, headers, + report_text="Retrieved playlist", debug_name='playlist_videos') info = json.loads(util.uppercase_escape(content.decode('utf-8'))) return info @@ -94,7 +94,7 @@ def get_playlist_page(): info = yt_data_extract.extract_playlist_info(this_page_json) if info['error']: - return flask.render_template('error.html', error_message = info['error']) + return flask.render_template('error.html', error_message=info['error']) if page != '1': info['metadata'] = yt_data_extract.extract_playlist_metadata(first_page_json) @@ -114,11 +114,12 @@ def get_playlist_page(): if video_count is None: video_count = 40 - return flask.render_template('playlist.html', - header_playlist_names = local_playlist.get_playlist_names(), - video_list = info.get('items', []), - num_pages = math.ceil(video_count/20), - parameters_dictionary = request.args, + return flask.render_template( + 'playlist.html', + header_playlist_names=local_playlist.get_playlist_names(), + video_list=info.get('items', []), + num_pages=math.ceil(video_count/20), + parameters_dictionary=request.args, **info['metadata'] ).encode('utf-8') diff --git a/youtube/proto.py b/youtube/proto.py index 5fd16d5..ff59eac 100644 --- a/youtube/proto.py +++ b/youtube/proto.py @@ -2,6 +2,7 @@ from math import ceil import base64 import io + def byte(n): return bytes((n,)) @@ -19,7 +20,7 @@ def varint_encode(offset): for i in range(0, needed_bytes - 1): encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits offset = offset >> 7 - encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte + encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte return bytes(encoded_bytes) @@ -37,18 +38,18 @@ def varint_decode(encoded): def string(field_number, data): data = as_bytes(data) return _proto_field(2, field_number, varint_encode(len(data)) + data) + + nested = string + def uint(field_number, value): return _proto_field(0, field_number, varint_encode(value)) - - def _proto_field(wire_type, field_number, data): ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure ''' - return varint_encode( (field_number << 3) | wire_type) + data - + return varint_encode((field_number << 3) | wire_type) + data def percent_b64encode(data): @@ -58,6 +59,7 @@ def percent_b64encode(data): def unpadded_b64encode(data): return base64.urlsafe_b64encode(data).replace(b'=', b'') + def as_bytes(value): if isinstance(value, str): return value.encode('utf-8') @@ -90,6 +92,7 @@ def read_group(data, end_sequence): data.seek(index + len(end_sequence)) return data.original[start:index] + def read_protobuf(data): data_original = data data = io.BytesIO(data) @@ -118,12 +121,13 @@ def read_protobuf(data): raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(succinct_encode(tag)) + ", at position " + str(data.tell())) yield (wire_type, field_number, value) + def parse(data): return {field_number: value for _, field_number, value in read_protobuf(data)} + def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4) ) - + return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4)) diff --git a/youtube/search.py b/youtube/search.py index 34df76f..da8cdab 100644 --- a/youtube/search.py +++ b/youtube/search.py @@ -78,7 +78,7 @@ def get_search_page(): search_info = yt_data_extract.extract_search_info(polymer_json) if search_info['error']: - return flask.render_template('error.html', error_message = search_info['error']) + return flask.render_template('error.html', error_message=search_info['error']) for extract_item_info in search_info['items']: util.prefix_urls(extract_item_info) @@ -95,16 +95,18 @@ def get_search_page(): no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True) corrections['original_query_url'] = no_autocorrect_query_url - return flask.render_template('search.html', - header_playlist_names = local_playlist.get_playlist_names(), - query = query, - estimated_results = search_info['estimated_results'], - estimated_pages = search_info['estimated_pages'], - corrections = search_info['corrections'], - results = search_info['items'], - parameters_dictionary = request.args, + return flask.render_template( + 'search.html', + header_playlist_names=local_playlist.get_playlist_names(), + query=query, + estimated_results=search_info['estimated_results'], + estimated_pages=search_info['estimated_pages'], + corrections=search_info['corrections'], + results=search_info['items'], + parameters_dictionary=request.args, ) + @yt_app.route('/opensearch.xml') def get_search_engine_xml(): with open(os.path.join(settings.program_directory, 'youtube/opensearch.xml'), 'rb') as f: diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index 6f75578..b841f5d 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -26,6 +26,7 @@ thumbnails_directory = os.path.join(settings.data_dir, "subscription_thumbnails" database_path = os.path.join(settings.data_dir, "subscriptions.sqlite") + def open_database(): if not os.path.exists(settings.data_dir): os.makedirs(settings.data_dir) @@ -74,11 +75,13 @@ def open_database(): # https://stackoverflow.com/questions/19522505/using-sqlite3-in-python-with-with-keyword return contextlib.closing(connection) + def with_open_db(function, *args, **kwargs): with open_database() as connection: with connection as cursor: return function(cursor, *args, **kwargs) + def _is_subscribed(cursor, channel_id): result = cursor.execute('''SELECT EXISTS( SELECT 1 @@ -88,12 +91,14 @@ def _is_subscribed(cursor, channel_id): )''', [channel_id]).fetchone() return bool(result[0]) + def is_subscribed(channel_id): if not os.path.exists(database_path): return False return with_open_db(_is_subscribed, channel_id) + def _subscribe(channels): ''' channels is a list of (channel_id, channel_name) ''' channels = list(channels) @@ -101,7 +106,8 @@ def _subscribe(channels): with connection as cursor: channel_ids_to_check = [channel[0] for channel in channels if not _is_subscribed(cursor, channel[0])] - rows = ( (channel_id, channel_name, 0, 0) for channel_id, channel_name in channels) + rows = ((channel_id, channel_name, 0, 0) for channel_id, + channel_name in channels) cursor.executemany('''INSERT OR IGNORE INTO subscribed_channels (yt_channel_id, channel_name, time_last_checked, next_check_time) VALUES (?, ?, ?, ?)''', rows) @@ -111,6 +117,7 @@ def _subscribe(channels): channel_names.update(channels) check_channels_if_necessary(channel_ids_to_check) + def delete_thumbnails(to_delete): for thumbnail in to_delete: try: @@ -122,6 +129,7 @@ def delete_thumbnails(to_delete): print('Failed to delete thumbnail: ' + thumbnail) traceback.print_exc() + def _unsubscribe(cursor, channel_ids): ''' channel_ids is a list of channel_ids ''' to_delete = [] @@ -138,7 +146,8 @@ def _unsubscribe(cursor, channel_ids): gevent.spawn(delete_thumbnails, to_delete) cursor.executemany("DELETE FROM subscribed_channels WHERE yt_channel_id=?", ((channel_id, ) for channel_id in channel_ids)) -def _get_videos(cursor, number_per_page, offset, tag = None): + +def _get_videos(cursor, number_per_page, offset, tag=None): '''Returns a full page of videos with an offset, and a value good enough to be used as the total number of videos''' # We ask for the next 9 pages from the database # Then the actual length of the results tell us if there are more than 9 pages left, and if not, how many there actually are @@ -181,8 +190,6 @@ def _get_videos(cursor, number_per_page, offset, tag = None): return videos, pseudo_number_of_videos - - def _get_subscribed_channels(cursor): for item in cursor.execute('''SELECT channel_name, yt_channel_id, muted FROM subscribed_channels @@ -204,7 +211,6 @@ def _remove_tags(cursor, channel_ids, tags): )''', pairs) - def _get_tags(cursor, channel_id): return [row[0] for row in cursor.execute('''SELECT tag FROM tag_associations @@ -212,9 +218,11 @@ def _get_tags(cursor, channel_id): SELECT id FROM subscribed_channels WHERE yt_channel_id = ? )''', (channel_id,))] + def _get_all_tags(cursor): return [row[0] for row in cursor.execute('''SELECT DISTINCT tag FROM tag_associations''')] + def _get_channel_names(cursor, channel_ids): ''' returns list of (channel_id, channel_name) ''' result = [] @@ -222,11 +230,12 @@ def _get_channel_names(cursor, channel_ids): row = cursor.execute('''SELECT channel_name FROM subscribed_channels WHERE yt_channel_id = ?''', (channel_id,)).fetchone() - result.append( (channel_id, row[0]) ) + result.append((channel_id, row[0])) return result -def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, include_muted_status=False): +def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, + include_muted_status=False): ''' returns list of (channel_id, channel_name) ''' statement = '''SELECT yt_channel_id, channel_name''' @@ -247,12 +256,15 @@ def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, include_mu return cursor.execute(statement, [tag]).fetchall() + def _schedule_checking(cursor, channel_id, next_check_time): cursor.execute('''UPDATE subscribed_channels SET next_check_time = ? WHERE yt_channel_id = ?''', [int(next_check_time), channel_id]) + def _is_muted(cursor, channel_id): return bool(cursor.execute('''SELECT muted FROM subscribed_channels WHERE yt_channel_id=?''', [channel_id]).fetchone()[0]) + units = collections.OrderedDict([ ('year', 31536000), # 365*24*3600 ('month', 2592000), # 30*24*3600 @@ -262,6 +274,8 @@ units = collections.OrderedDict([ ('minute', 60), ('second', 1), ]) + + def youtube_timestamp_to_posix(dumb_timestamp): ''' Given a dumbed down timestamp such as 1 year ago, 3 hours ago, approximates the unix time (seconds since 1/1/1970) ''' @@ -275,6 +289,7 @@ def youtube_timestamp_to_posix(dumb_timestamp): unit = unit[:-1] # remove s from end return now - quantifier*units[unit] + def posix_to_dumbed_down(posix_time): '''Inverse of youtube_timestamp_to_posix.''' delta = int(time.time() - posix_time) @@ -293,12 +308,14 @@ def posix_to_dumbed_down(posix_time): else: raise Exception() + def exact_timestamp(posix_time): result = time.strftime('%I:%M %p %m/%d/%y', time.localtime(posix_time)) if result[0] == '0': # remove 0 infront of hour (like 01:00 PM) return result[1:] return result + try: existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory)) except FileNotFoundError: @@ -314,6 +331,7 @@ checking_channels = set() # Just to use for printing channel checking status to console without opening database channel_names = dict() + def check_channel_worker(): while True: channel_id = check_channels_queue.get() @@ -324,12 +342,12 @@ def check_channel_worker(): finally: checking_channels.remove(channel_id) -for i in range(0,5): + +for i in range(0, 5): gevent.spawn(check_channel_worker) # ---------------------------- - # --- Auto checking system - Spaghetti code --- def autocheck_dispatcher(): '''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application''' @@ -356,7 +374,7 @@ def autocheck_dispatcher(): if time_until_earliest_job > 0: # it can become less than zero (in the past) when it's set to go off while the dispatcher is doing something else at that moment try: - new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs + new_job = autocheck_job_application.get(timeout=time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs except gevent.queue.Empty: # no new jobs pass else: # new job, add it to the list @@ -369,7 +387,10 @@ def autocheck_dispatcher(): check_channels_queue.put(earliest_job['channel_id']) del autocheck_jobs[earliest_job_index] + dispatcher_greenlet = None + + def start_autocheck_system(): global autocheck_job_application global autocheck_jobs @@ -398,30 +419,34 @@ def start_autocheck_system(): autocheck_jobs.append({'channel_id': row[0], 'channel_name': row[1], 'next_check_time': next_check_time}) dispatcher_greenlet = gevent.spawn(autocheck_dispatcher) + def stop_autocheck_system(): if dispatcher_greenlet is not None: dispatcher_greenlet.kill() + def autocheck_setting_changed(old_value, new_value): if new_value: start_autocheck_system() else: stop_autocheck_system() -settings.add_setting_changed_hook('autocheck_subscriptions', + +settings.add_setting_changed_hook( + 'autocheck_subscriptions', autocheck_setting_changed) if settings.autocheck_subscriptions: start_autocheck_system() # ---------------------------- - def check_channels_if_necessary(channel_ids): for channel_id in channel_ids: if channel_id not in checking_channels: checking_channels.add(channel_id) check_channels_queue.put(channel_id) + def _get_atoma_feed(channel_id): url = 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id try: @@ -432,6 +457,7 @@ def _get_atoma_feed(channel_id): return '' raise + def _get_channel_tab(channel_id, channel_status_name): try: return channel.get_channel_tab(channel_id, print_status=False) @@ -447,6 +473,7 @@ def _get_channel_tab(channel_id, channel_status_name): return None raise + def _get_upstream_videos(channel_id): try: channel_status_name = channel_names[channel_id] @@ -527,9 +554,8 @@ def _get_upstream_videos(channel_id): video_item['channel_id'] = channel_id - if len(videos) == 0: - average_upload_period = 4*7*24*3600 # assume 1 month for channel with no videos + average_upload_period = 4*7*24*3600 # assume 1 month for channel with no videos elif len(videos) < 5: average_upload_period = int((time.time() - videos[len(videos)-1]['time_published'])/len(videos)) else: @@ -591,7 +617,6 @@ def _get_upstream_videos(channel_id): video_item['description'], )) - cursor.executemany('''INSERT OR IGNORE INTO videos ( sql_channel_id, video_id, @@ -619,7 +644,6 @@ def _get_upstream_videos(channel_id): print(str(number_of_new_videos) + ' new videos from ' + channel_status_name) - def check_all_channels(): with open_database() as connection: with connection as cursor: @@ -654,22 +678,20 @@ def check_specific_channels(channel_ids): check_channels_if_necessary(channel_ids) - @yt_app.route('/import_subscriptions', methods=['POST']) def import_subscriptions(): # check if the post request has the file part if 'subscriptions_file' not in request.files: - #flash('No file part') + # flash('No file part') return flask.redirect(util.URL_ORIGIN + request.full_path) file = request.files['subscriptions_file'] # if user does not select file, browser also # submit an empty part without filename if file.filename == '': - #flash('No selected file') + # flash('No selected file') return flask.redirect(util.URL_ORIGIN + request.full_path) - mime_type = file.mimetype if mime_type == 'application/json': @@ -681,7 +703,7 @@ def import_subscriptions(): return '400 Bad Request: Invalid json file', 400 try: - channels = ( (item['snippet']['resourceId']['channelId'], item['snippet']['title']) for item in file) + channels = ((item['snippet']['resourceId']['channelId'], item['snippet']['title']) for item in file) except (KeyError, IndexError): traceback.print_exc() return '400 Bad Request: Unknown json structure', 400 @@ -695,11 +717,10 @@ def import_subscriptions(): if (outline_element.tag != 'outline') or ('xmlUrl' not in outline_element.attrib): continue - channel_name = outline_element.attrib['text'] channel_rss_url = outline_element.attrib['xmlUrl'] channel_id = channel_rss_url[channel_rss_url.find('channel_id=')+11:].strip() - channels.append( (channel_id, channel_name) ) + channels.append((channel_id, channel_name)) except (AssertionError, IndexError, defusedxml.ElementTree.ParseError) as e: return '400 Bad Request: Unable to read opml xml file, or the file is not the expected format', 400 @@ -711,7 +732,6 @@ def import_subscriptions(): return flask.redirect(util.URL_ORIGIN + '/subscription_manager', 303) - @yt_app.route('/subscription_manager', methods=['GET']) def get_subscription_manager_page(): group_by_tags = request.args.get('group_by_tags', '0') == '1' @@ -731,7 +751,7 @@ def get_subscription_manager_page(): 'tags': [t for t in _get_tags(cursor, channel_id) if t != tag], }) - tag_groups.append( (tag, sub_list) ) + tag_groups.append((tag, sub_list)) # Channels with no tags channel_list = cursor.execute('''SELECT yt_channel_id, channel_name, muted @@ -751,7 +771,7 @@ def get_subscription_manager_page(): 'tags': [], }) - tag_groups.append( ('No tags', sub_list) ) + tag_groups.append(('No tags', sub_list)) else: sub_list = [] for channel_name, channel_id, muted in _get_subscribed_channels(cursor): @@ -763,20 +783,20 @@ def get_subscription_manager_page(): 'tags': _get_tags(cursor, channel_id), }) - - - if group_by_tags: - return flask.render_template('subscription_manager.html', - group_by_tags = True, - tag_groups = tag_groups, + return flask.render_template( + 'subscription_manager.html', + group_by_tags=True, + tag_groups=tag_groups, ) else: - return flask.render_template('subscription_manager.html', - group_by_tags = False, - sub_list = sub_list, + return flask.render_template( + 'subscription_manager.html', + group_by_tags=False, + sub_list=sub_list, ) + def list_from_comma_separated_tags(string): return [tag.strip() for tag in string.split(',') if tag.strip()] @@ -795,7 +815,7 @@ def post_subscription_manager_page(): _unsubscribe(cursor, request.values.getlist('channel_ids')) elif action == 'unsubscribe_verify': unsubscribe_list = _get_channel_names(cursor, request.values.getlist('channel_ids')) - return flask.render_template('unsubscribe_verify.html', unsubscribe_list = unsubscribe_list) + return flask.render_template('unsubscribe_verify.html', unsubscribe_list=unsubscribe_list) elif action == 'mute': cursor.executemany('''UPDATE subscribed_channels @@ -810,6 +830,7 @@ def post_subscription_manager_page(): return flask.redirect(util.URL_ORIGIN + request.full_path, 303) + @yt_app.route('/subscriptions', methods=['GET']) @yt_app.route('/feed/subscriptions', methods=['GET']) def get_subscriptions_page(): @@ -826,7 +847,6 @@ def get_subscriptions_page(): tags = _get_all_tags(cursor) - subscription_list = [] for channel_name, channel_id, muted in _get_subscribed_channels(cursor): subscription_list.append({ @@ -836,16 +856,18 @@ def get_subscriptions_page(): 'muted': muted, }) - return flask.render_template('subscriptions.html', - header_playlist_names = local_playlist.get_playlist_names(), - videos = videos, - num_pages = math.ceil(number_of_videos_in_db/60), - parameters_dictionary = request.args, - tags = tags, - current_tag = tag, - subscription_list = subscription_list, + return flask.render_template( + 'subscriptions.html', + header_playlist_names=local_playlist.get_playlist_names(), + videos=videos, + num_pages=math.ceil(number_of_videos_in_db/60), + parameters_dictionary=request.args, + tags=tags, + current_tag=tag, + subscription_list=subscription_list, ) + @yt_app.route('/subscriptions', methods=['POST']) @yt_app.route('/feed/subscriptions', methods=['POST']) def post_subscriptions_page(): @@ -900,17 +922,10 @@ def serve_subscription_thumbnail(thumbnail): try: f = open(thumbnail_path, 'wb') except FileNotFoundError: - os.makedirs(thumbnails_directory, exist_ok = True) + os.makedirs(thumbnails_directory, exist_ok=True) f = open(thumbnail_path, 'wb') f.write(image) f.close() existing_thumbnails.add(video_id) return flask.Response(image, mimetype='image/jpeg') - - - - - - - diff --git a/youtube/util.py b/youtube/util.py index 8945b9f..df4759e 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -1,6 +1,7 @@ import settings from youtube import yt_data_extract -import socks, sockshandler +import socks +import sockshandler import gzip try: import brotli @@ -55,14 +56,15 @@ import urllib3.contrib.socks URL_ORIGIN = "/https://www.youtube.com" -connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED') +connection_pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED') + class TorManager: def __init__(self): self.old_tor_connection_pool = None self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', - cert_reqs = 'CERT_REQUIRED') + cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() self.new_identity_lock = gevent.lock.BoundedSemaphore(1) @@ -77,7 +79,7 @@ class TorManager: self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/', - cert_reqs = 'CERT_REQUIRED') + cert_reqs='CERT_REQUIRED') self.tor_pool_refresh_time = time.monotonic() def get_tor_connection_pool(self): @@ -125,6 +127,7 @@ class TorManager: finally: self.new_identity_lock.release() + tor_manager = TorManager() @@ -154,6 +157,7 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): https_request = http_request https_response = http_response + class FetchError(Exception): def __init__(self, code, reason='', ip=None, error_message=None): Exception.__init__(self, 'HTTP error during request: ' + code + ' ' + reason) @@ -162,6 +166,7 @@ class FetchError(Exception): self.ip = ip self.error_message = error_message + def decode_content(content, encoding_header): encodings = encoding_header.replace(' ', '').split(',') for encoding in reversed(encodings): @@ -173,6 +178,7 @@ def decode_content(content, encoding_header): content = gzip.decompress(content) return content + def fetch_url_response(url, headers=(), timeout=15, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, max_redirects=None): @@ -234,6 +240,7 @@ def fetch_url_response(url, headers=(), timeout=15, data=None, return response, cleanup_func + def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, debug_name=None): @@ -284,7 +291,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, break if report_text: - print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3)) + print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:', round(read_finish - response_time,3)) if settings.debugging_save_responses and debug_name is not None: save_dir = os.path.join(settings.data_dir, 'debug') @@ -296,6 +303,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, return content + def head(url, use_tor=False, report_text=None, max_redirects=10): pool = get_pool(use_tor and settings.route_tor) start_time = time.monotonic() @@ -305,7 +313,9 @@ def head(url, use_tor=False, report_text=None, max_redirects=10): # According to the documentation for urlopen, a redirect counts as a retry # So there are 3 redirects max by default. Let's change that # to 10 since googlevideo redirects a lot. - retries = urllib3.Retry(3+max_redirects, redirect=max_redirects, + retries = urllib3.Retry( + 3+max_redirects, + redirect=max_redirects, raise_on_redirect=False) headers = {'User-Agent': 'Python-urllib'} response = pool.request('HEAD', url, headers=headers, retries=retries) @@ -313,19 +323,16 @@ def head(url, use_tor=False, report_text=None, max_redirects=10): print( report_text, ' Latency:', - round(time.monotonic() - start_time,3)) + round(time.monotonic() - start_time, 3)) return response + mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36' mobile_ua = (('User-Agent', mobile_user_agent),) desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0' desktop_ua = (('User-Agent', desktop_user_agent),) - - - - class RateLimitedQueue(gevent.queue.Queue): ''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. ''' @@ -342,7 +349,6 @@ class RateLimitedQueue(gevent.queue.Queue): self.empty_start = 0 gevent.queue.Queue.__init__(self) - def get(self): self.lock.acquire() # blocks if another greenlet currently has the lock if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial: @@ -374,7 +380,6 @@ class RateLimitedQueue(gevent.queue.Queue): return item - def download_thumbnail(save_directory, video_id): url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg" save_location = os.path.join(save_directory, video_id + ".jpg") @@ -386,12 +391,13 @@ def download_thumbnail(save_directory, video_id): try: f = open(save_location, 'wb') except FileNotFoundError: - os.makedirs(save_directory, exist_ok = True) + os.makedirs(save_directory, exist_ok=True) f = open(save_location, 'wb') f.write(thumbnail) f.close() return True + def download_thumbnails(save_directory, ids): if not isinstance(ids, (list, tuple)): ids = list(ids) @@ -404,15 +410,12 @@ def download_thumbnails(save_directory, ids): gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5 + 5, len(ids))]) - - - - def dict_add(*dicts): for dictionary in dicts[1:]: dicts[0].update(dictionary) return dicts[0] + def video_id(url): url_parts = urllib.parse.urlparse(url) return urllib.parse.parse_qs(url_parts.query)['v'][0] @@ -422,10 +425,11 @@ def video_id(url): def get_thumbnail_url(video_id): return settings.img_prefix + "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg" + def seconds_to_timestamp(seconds): seconds = int(seconds) - hours, seconds = divmod(seconds,3600) - minutes, seconds = divmod(seconds,60) + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) if hours != 0: timestamp = str(hours) + ":" timestamp += str(minutes).zfill(2) # zfill pads with zeros @@ -436,18 +440,17 @@ def seconds_to_timestamp(seconds): return timestamp - def update_query_string(query_string, items): parameters = urllib.parse.parse_qs(query_string) parameters.update(items) return urllib.parse.urlencode(parameters, doseq=True) - def uppercase_escape(s): - return re.sub( - r'\\U([0-9a-fA-F]{8})', - lambda m: chr(int(m.group(1), base=16)), s) + return re.sub( + r'\\U([0-9a-fA-F]{8})', + lambda m: chr(int(m.group(1), base=16)), s) + def prefix_url(url): if url is None: @@ -455,12 +458,14 @@ def prefix_url(url): url = url.lstrip('/') # some urls have // before them, which has a special meaning return '/' + url + def left_remove(string, substring): '''removes substring from the start of string, if present''' if string.startswith(substring): return string[len(substring):] return string + def concat_or_none(*strings): '''Concatenates strings. Returns None if any of the arguments are None''' result = '' @@ -483,6 +488,7 @@ def prefix_urls(item): except KeyError: pass + def add_extra_html_info(item): if item['type'] == 'video': item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None @@ -501,6 +507,7 @@ def add_extra_html_info(item): elif item['type'] == 'channel': item['url'] = (URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None + def parse_info_prepare_for_html(renderer, additional_info={}): item = yt_data_extract.extract_item_info(renderer, additional_info) prefix_urls(item) @@ -508,6 +515,7 @@ def parse_info_prepare_for_html(renderer, additional_info={}): return item + def check_gevent_exceptions(*tasks): for task in tasks: if task.exception: @@ -528,7 +536,13 @@ replacement_map = collections.OrderedDict([ ('*', '_'), ('\t', ' '), ]) -DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0', 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9'} + +DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3', + 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0', + 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', + 'lpt8', 'lpt9'} + + def to_valid_filename(name): '''Changes the name so it's valid on Windows, Linux, and Mac''' # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file |