path: root/youtube/common.py
diff options
authorJames Taylor <user234683@users.noreply.github.com>2019-02-21 21:32:31 -0800
committerJames Taylor <user234683@users.noreply.github.com>2019-02-21 21:32:31 -0800
commitb32330be4f15dd044e6212f526e52375f0a0f6c2 (patch)
treefe2f7da84243d895e46967bd39d61d6cf17dab21 /youtube/common.py
parenta61ba6b8f45d94bf8e89a9f351c5c6cac2379387 (diff)
refactor common.py into 3 files
Diffstat (limited to 'youtube/common.py')
1 files changed, 0 insertions, 748 deletions
diff --git a/youtube/common.py b/youtube/common.py
deleted file mode 100644
index cb963ce..0000000
--- a/youtube/common.py
+++ /dev/null
@@ -1,748 +0,0 @@
import gzip
import html
import json
import re
import time
import urllib.parse
import urllib.request

import brotli
import socks
import sockshandler

import settings
from youtube import local_playlist
from youtube.template import Template
# Prefix prepended to YouTube paths when building links (e.g. URL_ORIGIN + "/watch?v=...").
# NOTE(review): the leading "/" presumably routes the link through this local server - confirm.
URL_ORIGIN = '/https://www.youtube.com'
-# videos (all of type str):
-# id
-# title
-# url
-# author
-# author_url
-# thumbnail
-# description
-# published
-# duration
-# likes
-# dislikes
-# views
-# playlist_index
-# playlists:
-# id
-# title
-# url
-# author
-# author_url
-# thumbnail
-# description
-# updated
-# size
-# first_video_id
# Base page template, read once at import time (path is relative to the working directory).
with open('yt_basic_template.html', 'r', encoding='utf-8') as file:
    yt_basic_template = Template(file.read())

# Pagination buttons. Placeholders: $href (link target) and $page (page number).
page_button_template = Template('''<a class="page-button" href="$href">$page</a>''')
# The element is opened as a <div>, so it has to be closed with </div> (it was closed with </a>).
current_page_button_template = Template('''<div class="current-page-button">$page</div>''')
# Medium-sized playlist entry.
# Placeholders: $url, $title, $thumbnail, $size, $stats.
# The literal's closing ''') was missing and has been restored.
medium_playlist_item_template = Template('''
    <div class="medium-item-box">
        <div class="medium-item">
            <a class="playlist-thumbnail-box" href="$url" title="$title">
                <img class="playlist-thumbnail-img" src="$thumbnail">
                <div class="playlist-thumbnail-info">
                    <span>$size</span>
                </div>
            </a>
            <a class="title" href="$url" title="$title">$title</a>
            <div class="stats">$stats</div>
        </div>
    </div>
''')
# Medium-sized video entry with a selection checkbox tied to the "playlist-edit" form.
# Placeholders: $url, $title, $thumbnail, $duration, $stats, $description, $badges, $video_info.
# The literal's closing ''') was missing and has been restored.
medium_video_item_template = Template('''
    <div class="medium-item-box">
        <div class="medium-item">
            <a class="video-thumbnail-box" href="$url" title="$title">
                <img class="video-thumbnail-img" src="$thumbnail">
                <span class="video-duration">$duration</span>
            </a>
            <a class="title" href="$url" title="$title">$title</a>
            <div class="stats">$stats</div>
            <span class="description">$description</span>
            <span class="badges">$badges</span>
        </div>
        <input class="item-checkbox" type="checkbox" name="video_info_list" value="$video_info" form="playlist-edit">
    </div>
''')
# Compact video entry with a selection checkbox tied to the "playlist-edit" form.
# Placeholders: $url, $title, $thumbnail, $duration, $author, $views, $video_info.
# The literal's closing ''') was missing and has been restored.
small_video_item_template = Template('''
    <div class="small-item-box">
        <div class="small-item">
            <a class="video-thumbnail-box" href="$url" title="$title">
                <img class="video-thumbnail-img" src="$thumbnail">
                <span class="video-duration">$duration</span>
            </a>
            <a class="title" href="$url" title="$title">$title</a>
            <address>$author</address>
            <span class="views">$views</span>
        </div>
        <input class="item-checkbox" type="checkbox" name="video_info_list" value="$video_info" form="playlist-edit">
    </div>
''')
# Compact playlist entry.
# Placeholders: $url, $title, $thumbnail, $size, $author.
# The literal's closing ''') was missing and has been restored.
small_playlist_item_template = Template('''
    <div class="small-item-box">
        <div class="small-item">
            <a class="playlist-thumbnail-box" href="$url" title="$title">
                <img class="playlist-thumbnail-img" src="$thumbnail">
                <div class="playlist-thumbnail-info">
                    <span>$size</span>
                </div>
            </a>
            <a class="title" href="$url" title="$title">$title</a>
            <address>$author</address>
        </div>
    </div>
''')
# Medium-sized channel entry.
# Placeholders: $url, $title, $thumbnail, $duration, $subscriber_count, $size, $description.
# The literal's closing ''') was missing and has been restored.
medium_channel_item_template = Template('''
    <div class="medium-item-box">
        <div class="medium-item">
            <a class="video-thumbnail-box" href="$url" title="$title">
                <img class="video-thumbnail-img" src="$thumbnail">
                <span class="video-duration">$duration</span>
            </a>
            <a class="title" href="$url">$title</a>
            <span>$subscriber_count</span>
            <span>$size</span>
            <span class="description">$description</span>
        </div>
    </div>
''')
class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
    '''urllib handler that keeps separate cookiejars for sending and receiving.

    cookiejar_send: object whose add_cookie_header(request) is called on every
        outgoing request, or None to send no cookies.
    cookiejar_receive: object whose extract_cookies(response, request) is called
        on every response, or None to discard received cookies.
    '''

    def __init__(self, cookiejar_send=None, cookiejar_receive=None):
        # The previous local "import http.cookiejar" was never used and has been dropped.
        self.cookiejar_send = cookiejar_send
        self.cookiejar_receive = cookiejar_receive

    def http_request(self, request):
        '''Attach cookies from the send jar (if any) and return the request.'''
        if self.cookiejar_send is not None:
            self.cookiejar_send.add_cookie_header(request)
        return request

    def http_response(self, request, response):
        '''Store cookies from the response in the receive jar (if any) and return the response.'''
        if self.cookiejar_receive is not None:
            self.cookiejar_receive.extract_cookies(response, request)
        return response

    # HTTPS traffic is processed exactly like HTTP traffic
    https_request = http_request
    https_response = http_response
