aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/local_playlist.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/local_playlist.py')
-rw-r--r--youtube/local_playlist.py106
1 files changed, 78 insertions, 28 deletions
diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py
index 968f1a6..e9b0b20 100644
--- a/youtube/local_playlist.py
+++ b/youtube/local_playlist.py
@@ -1,26 +1,64 @@
-from youtube import util, yt_data_extract
+from youtube import util
from youtube import yt_app
import settings
import os
import json
-import html
import gevent
-import urllib
import math
+import glob
+import re
import flask
from flask import request
-playlists_directory = os.path.join(settings.data_dir, "playlists")
-thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
+playlists_directory = os.path.join(settings.data_dir, 'playlists')
+thumbnails_directory = os.path.join(settings.data_dir, 'playlist_thumbnails')
+
+# Whitelist accepted playlist names so user input cannot escape
+# `playlists_directory` / `thumbnails_directory` (CWE-22, OWASP A01:2021).
+# Allow letters, digits, spaces, dot, dash and underscore.
+_PLAYLIST_NAME_RE = re.compile(r'^[\w .\-]{1,128}$')
+
+
+def _validate_playlist_name(name):
+ '''Return the stripped name if safe, otherwise abort with 400.'''
+ if name is None:
+ flask.abort(400)
+ name = name.strip()
+ if not _PLAYLIST_NAME_RE.match(name):
+ flask.abort(400)
+ return name
+
+
+def _find_playlist_path(name):
+ '''Find playlist file robustly, handling trailing spaces in filenames'''
+ name = _validate_playlist_name(name)
+ pattern = os.path.join(playlists_directory, name + '*.txt')
+ files = glob.glob(pattern)
+ return files[0] if files else os.path.join(playlists_directory, name + '.txt')
+
+
+def _parse_playlist_lines(data):
+ """Parse playlist data lines robustly, skipping empty/malformed entries"""
+ videos = []
+ for line in data.splitlines():
+ clean_line = line.strip()
+ if not clean_line:
+ continue
+ try:
+ videos.append(json.loads(clean_line))
+ except json.decoder.JSONDecodeError:
+ print('Corrupt playlist entry: ' + clean_line)
+ return videos
def video_ids_in_playlist(name):
try:
- with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file:
+ playlist_path = _find_playlist_path(name)
+ with open(playlist_path, 'r', encoding='utf-8') as file:
videos = file.read()
- return set(json.loads(video)['id'] for video in videos.splitlines())
+ return set(json.loads(line.strip())['id'] for line in videos.splitlines() if line.strip())
except FileNotFoundError:
return set()
@@ -29,7 +67,8 @@ def add_to_playlist(name, video_info_list):
os.makedirs(playlists_directory, exist_ok=True)
ids = video_ids_in_playlist(name)
missing_thumbnails = []
- with open(os.path.join(playlists_directory, name + ".txt"), "a", encoding='utf-8') as file:
+ playlist_path = _find_playlist_path(name)
+ with open(playlist_path, "a", encoding='utf-8') as file:
for info in video_info_list:
id = json.loads(info)['id']
if id not in ids:
@@ -67,20 +106,14 @@ def add_extra_info_to_videos(videos, playlist_name):
def read_playlist(name):
'''Returns a list of videos for the given playlist name'''
- playlist_path = os.path.join(playlists_directory, name + '.txt')
- with open(playlist_path, 'r', encoding='utf-8') as f:
- data = f.read()
+ playlist_path = _find_playlist_path(name)
+ try:
+ with open(playlist_path, 'r', encoding='utf-8') as f:
+ data = f.read()
+ except FileNotFoundError:
+ return []
- videos = []
- videos_json = data.splitlines()
- for video_json in videos_json:
- try:
- info = json.loads(video_json)
- videos.append(info)
- except json.decoder.JSONDecodeError:
- if not video_json.strip() == '':
- print('Corrupt playlist video entry: ' + video_json)
- return videos
+ return _parse_playlist_lines(data)
def get_local_playlist_videos(name, offset=0, amount=50):
@@ -102,14 +135,21 @@ def get_playlist_names():
def remove_from_playlist(name, video_info_list):
ids = [json.loads(video)['id'] for video in video_info_list]
- with open(os.path.join(playlists_directory, name + ".txt"), 'r', encoding='utf-8') as file:
+ playlist_path = _find_playlist_path(name)
+ with open(playlist_path, 'r', encoding='utf-8') as file:
videos = file.read()
videos_in = videos.splitlines()
videos_out = []
for video in videos_in:
- if json.loads(video)['id'] not in ids:
- videos_out.append(video)
- with open(os.path.join(playlists_directory, name + ".txt"), 'w', encoding='utf-8') as file:
+ clean = video.strip()
+ if not clean:
+ continue
+ try:
+ if json.loads(clean)['id'] not in ids:
+ videos_out.append(clean)
+ except json.decoder.JSONDecodeError:
+ pass
+ with open(playlist_path, 'w', encoding='utf-8') as file:
file.write("\n".join(videos_out) + "\n")
try:
@@ -153,8 +193,9 @@ def path_edit_playlist(playlist_name):
redirect_page_number = min(int(request.values.get('page', 1)), math.ceil(number_of_videos_remaining/50))
return flask.redirect(util.URL_ORIGIN + request.path + '?page=' + str(redirect_page_number))
elif request.values['action'] == 'remove_playlist':
+ safe_name = _validate_playlist_name(playlist_name)
try:
- os.remove(os.path.join(playlists_directory, playlist_name + ".txt"))
+ os.remove(os.path.join(playlists_directory, safe_name + '.txt'))
except OSError:
pass
return flask.redirect(util.URL_ORIGIN + '/playlists')
@@ -194,8 +235,17 @@ def edit_playlist():
flask.abort(400)
+_THUMBNAIL_RE = re.compile(r'^[A-Za-z0-9_-]{11}\.jpg$')
+
+
@yt_app.route('/data/playlist_thumbnails/<playlist_name>/<thumbnail>')
def serve_thumbnail(playlist_name, thumbnail):
- # .. is necessary because flask always uses the application directory at ./youtube, not the working directory
+ # Validate both path components so a crafted URL cannot escape
+ # `thumbnails_directory` via `..` or NUL tricks (CWE-22).
+ safe_name = _validate_playlist_name(playlist_name)
+ if not _THUMBNAIL_RE.match(thumbnail):
+ flask.abort(400)
+ # .. is necessary because flask always uses the application directory at
+ # ./youtube, not the working directory.
return flask.send_from_directory(
- os.path.join('..', thumbnails_directory, playlist_name), thumbnail)
+ os.path.join('..', thumbnails_directory, safe_name), thumbnail)