diff options
Diffstat (limited to 'youtube/search.py')
| -rw-r--r-- | youtube/search.py | 166 |
1 files changed, 67 insertions, 99 deletions
diff --git a/youtube/search.py b/youtube/search.py index 0ddc84d..d586c62 100644 --- a/youtube/search.py +++ b/youtube/search.py @@ -1,17 +1,14 @@ +from youtube import util, yt_data_extract, proto, local_playlist +from youtube import yt_app +import settings + import json import urllib -import html -from string import Template import base64 -from math import ceil -from youtube.common import default_multi_get, get_thumbnail_url, URL_ORIGIN -from youtube import common, proto - -with open("yt_search_results_template.html", "r") as file: - yt_search_results_template = file.read() - -page_button_template = Template('''<a class="page-button" href="$href">$page</a>''') -current_page_button_template = Template('''<div class="page-button">$page</div>''') +import mimetypes +from flask import request +import flask +import os # Sort: 1 # Upload date: 2 @@ -56,97 +53,68 @@ def get_search_json(query, page, autocorrect, sort, filters): 'X-YouTube-Client-Version': '2.20180418', } url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D") - content = common.fetch_url(url, headers=headers, report_text="Got search results") + content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results') info = json.loads(content) return info - -def page_buttons_html(page_start, page_end, current_page, query): - result = "" - for page in range(page_start, page_end+1): - if page == current_page: - template = current_page_button_template - else: - template = page_button_template - result += template.substitute(page=page, href=URL_ORIGIN + "/search?query=" + urllib.parse.quote_plus(query) + "&page=" + str(page)) - return result -showing_results_for = Template(''' - <div>Showing results for <a>$corrected_query</a></div> - <div>Search instead for <a href="$original_query_url">$original_query</a></div> -''') -did_you_mean = Template(''' - <div>Did you mean <a href="$corrected_query_url">$corrected_query</a></div> -''') -def get_search_page(query_string, parameters=()): - qs_query = urllib.parse.parse_qs(query_string) - if len(qs_query) == 0: - return common.yt_basic_template.substitute( - page_title = "Search", - header = common.get_header(), - style = '', - page = '', - ) - query = qs_query["query"][0] - page = qs_query.get("page", "1")[0] - autocorrect = int(qs_query.get("autocorrect", "1")[0]) - sort = int(qs_query.get("sort", "0")[0]) +@yt_app.route('/results') +@yt_app.route('/search') +def get_search_page(): + query = request.args.get('search_query') or request.args.get('query') + if query is None: + return flask.render_template('home.html', title='Search') + elif query.startswith('https://www.youtube.com') or query.startswith('https://www.youtu.be'): + return flask.redirect(f'/{query}') + + page = request.args.get("page", "1") + autocorrect = int(request.args.get("autocorrect", "1")) + sort = int(request.args.get("sort", "0")) filters = {} - filters['time'] = int(qs_query.get("time", "0")[0]) - filters['type'] = int(qs_query.get("type", "0")[0]) - filters['duration'] = int(qs_query.get("duration", "0")[0]) - info = get_search_json(query, page, autocorrect, sort, filters) - - estimated_results = int(info[1]['response']['estimatedResults']) - estimated_pages = ceil(estimated_results/20) - results = info[1]['response']['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'] - - corrections = '' - result_list_html = "" - for renderer in results: - type = list(renderer.keys())[0] - if type == 'shelfRenderer': - continue - if type == 'didYouMeanRenderer': - renderer = renderer[type] - corrected_query_string = urllib.parse.parse_qs(query_string) - corrected_query_string['query'] = [renderer['correctedQueryEndpoint']['searchEndpoint']['query']] - corrected_query_url = URL_ORIGIN + '/search?' + common.make_query_string(corrected_query_string) - corrections = did_you_mean.substitute( - corrected_query_url = corrected_query_url, - corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']), - ) - continue - if type == 'showingResultsForRenderer': - renderer = renderer[type] - no_autocorrect_query_string = urllib.parse.parse_qs(query_string) - no_autocorrect_query_string['autocorrect'] = ['0'] - no_autocorrect_query_url = URL_ORIGIN + '/search?' + common.make_query_string(no_autocorrect_query_string) - corrections = showing_results_for.substitute( - corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']), - original_query_url = no_autocorrect_query_url, - original_query = html.escape(renderer['originalQuery']['simpleText']), - ) - continue - result_list_html += common.renderer_html(renderer, current_query_string=query_string) - - page = int(page) - if page <= 5: - page_start = 1 - page_end = min(9, estimated_pages) - else: - page_start = page - 4 - page_end = min(page + 4, estimated_pages) - - - result = Template(yt_search_results_template).substitute( - header = common.get_header(query), - results = result_list_html, - page_title = query + " - Search", - search_box_value = html.escape(query), - number_of_results = '{:,}'.format(estimated_results), - number_of_pages = '{:,}'.format(estimated_pages), - page_buttons = page_buttons_html(page_start, page_end, page, query), - corrections = corrections + filters['time'] = int(request.args.get("time", "0")) + filters['type'] = int(request.args.get("type", "0")) + filters['duration'] = int(request.args.get("duration", "0")) + polymer_json = get_search_json(query, page, autocorrect, sort, filters) + + search_info = yt_data_extract.extract_search_info(polymer_json) + if search_info['error']: + return flask.render_template('error.html', error_message=search_info['error']) + + for extract_item_info in search_info['items']: + util.prefix_urls(extract_item_info) + util.add_extra_html_info(extract_item_info) + + corrections = search_info['corrections'] + if corrections['type'] == 'did_you_mean': + corrected_query_string = request.args.to_dict(flat=False) + corrected_query_string['search_query'] = [corrections['corrected_query']] + corrections['corrected_query_url'] = util.URL_ORIGIN + '/results?' + urllib.parse.urlencode(corrected_query_string, doseq=True) + elif corrections['type'] == 'showing_results_for': + no_autocorrect_query_string = request.args.to_dict(flat=False) + no_autocorrect_query_string['autocorrect'] = ['0'] + no_autocorrect_query_url = util.URL_ORIGIN + '/results?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True) + corrections['original_query_url'] = no_autocorrect_query_url + + return flask.render_template( + 'search.html', + header_playlist_names=local_playlist.get_playlist_names(), + query=query, + estimated_results=search_info['estimated_results'], + estimated_pages=search_info['estimated_pages'], + corrections=search_info['corrections'], + results=search_info['items'], + parameters_dictionary=request.args, + ) + + +@yt_app.route('/opensearch.xml') +def get_search_engine_xml(): + with open(os.path.join(settings.program_directory, 'youtube/opensearch.xml'), 'rb') as f: + if settings.app_public: + main_url = '%s' % settings.app_url + else: + main_url = '%s:%s' % (settings.app_url, settings.port_number) + content = f.read().replace( + b'$main_url', str(main_url).encode() ) - return result
\ No newline at end of file + return flask.Response(content, mimetype='application/xml') |