def decode_content(content, encoding_header):
    '''Undo the content codings named in a Content-Encoding header value.

    content: the encoded response body (bytes).
    encoding_header: comma-separated list of codings, e.g. "gzip, br".

    Coding names are matched case-insensitively and "x-gzip" is treated as
    "gzip". "identity", empty tokens and unrecognised codings leave the
    content untouched. Returns the decoded bytes.
    '''
    codings = [token.strip().lower() for token in encoding_header.split(',')]
    # Codings are listed in the order they were applied, so undo them last-to-first
    for coding in reversed(codings):
        if coding == 'br':
            content = brotli.decompress(content)
        elif coding in ('gzip', 'x-gzip'):
            content = gzip.decompress(content)
    return content
def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True):
    '''
    Fetch url and return the response body as bytes, with any gzip/br
    Content-Encoding already undone.

    headers: a dict or an iterable of (name, value) pairs; it is copied, never modified.
    timeout: passed to the opener, in seconds.
    report_text: if truthy, printed together with the latency and read time.
    data: request body. str is encoded as ascii, bytes are sent as-is,
        anything else is url-encoded first.
    use_tor: the request goes through the SOCKS5 proxy only if this and
        settings.route_tor are both truthy.

    When cookiejar_send is set to a CookieJar object,
     those cookies will be sent in the request (but cookies in response will not be merged into it)
    When cookiejar_receive is set to a CookieJar object,
     cookies received in the response will be merged into the object (nothing will be sent from it)
    When both are set to the same object, cookies will be sent from the object,
     and response cookies will be merged into it.
    '''
    headers = dict(headers)   # Note: Calling dict() on a dict will make a copy
    headers['Accept-Encoding'] = 'gzip, br'

    # prevent python version being leaked by urllib if User-Agent isn't provided
    #  (urllib will use ex. Python-urllib/3.6 otherwise)
    # Header names are case-insensitive, so compare lowercased instead of probing a few spellings
    if not any(name.lower() == 'user-agent' for name in headers):
        headers['User-Agent'] = 'Python-urllib'

    if data is not None:
        if isinstance(data, str):
            data = data.encode('ascii')
        elif not isinstance(data, bytes):
            data = urllib.parse.urlencode(data).encode('ascii')

    start_time = time.time()

    req = urllib.request.Request(url, data=data, headers=headers)
    cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)

    if use_tor and settings.route_tor:
        # NOTE(review): the proxy host is an empty string here - confirm the intended address
        opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "", 9150), cookie_processor)
    else:
        opener = urllib.request.build_opener(cookie_processor)

    # The with-block closes the connection even if reading the body fails
    with opener.open(req, timeout=timeout) as response:
        response_time = time.time()
        content = response.read()
        read_finish = time.time()
        encoding_header = response.getheader('Content-Encoding', default='identity')

    if report_text:
        print(report_text, '    Latency:', round(response_time - start_time,3), '    Read time:', round(read_finish - response_time,3))
    return decode_content(content, encoding_header)
# User-Agent strings, each also wrapped as a tuple of (name, value) header pairs
mobile_user_agent = (
    'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) '
    'AppleWebKit/603.1.30 (KHTML, like Gecko) '
    'Version/10.0 Mobile/14E304 Safari/602.1'
)
mobile_ua = (('User-Agent', mobile_user_agent),)

desktop_user_agent = (
    'Mozilla/5.0 (Windows NT 6.1; rv:52.0) '
    'Gecko/20100101 Firefox/52.0'
)
desktop_ua = (('User-Agent', desktop_user_agent),)
def dict_add(*dicts):
    '''Merge every later dict into the first one, in place, and return the first one.

    Later dicts win on duplicate keys. Raises IndexError when called with no arguments.
    '''
    target = dicts[0]
    for extra in dicts[1:]:
        target.update(extra)
    return target
def video_id(url):
    '''Return the value of the first "v" query parameter of url.

    Raises KeyError if the url has no "v" parameter.
    '''
    query_string = urllib.parse.urlparse(url).query
    query = urllib.parse.parse_qs(query_string)
    return query['v'][0]
def uppercase_escape(s):
    '''Replace every literal \\UXXXXXXXX sequence (8 hex digits) in s with the character it names.'''
    def _to_char(match):
        return chr(int(match.group(1), base=16))

    return re.sub(r'\\U([0-9a-fA-F]{8})', _to_char, s)
def default_multi_get(object, *keys, default):
    ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.

    object: the outermost container.
    keys: the keys/indices to follow, outermost first.
    default: keyword-only; returned when any step of the lookup fails.

    A step fails on KeyError, IndexError or TypeError. TypeError covers an
    intermediate value that cannot be subscripted (e.g. None) and a key of
    the wrong kind (e.g. a str used on a list). '''
    try:
        for key in keys:
            object = object[key]
        return object
    except (IndexError, KeyError, TypeError):
        return default
def get_plain_text(node):
    '''Return the HTML-escaped text of a text node.

    Uses node['simpleText'] when present, otherwise the concatenated text
    of node['runs'] with no formatting applied.
    '''
    try:
        simple_text = node['simpleText']
    except KeyError:
        return unformmated_text_runs(node['runs'])
    return html.escape(simple_text)
def unformmated_text_runs(runs):
    '''Concatenate the HTML-escaped "text" of every run, ignoring formatting flags.'''
    return ''.join(html.escape(text_run["text"]) for text_run in runs)
def format_text_runs(runs):
    '''Render text runs as HTML.

    A str argument is returned unchanged (it is not escaped). Otherwise each
    run's text is escaped and wrapped in <b> if the run is bold, else in <i>
    if it is italic; bold takes precedence when both flags are set.
    '''
    if isinstance(runs, str):
        return runs

    pieces = []
    for text_run in runs:
        if text_run.get("bold", False):
            wrapper = "<b>{}</b>"
        elif text_run.get('italics', False):
            wrapper = "<i>{}</i>"
        else:
            wrapper = "{}"
        pieces.append(wrapper.format(html.escape(text_run["text"])))
    return ''.join(pieces)
# available thumbnail names: default, sddefault, mqdefault, hqdefault, hq720
def get_thumbnail_url(video_id):
    '''Return the path of the mqdefault thumbnail for video_id.'''
    return ''.join(("/i.ytimg.com/vi/", video_id, "/mqdefault.jpg"))
def seconds_to_timestamp(seconds):
    '''Format a number of seconds as "M:SS", or "H:MM:SS" when it is an hour or more.

    seconds may be anything int() accepts.
    '''
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours != 0:
        # minutes are zero-padded only when an hours field precedes them
        leading = str(hours) + ":" + str(minutes).zfill(2)
    else:
        leading = str(minutes)
    return leading + ":" + str(secs).zfill(2)
-# -----
-# -----
def small_video_item_html(item):
    '''Render item with small_video_item_template.

    item must provide 'id', 'title', 'author', 'duration' and 'views'.
    Title and author are escaped; views and duration are inserted as given.
    '''
    info_keys = ('id', 'title', 'author', 'duration')
    # unescaped copy of the item's key facts, serialised for the checkbox value
    video_info = json.dumps({key: item[key] for key in info_keys})
    fields = {
        'title': html.escape(item["title"]),
        'views': item["views"],
        'author': html.escape(item["author"]),
        'duration': item["duration"],
        'url': URL_ORIGIN + "/watch?v=" + item["id"],
        'thumbnail': get_thumbnail_url(item['id']),
        'video_info': html.escape(video_info),
    }
    return small_video_item_template.substitute(**fields)
def small_playlist_item_html(item):
    '''Render item with small_playlist_item_template.

    item must provide 'title', 'size', 'id' and 'first_video_id'.
    The author field is always rendered empty.
    '''
    fields = {
        'title': html.escape(item["title"]),
        'size': item['size'],
        'author': "",
        'url': URL_ORIGIN + "/playlist?list=" + item["id"],
        'thumbnail': get_thumbnail_url(item['first_video_id']),
    }
    return small_playlist_item_template.substitute(**fields)
def medium_playlist_item_html(item):
    '''Render item with medium_playlist_item_template.

    item must provide 'title', 'size', 'author', 'author_url', 'id' and 'thumbnail'.
    Only the title is escaped here.
    '''
    # NOTE(review): 'author' is passed through unescaped, unlike small_video_item_html - confirm this is intended
    fields = {
        'title': html.escape(item["title"]),
        'size': item['size'],
        'author': item['author'],
        'author_url': URL_ORIGIN + item['author_url'],
        'url': URL_ORIGIN + "/playlist?list=" + item["id"],
        'thumbnail': item['thumbnail'],
    }
    return medium_playlist_item_template.substitute(**fields)
def medium_video_item_html(medium_video_info):
    '''Render medium_video_info with medium_video_item_template.

    The dict must provide 'title', 'views', 'published', 'description',
    'author', 'author_url', 'duration', 'id' and 'thumbnail'.
    '''
    fields = {
        'title': html.escape(medium_video_info["title"]),
        'views': medium_video_info["views"],
        'published': medium_video_info["published"],
        'description': format_text_runs(medium_video_info["description"]),
        'author': html.escape(medium_video_info["author"]),
        'author_url': medium_video_info["author_url"],
        'duration': medium_video_info["duration"],
        'url': URL_ORIGIN + "/watch?v=" + medium_video_info["id"],
        'thumbnail': medium_video_info['thumbnail'],
        'datetime': '', # TODO
    }
    return medium_video_item_template.substitute(**fields)
# Page header: the search form with its filter dropdown, plus the local-playlist controls.
# Placeholders: $search_box_value (current query) and $playlists (the <option> elements
# built by get_header).
# Restored in this literal: the $playlists placeholder inside the datalist (get_header
# supplies it, but nothing consumed it) and the closing ''').
# The "<" in the "Short" label is written as &lt; so it is not read as the start of a tag.
header_template = Template('''
    <header>
        <form id="site-search" action="/youtube.com/search">
            <input type="search" name="query" class="search-box" value="$search_box_value">
            <button type="submit" value="Search" class="search-button">Search</button>
            <div class="dropdown">
                <button class="dropdown-label">Options</button>
                <div class="css-sucks">
                    <div class="dropdown-content">
                        <h3>Sort by</h3>
                        <input type="radio" id="sort_relevance" name="sort" value="0">
                        <label for="sort_relevance">Relevance</label>
                        <input type="radio" id="sort_upload_date" name="sort" value="2">
                        <label for="sort_upload_date">Upload date</label>
                        <input type="radio" id="sort_view_count" name="sort" value="3">
                        <label for="sort_view_count">View count</label>
                        <input type="radio" id="sort_rating" name="sort" value="1">
                        <label for="sort_rating">Rating</label>
                        <h3>Upload date</h3>
                        <input type="radio" id="time_any" name="time" value="0">
                        <label for="time_any">Any</label>
                        <input type="radio" id="time_last_hour" name="time" value="1">
                        <label for="time_last_hour">Last hour</label>
                        <input type="radio" id="time_today" name="time" value="2">
                        <label for="time_today">Today</label>
                        <input type="radio" id="time_this_week" name="time" value="3">
                        <label for="time_this_week">This week</label>
                        <input type="radio" id="time_this_month" name="time" value="4">
                        <label for="time_this_month">This month</label>
                        <input type="radio" id="time_this_year" name="time" value="5">
                        <label for="time_this_year">This year</label>
                        <h3>Type</h3>
                        <input type="radio" id="type_any" name="type" value="0">
                        <label for="type_any">Any</label>
                        <input type="radio" id="type_video" name="type" value="1">
                        <label for="type_video">Video</label>
                        <input type="radio" id="type_channel" name="type" value="2">
                        <label for="type_channel">Channel</label>
                        <input type="radio" id="type_playlist" name="type" value="3">
                        <label for="type_playlist">Playlist</label>
                        <input type="radio" id="type_movie" name="type" value="4">
                        <label for="type_movie">Movie</label>
                        <input type="radio" id="type_show" name="type" value="5">
                        <label for="type_show">Show</label>
                        <h3>Duration</h3>
                        <input type="radio" id="duration_any" name="duration" value="0">
                        <label for="duration_any">Any</label>
                        <input type="radio" id="duration_short" name="duration" value="1">
                        <label for="duration_short">Short (&lt; 4 minutes)</label>
                        <input type="radio" id="duration_long" name="duration" value="2">
                        <label for="duration_long">Long (> 20 minutes)</label>
                    </div>
                </div>
            </div>
        </form>
        <div id="header-right">
            <form id="playlist-edit" action="/youtube.com/edit_playlist" method="post" target="_self">
                <input name="playlist_name" id="playlist-name-selection" list="playlist-options" type="text">
                <datalist id="playlist-options">
$playlists
                </datalist>
                <button type="submit" id="playlist-add-button" name="action" value="add">Add to playlist</button>
                <button type="reset" id="item-selection-reset">Clear selection</button>
            </form>
            <a href="/youtube.com/playlists" id="local-playlists">Local playlists</a>
        </div>
    </header>
''')

# One <option> of the playlist datalist. Placeholder: $name.
playlist_option_template = Template('''<option value="$name">$name</option>''')
def get_header(search_box_value=""):
    '''Render header_template with one datalist option per local playlist name.

    search_box_value is HTML-escaped before being placed in the search box.
    '''
    # NOTE(review): playlist names are inserted without escaping - confirm they cannot contain HTML
    playlists = ''.join(
        playlist_option_template.substitute(name = playlist_name)
        for playlist_name in local_playlist.get_playlist_names()
    )
    escaped_query = html.escape(search_box_value)
    return header_template.substitute(playlists = playlists, search_box_value = escaped_query)
def get_url(node):
    '''Return the web url of a node's navigation endpoint.

    The endpoint of the first text run is tried first; if any key along that
    path is missing, the endpoint on the node itself is used. A KeyError from
    the fallback propagates.
    '''
    def _endpoint_url(holder):
        return holder['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']

    try:
        return _endpoint_url(node['runs'][0])
    except KeyError:
        return _endpoint_url(node)
def get_text(node):
    '''Return the unescaped text of a text node.

    Uses node['simpleText'] when present, otherwise the text of the first
    run, or '' when the run list is empty. A node with neither key raises KeyError.
    '''
    try:
        text = node['simpleText']
    except KeyError:
        try:
            text = node['runs'][0]['text']
        except IndexError:     # empty text runs
            text = ''
    return text
