about | summary | refs | log | tree | commit | diff | stats
path: root/youtube/local_playlist.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/local_playlist.py')
-rw-r--r-- youtube/local_playlist.py 173
1 files changed, 136 insertions, 37 deletions
diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py
index 776e992..e9b0b20 100644
--- a/youtube/local_playlist.py
+++ b/youtube/local_playlist.py
@@ -1,36 +1,74 @@
-from youtube import util, yt_data_extract
+from youtube import util
from youtube import yt_app
import settings
import os
import json
-import html
import gevent
-import urllib
import math
+import glob
+import re
import flask
from flask import request
-playlists_directory = os.path.join(settings.data_dir, "playlists")
-thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
+playlists_directory = os.path.join(settings.data_dir, 'playlists')
+thumbnails_directory = os.path.join(settings.data_dir, 'playlist_thumbnails')
+
+# Whitelist accepted playlist names so user input cannot escape
+# `playlists_directory` / `thumbnails_directory` (CWE-22, OWASP A01:2021).
+# Allow letters, digits, spaces, dot, dash and underscore.
+# NOTE: `\w` on a str pattern is Unicode-aware, so non-ASCII letters and
+# digits are accepted too. The pattern on its own also matches '.' and
+# '..', since a name may consist entirely of dots.
+_PLAYLIST_NAME_RE = re.compile(r'^[\w .\-]{1,128}$')
+
+
+def _validate_playlist_name(name):
+    '''Return the stripped playlist name if it is safe, else abort with 400.
+
+    A name is accepted only if, after surrounding whitespace is stripped,
+    it matches `_PLAYLIST_NAME_RE` and is not made up solely of dots and
+    spaces. The second check is required because the whitelist permits
+    '.', so '.' and '..' would otherwise pass and a path joined with the
+    name would resolve to the directory itself or to its parent.
+
+    Aborts the request through flask.abort(400) when `name` is None or is
+    rejected by either check.
+    '''
+    if name is None:
+        flask.abort(400)
+    name = name.strip()
+    if not _PLAYLIST_NAME_RE.match(name):
+        flask.abort(400)
+    # '.', '..', '. .' and the like are path navigation, not playlist names.
+    if not name.strip('. '):
+        flask.abort(400)
+    return name
+
+
+def _find_playlist_path(name):
+    '''Return the path of the playlist file for `name`.
+
+    The name is validated first (the request is aborted with 400 if it is
+    rejected). `<name>.txt` is preferred when it exists. Otherwise a file
+    whose stem equals the name once trailing whitespace is removed is
+    accepted, which copes with filenames saved with trailing spaces.
+    A bare `name*` prefix match is deliberately not enough: playlist
+    'a' must never resolve to 'abc.txt'.
+
+    When nothing matches, the canonical `<name>.txt` path is returned so
+    callers can create the file or handle FileNotFoundError themselves.
+    '''
+    name = _validate_playlist_name(name)
+    exact_path = os.path.join(playlists_directory, name + '.txt')
+    if os.path.exists(exact_path):
+        return exact_path
+    # Escape the literal part so glob metacharacters in the data directory
+    # path are not interpreted as wildcards.
+    pattern = glob.escape(os.path.join(playlists_directory, name)) + '*.txt'
+    # sorted() makes the choice deterministic if several files qualify.
+    for candidate in sorted(glob.glob(pattern)):
+        stem = os.path.basename(candidate)[:-len('.txt')]
+        if stem.rstrip() == name:
+            return candidate
+    return exact_path
+
+
+def _parse_playlist_lines(data):
+    '''Parse the text of a playlist file into a list of video dicts.
+
+    `data` holds one JSON document per line. Blank lines are ignored.
+    A line is reported on stdout and skipped when it is not valid JSON,
+    or when it decodes to something other than an object with an 'id'
+    key, because every consumer of the result indexes entries by 'id'.
+
+    Returns the surviving entries in file order.
+    '''
+    videos = []
+    for line in data.splitlines():
+        clean_line = line.strip()
+        if not clean_line:
+            continue
+        try:
+            entry = json.loads(clean_line)
+        except json.decoder.JSONDecodeError:
+            print('Corrupt playlist entry: ' + clean_line)
+            continue
+        # Valid JSON such as `[]` or `123` is still not a video entry.
+        if not isinstance(entry, dict) or 'id' not in entry:
+            print('Corrupt playlist entry: ' + clean_line)
+            continue
+        videos.append(entry)
+    return videos
def video_ids_in_playlist(name):
+    '''Return the set of video ids stored in the playlist `name`.
+
+    Returns an empty set when the playlist file does not exist.
+    '''
    try:
-        with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file:
+        playlist_path = _find_playlist_path(name)
+        with open(playlist_path, 'r', encoding='utf-8') as file:
            videos = file.read()
-        return set(json.loads(video)['id'] for video in videos.splitlines())
+        # Go through _parse_playlist_lines so blank and corrupt lines are
+        # skipped instead of raising JSONDecodeError, and ignore entries
+        # that are valid JSON but carry no 'id'.
+        return {
+            video['id'] for video in _parse_playlist_lines(videos)
+            if isinstance(video, dict) and 'id' in video
+        }
    except FileNotFoundError:
        return set()
def add_to_playlist(name, video_info_list):
- if not os.path.exists(playlists_directory):
- os.makedirs(playlists_directory)
+ os.makedirs(playlists_directory, exist_ok=True)
ids = video_ids_in_playlist(name)
missing_thumbnails = []
- with open(os.path.join(playlists_directory, name + ".txt"), "a", encoding='utf-8') as file:
+ playlist_path = _find_playlist_path(name)
+ with open(playlist_path, "a", encoding='utf-8') as file:
for info in video_info_list:
id = json.loads(info)['id']
if id not in ids:
@@ -39,32 +77,48 @@ def add_to_playlist(name, video_info_list):
gevent.spawn(util.download_thumbnails, os.path.join(thumbnails_directory, name), missing_thumbnails)
-def get_local_playlist_videos(name, offset=0, amount=50):
+def add_extra_info_to_videos(videos, playlist_name):
+    '''Add the fields needed to render each video item, in place.
+
+    For every dict in `videos` this sets 'type' to 'video', calls
+    util.add_extra_html_info on it and sets 'thumbnail' to the locally
+    served copy when `<id>.jpg` is present in the playlist's thumbnail
+    directory, otherwise to util.get_thumbnail_url(id). The ids without a
+    local thumbnail are passed to util.download_thumbnails through
+    gevent.spawn.
+
+    `playlist_name` goes through _validate_playlist_name first (aborting
+    with 400 if it is rejected), so the directory listed here and the name
+    embedded in the thumbnail URL are the same normalized name that
+    serve_thumbnail resolves.
+    '''
+    playlist_name = _validate_playlist_name(playlist_name)
+    thumbnail_dir = os.path.join(thumbnails_directory, playlist_name)
    try:
-        thumbnails = set(os.listdir(os.path.join(thumbnails_directory, name)))
+        thumbnails = set(os.listdir(thumbnail_dir))
    except FileNotFoundError:
        thumbnails = set()
    missing_thumbnails = []
