diff options
Diffstat (limited to 'youtube/local_playlist.py')
| -rw-r--r-- | youtube/local_playlist.py | 173 |
1 files changed, 136 insertions, 37 deletions
diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py index 776e992..e9b0b20 100644 --- a/youtube/local_playlist.py +++ b/youtube/local_playlist.py @@ -1,36 +1,74 @@ -from youtube import util, yt_data_extract +from youtube import util from youtube import yt_app import settings import os import json -import html import gevent -import urllib import math +import glob +import re import flask from flask import request -playlists_directory = os.path.join(settings.data_dir, "playlists") -thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails") +playlists_directory = os.path.join(settings.data_dir, 'playlists') +thumbnails_directory = os.path.join(settings.data_dir, 'playlist_thumbnails') + +# Whitelist accepted playlist names so user input cannot escape +# `playlists_directory` / `thumbnails_directory` (CWE-22, OWASP A01:2021). +# Allow letters, digits, spaces, dot, dash and underscore. +_PLAYLIST_NAME_RE = re.compile(r'^[\w .\-]{1,128}$') + + +def _validate_playlist_name(name): + '''Return the stripped name if safe, otherwise abort with 400.''' + if name is None: + flask.abort(400) + name = name.strip() + if not _PLAYLIST_NAME_RE.match(name): + flask.abort(400) + return name + + +def _find_playlist_path(name): + '''Find playlist file robustly, handling trailing spaces in filenames''' + name = _validate_playlist_name(name) + pattern = os.path.join(playlists_directory, name + '*.txt') + files = glob.glob(pattern) + return files[0] if files else os.path.join(playlists_directory, name + '.txt') + + +def _parse_playlist_lines(data): + """Parse playlist data lines robustly, skipping empty/malformed entries""" + videos = [] + for line in data.splitlines(): + clean_line = line.strip() + if not clean_line: + continue + try: + videos.append(json.loads(clean_line)) + except json.decoder.JSONDecodeError: + print('Corrupt playlist entry: ' + clean_line) + return videos def video_ids_in_playlist(name): try: - with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file: + playlist_path = _find_playlist_path(name) + with open(playlist_path, 'r', encoding='utf-8') as file: videos = file.read() - return set(json.loads(video)['id'] for video in videos.splitlines()) + return set(json.loads(line.strip())['id'] for line in videos.splitlines() if line.strip()) except FileNotFoundError: return set() def add_to_playlist(name, video_info_list): - if not os.path.exists(playlists_directory): - os.makedirs(playlists_directory) + os.makedirs(playlists_directory, exist_ok=True) ids = video_ids_in_playlist(name) missing_thumbnails = [] - with open(os.path.join(playlists_directory, name + ".txt"), "a", encoding='utf-8') as file: + playlist_path = _find_playlist_path(name) + with open(playlist_path, "a", encoding='utf-8') as file: for info in video_info_list: id = json.loads(info)['id'] if id not in ids: @@ -39,32 +77,48 @@ def add_to_playlist(name, video_info_list): gevent.spawn(util.download_thumbnails, os.path.join(thumbnails_directory, name), missing_thumbnails) -def get_local_playlist_videos(name, offset=0, amount=50): +def add_extra_info_to_videos(videos, playlist_name): + '''Adds extra information necessary for rendering the video item HTML + Downloads missing thumbnails''' try: - thumbnails = set(os.listdir(os.path.join(thumbnails_directory, name))) + thumbnails = set(os.listdir(os.path.join(thumbnails_directory, + playlist_name))) except FileNotFoundError: thumbnails = set() missing_thumbnails = [] - videos = [] - with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file: - data = file.read() - videos_json = data.splitlines() - for video_json in videos_json: - try: - info = json.loads(video_json) - if info['id'] + ".jpg" in thumbnails: - info['thumbnail'] = "/https://youtube.com/data/playlist_thumbnails/" + name + "/" + info['id'] + ".jpg" - else: - info['thumbnail'] = util.get_thumbnail_url(info['id']) - missing_thumbnails.append(info['id']) - info['type'] = 'video' - util.add_extra_html_info(info) - videos.append(info) - except json.decoder.JSONDecodeError: - if not video_json.strip() == '': - print('Corrupt playlist video entry: ' + video_json) - gevent.spawn(util.download_thumbnails, os.path.join(thumbnails_directory, name), missing_thumbnails) + for video in videos: + video['type'] = 'video' + util.add_extra_html_info(video) + if video['id'] + '.jpg' in thumbnails: + video['thumbnail'] = ( + '/https://youtube.com/data/playlist_thumbnails/' + + playlist_name + + '/' + video['id'] + '.jpg') + else: + video['thumbnail'] = util.get_thumbnail_url(video['id']) + missing_thumbnails.append(video['id']) + + gevent.spawn(util.download_thumbnails, + os.path.join(thumbnails_directory, playlist_name), + missing_thumbnails) + + +def read_playlist(name): + '''Returns a list of videos for the given playlist name''' + playlist_path = _find_playlist_path(name) + try: + with open(playlist_path, 'r', encoding='utf-8') as f: + data = f.read() + except FileNotFoundError: + return [] + + return _parse_playlist_lines(data) + + +def get_local_playlist_videos(name, offset=0, amount=50): + videos = read_playlist(name) + add_extra_info_to_videos(videos, name) return videos[offset:offset+amount], len(videos) @@ -81,14 +135,21 @@ def get_playlist_names(): def remove_from_playlist(name, video_info_list): ids = [json.loads(video)['id'] for video in video_info_list] - with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file: + playlist_path = _find_playlist_path(name) + with open(playlist_path, 'r', encoding='utf-8') as file: videos = file.read() videos_in = videos.splitlines() videos_out = [] for video in videos_in: - if json.loads(video)['id'] not in ids: - videos_out.append(video) - with open(os.path.join(playlists_directory, name + ".txt"), 'w', encoding='utf-8') as file: + clean = video.strip() + if not clean: + continue + try: + if json.loads(clean)['id'] not in ids: + videos_out.append(clean) + except json.decoder.JSONDecodeError: + pass + with open(playlist_path, 'w', encoding='utf-8') as file: file.write("\n".join(videos_out) + "\n") try: @@ -131,6 +192,35 @@ def path_edit_playlist(playlist_name): number_of_videos_remaining = remove_from_playlist(playlist_name, videos_to_remove) redirect_page_number = min(int(request.values.get('page', 1)), math.ceil(number_of_videos_remaining/50)) return flask.redirect(util.URL_ORIGIN + request.path + '?page=' + str(redirect_page_number)) + elif request.values['action'] == 'remove_playlist': + safe_name = _validate_playlist_name(playlist_name) + try: + os.remove(os.path.join(playlists_directory, safe_name + '.txt')) + except OSError: + pass + return flask.redirect(util.URL_ORIGIN + '/playlists') + elif request.values['action'] == 'export': + videos = read_playlist(playlist_name) + fmt = request.values['export_format'] + if fmt in ('ids', 'urls'): + prefix = '' + if fmt == 'urls': + prefix = 'https://www.youtube.com/watch?v=' + id_list = '\n'.join(prefix + v['id'] for v in videos) + id_list += '\n' + resp = flask.Response(id_list, mimetype='text/plain') + cd = 'attachment; filename="%s.txt"' % playlist_name + resp.headers['Content-Disposition'] = cd + return resp + elif fmt == 'json': + json_data = json.dumps({'videos': videos}, indent=2, + sort_keys=True) + resp = flask.Response(json_data, mimetype='text/json') + cd = 'attachment; filename="%s.json"' % playlist_name + resp.headers['Content-Disposition'] = cd + return resp + else: + flask.abort(400) else: flask.abort(400) @@ -145,8 +235,17 @@ def edit_playlist(): flask.abort(400) +_THUMBNAIL_RE = re.compile(r'^[A-Za-z0-9_-]{11}\.jpg$') + + @yt_app.route('/data/playlist_thumbnails/<playlist_name>/<thumbnail>') def serve_thumbnail(playlist_name, thumbnail): - # .. is necessary because flask always uses the application directory at ./youtube, not the working directory + # Validate both path components so a crafted URL cannot escape + # `thumbnails_directory` via `..` or NUL tricks (CWE-22). + safe_name = _validate_playlist_name(playlist_name) + if not _THUMBNAIL_RE.match(thumbnail): + flask.abort(400) + # .. is necessary because flask always uses the application directory at + # ./youtube, not the working directory. return flask.send_from_directory( - os.path.join('..', thumbnails_directory, playlist_name), thumbnail) + os.path.join('..', thumbnails_directory, safe_name), thumbnail) |