def get_formatted_text(node):
    '''Return node['runs'] if the node has runs, otherwise node['simpleText'].'''
    try:
        formatted = node['runs']
    except KeyError:
        formatted = node['simpleText']
    return formatted
def get_badges(node):
    '''Return the labels of all badges in node, leaving out any "new" badge (case-insensitive).'''
    labels = (badge_node['metadataBadgeRenderer']['label'] for badge_node in node)
    return [label for label in labels if label.lower() != 'new']
def get_thumbnail(node):
    '''Return the thumbnail url from either a polymer or an ajax format node.'''
    try:
        url = node['thumbnails'][0]['url']     # polymer format
    except KeyError:
        url = node['url']     # ajax format
    return url
# Maps a key of the source JSON to (simplified key, extractor called on that key's value).
# Used by renderer_info and ajax_info; keys not listed here are ignored by both.
# The closing brace was missing from the literal and has been restored.
dispatch = {

# polymer format
    'title':                ('title',       get_text),
    'publishedTimeText':    ('published',   get_text),
    'videoId':              ('id',          lambda node: node),
    'descriptionSnippet':   ('description', get_formatted_text),
    'lengthText':           ('duration',    get_text),
    'thumbnail':            ('thumbnail',   get_thumbnail),
    'thumbnails':           ('thumbnail',   lambda node: node[0]['thumbnails'][0]['url']),

    'viewCountText':        ('views',       get_text),
    'numVideosText':        ('size',        lambda node: get_text(node).split(' ')[0]),     # the format is "324 videos"
    'videoCountText':       ('size',        get_text),
    'playlistId':           ('id',          lambda node: node),
    'descriptionText':      ('description', get_formatted_text),

    'subscriberCountText':  ('subscriber_count',    get_text),
    'channelId':            ('id',          lambda node: node),
    'badges':               ('badges',      get_badges),

# ajax format
    'view_count_text':      ('views',       get_text),
    'num_videos_text':      ('size',        lambda node: get_text(node).split(' ')[0]),
    'owner_text':           ('author',      get_text),
    'owner_endpoint':       ('author_url',  lambda node: node['url']),
    'description':          ('description', get_formatted_text),
    'index':                ('playlist_index', get_text),
    'short_byline':         ('author',      get_text),
    'length':               ('duration',    get_text),
    'video_id':             ('id',          lambda node: node),

}
def renderer_info(renderer):
    '''Extract a flat info dict from a polymer format renderer.

    Special cases (views, owner, thumbnail overlays, playlist id, bylines,
    show thumbnails) are handled here; every other key goes through the
    dispatch table and unknown keys are ignored. Values set by the loop over
    the renderer's items overwrite those set before it.

    If an unexpected KeyError occurs the renderer is printed and the error re-raised.
    '''
    try:
        info = {}

        # viewCountText is listed first because it contains all the digits
        for views_key in ('viewCountText', 'shortViewCountText'):
            if views_key in renderer:
                info['views'] = get_text(renderer[views_key])
                break

        if 'ownerText' in renderer:
            owner_run = renderer['ownerText']['runs'][0]
            info['author'] = owner_run['text']
            info['author_url'] = owner_run['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']

        for overlay in renderer.get('thumbnailOverlays', ()):
            if 'thumbnailOverlayTimeStatusRenderer' in overlay:
                info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
            # show renderers don't have videoCountText
            elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
                info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])

        # show renderers don't have playlistId, have to dig into the url to get it
        try:
            info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
        except KeyError:
            pass

        for key, node in renderer.items():
            if key in ('longBylineText', 'shortBylineText'):
                info['author'] = get_text(node)
                try:
                    info['author_url'] = get_url(node)
                except KeyError:
                    pass
            # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
            elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
                info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
            elif key in dispatch:
                simple_key, function = dispatch[key]
                info[simple_key] = function(node)
        return info
    except KeyError:
        print(renderer)
        raise