-    videos = []
-    with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file:
-        data = file.read()
-    videos_json = data.splitlines()
-    for video_json in videos_json:
-        try:
-            info = json.loads(video_json)
-            if info['id'] + ".jpg" in thumbnails:
-                info['thumbnail'] = "/https://youtube.com/data/playlist_thumbnails/" + name + "/" + info['id'] + ".jpg"
-            else:
-                info['thumbnail'] = util.get_thumbnail_url(info['id'])
-                missing_thumbnails.append(info['id'])
-            info['type'] = 'video'
-            util.add_extra_html_info(info)
-            videos.append(info)
-        except json.decoder.JSONDecodeError:
-            if not video_json.strip() == '':
-                print('Corrupt playlist video entry: ' + video_json)
-    gevent.spawn(util.download_thumbnails, os.path.join(thumbnails_directory, name), missing_thumbnails)
+    for video in videos:
+        video['type'] = 'video'
+        util.add_extra_html_info(video)
+        if video['id'] + '.jpg' in thumbnails:
+            video['thumbnail'] = (
+                '/https://youtube.com/data/playlist_thumbnails/'
+                + playlist_name
+                + '/' + video['id'] + '.jpg')
+        else:
+            video['thumbnail'] = util.get_thumbnail_url(video['id'])
+            missing_thumbnails.append(video['id'])
+
+    gevent.spawn(util.download_thumbnails, thumbnail_dir, missing_thumbnails)
+
+
+def read_playlist(name):
+    '''Load the playlist `name` and return its video entries as a list.
+
+    The file is located with _find_playlist_path and decoded with
+    _parse_playlist_lines. A playlist whose file does not exist yields an
+    empty list.
+    '''
+    path = _find_playlist_path(name)
+    try:
+        playlist_file = open(path, 'r', encoding='utf-8')
+    except FileNotFoundError:
+        return []
+    with playlist_file:
+        contents = playlist_file.read()
+    return _parse_playlist_lines(contents)
+
+
+def get_local_playlist_videos(name, offset=0, amount=50):
+ '''Return (page, total) for the playlist `name`.
+
+ The whole playlist is read with read_playlist and passed to
+ add_extra_info_to_videos before slicing, so the extra fields are added
+ to every entry, not only to the returned page. `page` is the slice
+ videos[offset:offset+amount]; `total` is the full playlist length.
+ '''
+ videos = read_playlist(name)
+ add_extra_info_to_videos(videos, name)
return videos[offset:offset+amount], len(videos)
@@ -81,14 +135,21 @@ def get_playlist_names():
def remove_from_playlist(name, video_info_list):
ids = [json.loads(video)['id'] for video in video_info_list]
- with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file:
+ playlist_path = _find_playlist_path(name)
+ with open(playlist_path, 'r', encoding='utf-8') as file:
videos = file.read()
videos_in = videos.splitlines()
videos_out = []
for video in videos_in:
- if json.loads(video)['id'] not in ids:
- videos_out.append(video)
- with open(os.path.join(playlists_directory, name + ".txt"), 'w', encoding='utf-8') as file:
+ clean = video.strip()
+ if not clean:
+ continue
+ try:
+ if json.loads(clean)['id'] not in ids:
+ videos_out.append(clean)
+ except json.decoder.JSONDecodeError:
+ pass
+ with open(playlist_path, 'w', encoding='utf-8') as file:
file.write("\n".join(videos_out) + "\n")
try:
@@ -131,6 +192,35 @@ def path_edit_playlist(playlist_name):
number_of_videos_remaining = remove_from_playlist(playlist_name, videos_to_remove)
redirect_page_number = min(int(request.values.get('page', 1)), math.ceil(number_of_videos_remaining/50))
return flask.redirect(util.URL_ORIGIN + request.path + '?page=' + str(redirect_page_number))
+ elif request.values['action'] == 'remove_playlist':
+ safe_name = _validate_playlist_name(playlist_name)
+ try:
+ os.remove(os.path.join(playlists_directory, safe_name + '.txt'))
+ except OSError:
+ pass
+ return flask.redirect(util.URL_ORIGIN + '/playlists')
+ elif request.values['action'] == 'export':
+ videos = read_playlist(playlist_name)
+ fmt = request.values['export_format']
+ if fmt in ('ids', 'urls'):
+ prefix = ''
+ if fmt == 'urls':
+ prefix = 'https://www.youtube.com/watch?v='
+ id_list = '\n'.join(prefix + v['id'] for v in videos)
+ id_list += '\n'
+ resp = flask.Response(id_list, mimetype='text/plain')
+ cd = 'attachment; filename="%s.txt"' % playlist_name
+ resp.headers['Content-Disposition'] = cd
+ return resp
+ elif fmt == 'json':
+ json_data = json.dumps({'videos': videos}, indent=2,
+ sort_keys=True)
+ resp = flask.Response(json_data, mimetype='text/json')
+ cd = 'attachment; filename="%s.json"' % playlist_name
+ resp.headers['Content-Disposition'] = cd
+ return resp
+ else:
+ flask.abort(400)
else:
flask.abort(400)
@@ -145,8 +235,17 @@ def edit_playlist():
flask.abort(400)
+_THUMBNAIL_RE = re.compile(r'^[A-Za-z0-9_-]{11}\.jpg$')
+
+
@yt_app.route('/data/playlist_thumbnails/<playlist_name>/<thumbnail>')
def serve_thumbnail(playlist_name, thumbnail):
-    # .. is necessary because flask always uses the application directory at ./youtube, not the working directory
+    '''Serve one stored thumbnail file of a local playlist.
+
+    Aborts with 400 when the playlist name is rejected by
+    _validate_playlist_name or when `thumbnail` is not exactly an
+    11-character id followed by '.jpg'.
+    '''
+    # Validate both path components so a crafted URL cannot escape
+    # `thumbnails_directory` via `..` or NUL tricks (CWE-22).
+    safe_name = _validate_playlist_name(playlist_name)
+    # fullmatch() rather than match(): `$` also matches just before a
+    # trailing newline, so match() would let 'xxxxxxxxxxx.jpg\n' through.
+    if not _THUMBNAIL_RE.fullmatch(thumbnail):
+        flask.abort(400)
+    # .. is necessary because flask always uses the application directory at
+    # ./youtube, not the working directory.
    return flask.send_from_directory(
-        os.path.join('..', thumbnails_directory, playlist_name), thumbnail)
+        os.path.join('..', thumbnails_directory, safe_name), thumbnail)