aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/subscriptions.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/subscriptions.py')
-rw-r--r--youtube/subscriptions.py35
1 files changed, 26 insertions, 9 deletions
diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py
index 980822a..dafea58 100644
--- a/youtube/subscriptions.py
+++ b/youtube/subscriptions.py
@@ -292,7 +292,10 @@ def youtube_timestamp_to_posix(dumb_timestamp):
def posix_to_dumbed_down(posix_time):
'''Inverse of youtube_timestamp_to_posix.'''
delta = int(time.time() - posix_time)
- assert delta >= 0
+ # Guard against future timestamps (clock drift) without relying on
+ # `assert` (which is stripped under `python -O`).
+ if delta < 0:
+ delta = 0
if delta == 0:
return '0 seconds ago'
@@ -531,7 +534,8 @@ def _get_upstream_videos(channel_id):
return None
root = defusedxml.ElementTree.fromstring(feed)
- assert remove_bullshit(root.tag) == 'feed'
+ if remove_bullshit(root.tag) != 'feed':
+ raise ValueError('Root element is not <feed>')
for entry in root:
if (remove_bullshit(entry.tag) != 'entry'):
continue
@@ -539,13 +543,13 @@ def _get_upstream_videos(channel_id):
# it's yt:videoId in the xml but the yt: is turned into a namespace which is removed by remove_bullshit
video_id_element = find_element(entry, 'videoId')
time_published_element = find_element(entry, 'published')
- assert video_id_element is not None
- assert time_published_element is not None
+ if video_id_element is None or time_published_element is None:
+ raise ValueError('Missing videoId or published element')
time_published = int(calendar.timegm(time.strptime(time_published_element.text, '%Y-%m-%dT%H:%M:%S+00:00')))
times_published[video_id_element.text] = time_published
- except AssertionError:
+ except ValueError:
print('Failed to read atoma feed for ' + channel_status_name)
traceback.print_exc()
except defusedxml.ElementTree.ParseError:
@@ -593,7 +597,10 @@ def _get_upstream_videos(channel_id):
# Special case: none of the videos have a time published.
# In this case, make something up
if videos and videos[0]['time_published'] is None:
- assert all(v['time_published'] is None for v in videos)
+ # Invariant: if the first video has no timestamp, earlier passes
+ # ensure all of them are unset. Don't rely on `assert`.
+ if not all(v['time_published'] is None for v in videos):
+ raise RuntimeError('Inconsistent time_published state')
now = time.time()
for i in range(len(videos)):
# 1 month between videos
@@ -808,7 +815,8 @@ def import_subscriptions():
file = file.read().decode('utf-8')
try:
root = defusedxml.ElementTree.fromstring(file)
- assert root.tag == 'opml'
+ if root.tag != 'opml':
+ raise ValueError('Root element is not <opml>')
channels = []
for outline_element in root[0][0]:
if (outline_element.tag != 'outline') or ('xmlUrl' not in outline_element.attrib):
@@ -819,7 +827,7 @@ def import_subscriptions():
channel_id = channel_rss_url[channel_rss_url.find('channel_id=')+11:].strip()
channels.append((channel_id, channel_name))
- except (AssertionError, IndexError, defusedxml.ElementTree.ParseError) as e:
+ except (ValueError, IndexError, defusedxml.ElementTree.ParseError):
return '400 Bad Request: Unable to read opml xml file, or the file is not the expected format', 400
elif mime_type in ('text/csv', 'application/vnd.ms-excel'):
content = file.read().decode('utf-8')
@@ -1071,11 +1079,20 @@ def post_subscriptions_page():
return '', 204
+# YouTube video IDs are exactly 11 chars from [A-Za-z0-9_-]. Enforce this
+# before using the value in filesystem paths to prevent path traversal
+# (CWE-22, OWASP A01:2021).
+_VIDEO_ID_RE = re.compile(r'^[A-Za-z0-9_-]{11}$')
+
+
@yt_app.route('/data/subscription_thumbnails/<thumbnail>')
def serve_subscription_thumbnail(thumbnail):
'''Serves thumbnail from disk if it's been saved already. If not, downloads the thumbnail, saves to disk, and serves it.'''
- assert thumbnail[-4:] == '.jpg'
+ if not thumbnail.endswith('.jpg'):
+ flask.abort(400)
video_id = thumbnail[0:-4]
+ if not _VIDEO_ID_RE.match(video_id):
+ flask.abort(400)
thumbnail_path = os.path.join(thumbnails_directory, thumbnail)
if video_id in existing_thumbnails: