aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/search.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/search.py')
-rw-r--r--youtube/search.py166
1 files changed, 67 insertions, 99 deletions
diff --git a/youtube/search.py b/youtube/search.py
index 0ddc84d..d586c62 100644
--- a/youtube/search.py
+++ b/youtube/search.py
@@ -1,17 +1,14 @@
+from youtube import util, yt_data_extract, proto, local_playlist
+from youtube import yt_app
+import settings
+
import json
import urllib
-import html
-from string import Template
import base64
-from math import ceil
-from youtube.common import default_multi_get, get_thumbnail_url, URL_ORIGIN
-from youtube import common, proto
-
-with open("yt_search_results_template.html", "r") as file:
- yt_search_results_template = file.read()
-
-page_button_template = Template('''<a class="page-button" href="$href">$page</a>''')
-current_page_button_template = Template('''<div class="page-button">$page</div>''')
+import mimetypes
+from flask import request
+import flask
+import os
# Sort: 1
# Upload date: 2
@@ -56,97 +53,68 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
- content = common.fetch_url(url, headers=headers, report_text="Got search results")
+ content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
info = json.loads(content)
return info
-
-def page_buttons_html(page_start, page_end, current_page, query):
- result = ""
- for page in range(page_start, page_end+1):
- if page == current_page:
- template = current_page_button_template
- else:
- template = page_button_template
- result += template.substitute(page=page, href=URL_ORIGIN + "/search?query=" + urllib.parse.quote_plus(query) + "&page=" + str(page))
- return result
-showing_results_for = Template('''
- <div>Showing results for <a>$corrected_query</a></div>
- <div>Search instead for <a href="$original_query_url">$original_query</a></div>
-''')
-did_you_mean = Template('''
- <div>Did you mean <a href="$corrected_query_url">$corrected_query</a></div>
-''')
-def get_search_page(query_string, parameters=()):
- qs_query = urllib.parse.parse_qs(query_string)
- if len(qs_query) == 0:
- return common.yt_basic_template.substitute(
- page_title = "Search",
- header = common.get_header(),
- style = '',
- page = '',
- )
- query = qs_query["query"][0]
- page = qs_query.get("page", "1")[0]
- autocorrect = int(qs_query.get("autocorrect", "1")[0])
- sort = int(qs_query.get("sort", "0")[0])
+@yt_app.route('/results')
+@yt_app.route('/search')
+def get_search_page():
+ query = request.args.get('search_query') or request.args.get('query')
+ if query is None:
+ return flask.render_template('home.html', title='Search')
+ elif query.startswith('https://www.youtube.com') or query.startswith('https://www.youtu.be'):
+ return flask.redirect(f'/{query}')
+
+ page = request.args.get("page", "1")
+ autocorrect = int(request.args.get("autocorrect", "1"))
+ sort = int(request.args.get("sort", "0"))
filters = {}
- filters['time'] = int(qs_query.get("time", "0")[0])
- filters['type'] = int(qs_query.get("type", "0")[0])
- filters['duration'] = int(qs_query.get("duration", "0")[0])
- info = get_search_json(query, page, autocorrect, sort, filters)
-
- estimated_results = int(info[1]['response']['estimatedResults'])
- estimated_pages = ceil(estimated_results/20)
- results = info[1]['response']['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents']
-
- corrections = ''
- result_list_html = ""
- for renderer in results:
- type = list(renderer.keys())[0]
- if type == 'shelfRenderer':
- continue
- if type == 'didYouMeanRenderer':
- renderer = renderer[type]
- corrected_query_string = urllib.parse.parse_qs(query_string)
- corrected_query_string['query'] = [renderer['correctedQueryEndpoint']['searchEndpoint']['query']]
- corrected_query_url = URL_ORIGIN + '/search?' + common.make_query_string(corrected_query_string)
- corrections = did_you_mean.substitute(
- corrected_query_url = corrected_query_url,
- corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']),
- )
- continue
- if type == 'showingResultsForRenderer':
- renderer = renderer[type]
- no_autocorrect_query_string = urllib.parse.parse_qs(query_string)
- no_autocorrect_query_string['autocorrect'] = ['0']
- no_autocorrect_query_url = URL_ORIGIN + '/search?' + common.make_query_string(no_autocorrect_query_string)
- corrections = showing_results_for.substitute(
- corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']),
- original_query_url = no_autocorrect_query_url,
- original_query = html.escape(renderer['originalQuery']['simpleText']),
- )
- continue
- result_list_html += common.renderer_html(renderer, current_query_string=query_string)
-
- page = int(page)
- if page <= 5:
- page_start = 1
- page_end = min(9, estimated_pages)
- else:
- page_start = page - 4
- page_end = min(page + 4, estimated_pages)
-
-
- result = Template(yt_search_results_template).substitute(
- header = common.get_header(query),
- results = result_list_html,
- page_title = query + " - Search",
- search_box_value = html.escape(query),
- number_of_results = '{:,}'.format(estimated_results),
- number_of_pages = '{:,}'.format(estimated_pages),
- page_buttons = page_buttons_html(page_start, page_end, page, query),
- corrections = corrections
+ filters['time'] = int(request.args.get("time", "0"))
+ filters['type'] = int(request.args.get("type", "0"))
+ filters['duration'] = int(request.args.get("duration", "0"))
+ polymer_json = get_search_json(query, page, autocorrect, sort, filters)
+
+ search_info = yt_data_extract.extract_search_info(polymer_json)
+ if search_info['error']:
+ return flask.render_template('error.html', error_message=search_info['error'])
+
+ for extract_item_info in search_info['items']:
+ util.prefix_urls(extract_item_info)
+ util.add_extra_html_info(extract_item_info)
+
+ corrections = search_info['corrections']
+ if corrections['type'] == 'did_you_mean':
+ corrected_query_string = request.args.to_dict(flat=False)
+ corrected_query_string['search_query'] = [corrections['corrected_query']]
+ corrections['corrected_query_url'] = util.URL_ORIGIN + '/results?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
+ elif corrections['type'] == 'showing_results_for':
+ no_autocorrect_query_string = request.args.to_dict(flat=False)
+ no_autocorrect_query_string['autocorrect'] = ['0']
+ no_autocorrect_query_url = util.URL_ORIGIN + '/results?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
+ corrections['original_query_url'] = no_autocorrect_query_url
+
+ return flask.render_template(
+ 'search.html',
+ header_playlist_names=local_playlist.get_playlist_names(),
+ query=query,
+ estimated_results=search_info['estimated_results'],
+ estimated_pages=search_info['estimated_pages'],
+ corrections=search_info['corrections'],
+ results=search_info['items'],
+ parameters_dictionary=request.args,
+ )
+
+
+@yt_app.route('/opensearch.xml')
+def get_search_engine_xml():
+ with open(os.path.join(settings.program_directory, 'youtube/opensearch.xml'), 'rb') as f:
+ if settings.app_public:
+ main_url = '%s' % settings.app_url
+ else:
+ main_url = '%s:%s' % (settings.app_url, settings.port_number)
+ content = f.read().replace(
+ b'$main_url', str(main_url).encode()
)
- return result \ No newline at end of file
+ return flask.Response(content, mimetype='application/xml')