def ajax_info(item_json):
    '''Extract a flat info dict from an ajax format item using the dispatch table.

    Keys without a dispatch entry are ignored. If an extractor raises
    KeyError the item is printed and the error re-raised.
    '''
    try:
        info = {}
        for key, node in item_json.items():
            if key not in dispatch:
                continue
            simple_key, function = dispatch[key]
            info[simple_key] = function(node)
        return info
    except KeyError:
        print(item_json)
        raise
def badges_html(badges):
    '''Join the HTML-escaped badge labels with " | ".'''
    return ' | '.join(html.escape(badge) for badge in badges)
# Maps an info key to the function that turns its value into HTML-safe text.
# Used by get_html_ready, which drops keys that are not listed here.
# The closing brace was missing from the literal and has been restored.
html_transform_dispatch = {
    'title':        html.escape,
    'published':    html.escape,
    'id':           html.escape,
    'description':  format_text_runs,
    'duration':     html.escape,
    # reduce any run of leading slashes to exactly one
    'thumbnail':    lambda url: html.escape('/' + url.lstrip('/')),
    'size':         html.escape,
    'author':       html.escape,
    'author_url':   lambda url: html.escape(URL_ORIGIN + url),
    'views':        html.escape,
    'subscriber_count': html.escape,
    'badges':       badges_html,
    'playlist_index':   html.escape,
}
def get_html_ready(item):
    '''Return a new dict holding the HTML-ready form of every known key of item.

    Keys without an entry in html_transform_dispatch are left out.
    '''
    return {
        key: html_transform_dispatch[key](value)
        for key, value in item.items()
        if key in html_transform_dispatch
    }
# Author line used by get_stats, with and without a link to the author.
author_template_url = Template('''<address>By <a href="$author_url">$author</a></address>''')
author_template = Template('''<address><b>$author</b></address>''')

# Optional stats, in display order; get_stats skips any whose placeholders are missing.
# The closing parenthesis of this tuple was missing and has been restored.
stat_templates = (
    Template('''<span class="views">$views</span>'''),
    Template('''<time datetime="$datetime">$published</time>'''),
)
def get_stats(html_ready):
    '''Build the " | "-separated stats line for an item.

    The author comes first (linked when 'author_url' is present), followed by
    each entry of stat_templates whose strict_substitute does not raise KeyError.
    '''
    parts = []
    if 'author' in html_ready:
        author_tpl = author_template_url if 'author_url' in html_ready else author_template
        parts.append(author_tpl.substitute(html_ready))

    for stat_template in stat_templates:
        try:
            rendered = stat_template.strict_substitute(html_ready)
        except KeyError:
            continue
        parts.append(rendered)
    return ' | '.join(parts)
def video_item_html(item, template, html_exclude=frozenset()):
    '''Render a video item with the given template.

    item: info dict, e.g. as produced by renderer_info. It must contain 'id'.
    template: the item template to substitute into.
    html_exclude: keys to remove from the HTML-ready values before the stats
        line and the template are rendered. It is only iterated, never modified.

    Raises KeyError if the item has no 'id'.
    '''
    # Unescaped key facts, serialised into the checkbox value; missing ones become ''
    video_info = {key: item.get(key, '') for key in ('id', 'title', 'author')}
    video_info['duration'] = item.get('duration', 'Live')    # livestreams don't have a duration

    html_ready = get_html_ready(item)

    html_ready['video_info'] = html.escape(json.dumps(video_info))
    html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id']
    html_ready['datetime'] = '' #TODO

    for key in html_exclude:
        # pop() instead of del: an excluded key may be absent, because
        # get_html_ready drops keys it has no transform for
        html_ready.pop(key, None)
    html_ready['stats'] = get_stats(html_ready)

    return template.substitute(html_ready)
def playlist_item_html(item, template, html_exclude=frozenset()):
    '''Render a playlist item with the given template.

    item: info dict, e.g. as produced by renderer_info. It must contain 'id'.
    template: the item template to substitute into.
    html_exclude: keys to remove from the HTML-ready values before the stats
        line and the template are rendered. It is only iterated, never modified.

    Raises KeyError if the item has no 'id'.
    '''
    html_ready = get_html_ready(item)

    html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id']
    html_ready['datetime'] = '' #TODO

    for key in html_exclude:
        # pop() instead of del: an excluded key may be absent, because
        # get_html_ready drops keys it has no transform for
        html_ready.pop(key, None)
    html_ready['stats'] = get_stats(html_ready)

    return template.substitute(html_ready)
def update_query_string(query_string, items):
    '''Return query_string with the parameters in items added or replaced.

    items maps parameter names to lists of values, the same shape that
    urllib.parse.parse_qs produces.
    '''
    merged = urllib.parse.parse_qs(query_string)
    merged.update(items)
    return urllib.parse.urlencode(merged, doseq=True)
# Pagination buttons used by page_buttons_html. Placeholders: $href and $page.
# The current page is rendered as a plain <div> instead of a link.
page_button_template = Template(
    '''<a class="page-button" href="$href">$page</a>'''
)
current_page_button_template = Template(
    '''<div class="page-button">$page</div>'''
)
def page_buttons_html(current_page, estimated_pages, url, current_query_string):
    '''Render the row of pagination buttons.

    Shows pages 1-9 while the current page is 5 or lower, otherwise the four
    pages either side of the current one; both ranges are capped at
    estimated_pages. Each button links to url with the 'page' parameter of
    current_query_string replaced.
    '''
    if current_page <= 5:
        first_page, last_page = 1, min(9, estimated_pages)
    else:
        first_page, last_page = current_page - 4, min(current_page + 4, estimated_pages)

    buttons = []
    for page in range(first_page, last_page + 1):
        template = current_page_button_template if page == current_page else page_button_template
        query = update_query_string(current_query_string, {'page': [str(page)]})
        buttons.append(template.substitute(page=page, href = url + "?" + query))
    return "".join(buttons)
# Notice for a search that was run with a corrected query.
# Placeholders: $corrected_query, $original_query_url, $original_query.
# The literal's closing ''') was missing and has been restored.
showing_results_for = Template('''
    <div class="showing-results-for">
        <div>Showing results for <a>$corrected_query</a></div>
        <div>Search instead for <a href="$original_query_url">$original_query</a></div>
    </div>
''')

# Spelling suggestion notice.
# Placeholders: $corrected_query_url, $corrected_query.
# The literal's closing ''') was missing and has been restored.
did_you_mean = Template('''
    <div class="did-you-mean">
        <div>Did you mean <a href="$corrected_query_url">$corrected_query</a></div>
    </div>
''')
def renderer_html(renderer, additional_info={}, current_query_string=''):
    '''Render a polymer renderer as HTML.

    renderer: a dict whose first key names the renderer type and maps to its data.
    additional_info: values merged over the extracted info; its keys are also
        excluded from the HTML-ready values. It is read, never modified.
    current_query_string: passed along unchanged when recursing into an
        itemSectionRenderer.

    Returns '' for movie, clarification and unknown renderer types.
    '''
    renderer_type = list(renderer.keys())[0]
    contents = renderer[renderer_type]

    if renderer_type == 'itemSectionRenderer':
        # only the first item of the section is rendered
        return renderer_html(contents['contents'][0], additional_info, current_query_string)

    if renderer_type == 'channelRenderer':
        html_ready = get_html_ready(renderer_info(contents))
        html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id']
        return medium_channel_item_template.substitute(html_ready)

    if renderer_type in ('movieRenderer', 'clarificationRenderer'):
        return ''

    info = renderer_info(contents)
    info.update(additional_info)
    html_exclude = set(additional_info.keys())

    small_playlist_types = ('compactPlaylistRenderer', 'compactRadioRenderer', 'compactShowRenderer')
    medium_video_types = ('videoRenderer', 'gridVideoRenderer')
    medium_playlist_types = ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer', 'gridShowRenderer', 'showRenderer')

    if renderer_type == 'compactVideoRenderer':
        return video_item_html(info, small_video_item_template, html_exclude=html_exclude)
    if renderer_type in small_playlist_types:
        return playlist_item_html(info, small_playlist_item_template, html_exclude=html_exclude)
    if renderer_type in medium_video_types:
        return video_item_html(info, medium_video_item_template, html_exclude=html_exclude)
    if renderer_type in medium_playlist_types:
        return playlist_item_html(info, medium_playlist_item_template, html_exclude=html_exclude)

    # unknown renderer types are rendered as nothing
    return ''