From 27ee2990e97b1e73df41c1512332a1facd56f759 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Sun, 9 Jun 2019 16:03:20 -0700 Subject: Overhaul refresh system, make it asynchronous --- youtube/subscriptions.py | 144 ++++++++++++++++++++++++++++++++++------- youtube/util.py | 54 +++++++++++++++- yt_subscriptions_template.html | 1 + 3 files changed, 172 insertions(+), 27 deletions(-) diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index ff74d94..ba27655 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -26,7 +26,7 @@ database_path = os.path.join(settings.data_dir, "subscriptions.sqlite") def open_database(): if not os.path.exists(settings.data_dir): os.makedirs(settings.data_dir) - connection = sqlite3.connect(database_path) + connection = sqlite3.connect(database_path, check_same_thread=False) # Create tables if they don't exist try: @@ -172,17 +172,75 @@ def youtube_timestamp_to_posix(dumb_timestamp): unit = unit[:-1] # remove s from end return now - number*units[unit] + +try: + existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory)) +except FileNotFoundError: + existing_thumbnails = set() + + +thumbnails_queue = util.RateLimitedQueue() +check_channels_queue = util.RateLimitedQueue() + + # Use this to mark a thumbnail acceptable to be retrieved at the request of the browser +# can't simply check if it's in the queue because items are removed when the download starts, not when it finishes downloading_thumbnails = set() -def download_thumbnails(thumbnails_directory, thumbnails): - try: - g = gevent.spawn(util.download_thumbnails, thumbnails_directory, thumbnails) - g.join() - finally: - downloading_thumbnails.difference_update(thumbnails) + +checking_channels = set() + +# Just to use for printing channel checking status to console without opening database +channel_names = dict() + +def download_thumbnail_worker(): + while True: + video_id = thumbnails_queue.get() + try: + success = util.download_thumbnail(thumbnails_directory, video_id) + if success: + existing_thumbnails.add(video_id) + except Exception: + traceback.print_exc() + finally: + downloading_thumbnails.remove(video_id) + +def check_channel_worker(): + while True: + channel_id = check_channels_queue.get() + try: + _get_upstream_videos(channel_id) + finally: + checking_channels.remove(channel_id) + +for i in range(0,5): + gevent.spawn(download_thumbnail_worker) + gevent.spawn(check_channel_worker) + + + + + + +def download_thumbnails_if_necessary(thumbnails): + for video_id in thumbnails: + if video_id not in existing_thumbnails and video_id not in downloading_thumbnails: + downloading_thumbnails.add(video_id) + thumbnails_queue.put(video_id) + +def check_channels_if_necessary(channel_ids): + for channel_id in channel_ids: + if channel_id not in checking_channels: + checking_channels.add(channel_id) + check_channels_queue.put(channel_id) + def _get_upstream_videos(channel_id): + try: + print("Checking channel: " + channel_names[channel_id]) + except KeyError: + print("Checking channel " + channel_id) + videos = [] json_channel_videos = channel.get_grid_items(channel.get_channel_tab(channel_id)[1]['response']) @@ -190,23 +248,56 @@ def _get_upstream_videos(channel_id): info = yt_data_extract.renderer_info(json_video['gridVideoRenderer']) if 'description' not in info: info['description'] = '' - info['time_published'] = youtube_timestamp_to_posix(info['published']) - i # subtract a few seconds off the videos so they will be in the right order - videos.append(info) + try: + info['time_published'] = youtube_timestamp_to_posix(info['published']) - i # subtract a few seconds off the videos so they will be in the right order + except KeyError: + print(info) + videos.append((channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description'])) + + now = time.time() + download_thumbnails_if_necessary(video[1] for video in videos if (now - video[4]) < 30*24*3600) # Don't download thumbnails from videos older than a month + + with open_database() as connection: + with connection as cursor: + cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) + VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos) + cursor.execute('''UPDATE subscribed_channels + SET time_last_checked = ? + WHERE yt_channel_id=?''', [int(time.time()), channel_id]) - try: - existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory)) - except FileNotFoundError: - existing_thumbnails = set() - missing_thumbnails = set(video['id'] for video in videos) - existing_thumbnails - downloading_thumbnails.update(missing_thumbnails) - gevent.spawn(download_thumbnails, thumbnails_directory, missing_thumbnails) - return videos +def check_all_channels(): + with open_database() as connection: + with connection as cursor: + channel_id_name_list = cursor.execute('''SELECT yt_channel_id, channel_name FROM subscribed_channels''').fetchall() + channel_names.update(channel_id_name_list) + check_channels_if_necessary([item[0] for item in channel_id_name_list]) +def check_tags(tags): + channel_id_name_list = [] + with open_database() as connection: + with connection as cursor: + for tag in tags: + channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name + FROM subscribed_channels + WHERE subscribed_channels.id IN ( + SELECT tag_associations.sql_channel_id FROM tag_associations WHERE tag=? + )''', [tag]).fetchall() + channel_names.update(channel_id_name_list) + check_channels_if_necessary([item[0] for item in channel_id_name_list]) +def check_specific_channels(channel_ids): + with open_database() as connection: + with connection as cursor: + for channel_id in channel_ids: + channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name + FROM subscribed_channels + WHERE yt_channel_id=?''', [channel_id]).fetchall() + channel_names.update(channel_id_name_list) + check_channels_if_necessary(channel_ids) @@ -408,15 +499,18 @@ def post_subscriptions_page(env, start_response): _unsubscribe(params['channel_id']) elif action == 'refresh': - with open_database() as connection: - with connection as cursor: - for sql_channel_id, yt_channel_id in cursor.execute('''SELECT id, yt_channel_id FROM subscribed_channels''').fetchall(): - db_videos = ( (sql_channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description']) for info in _get_upstream_videos(yt_channel_id) ) - cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) VALUES (?, ?, ?, ?, ?, ?)''', db_videos) - - cursor.execute('''UPDATE subscribed_channels SET time_last_checked = ?''', ( int(time.time()), ) ) + type = params['type'][0] + if type == 'all': + check_all_channels() + elif type == 'tag': + check_tags(params['tag_name']) + elif type == 'channel': + check_specific_channels(params['channel_id']) + else: + start_response('400 Bad Request', ()) + return b'400 Bad Request' - start_response('303 See Other', [('Location', util.URL_ORIGIN + '/subscriptions'),] ) + start_response('204 No Content', ()) return b'' else: start_response('400 Bad Request', ()) diff --git a/youtube/util.py b/youtube/util.py index 42d76a3..c4e1aff 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -7,6 +7,8 @@ import re import time import os import gevent +import gevent.queue +import gevent.lock # The trouble with the requests library: It ships its own certificate bundle via certifi # instead of using the system certificate store, meaning self-signed certificates @@ -176,6 +178,53 @@ desktop_ua = (('User-Agent', desktop_user_agent),) +class RateLimitedQueue(gevent.queue.Queue): + ''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. ''' + + def __init__(self, initial_burst=30, waiting_period=5, subsequent_bursts=10): + self.initial_burst = initial_burst + self.waiting_period = waiting_period + self.subsequent_bursts = subsequent_bursts + + self.count_since_last_wait = 0 + self.surpassed_initial = False + + self.lock = gevent.lock.BoundedSemaphore(1) + self.currently_empty = False + self.empty_start = 0 + gevent.queue.Queue.__init__(self) + + + def get(self): + self.lock.acquire() # blocks if another greenlet currently has the lock + if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial: + gevent.sleep(self.waiting_period) + self.count_since_last_wait = 0 + + elif self.count_since_last_wait >= self.initial_burst and not self.surpassed_initial: + self.surpassed_initial = True + gevent.sleep(self.waiting_period) + self.count_since_last_wait = 0 + + self.count_since_last_wait += 1 + + if not self.currently_empty and self.empty(): + self.currently_empty = True + self.empty_start = time.monotonic() + + item = gevent.queue.Queue.get(self) # blocks when nothing left + + if self.currently_empty: + if time.monotonic() - self.empty_start >= self.waiting_period: + self.count_since_last_wait = 0 + self.surpassed_initial = False + + self.currently_empty = False + + self.lock.release() + + return item + def download_thumbnail(save_directory, video_id): @@ -185,14 +234,15 @@ def download_thumbnail(save_directory, video_id): thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id) except urllib.error.HTTPError as e: print("Failed to download thumbnail for " + video_id + ": " + str(e)) - return + return False try: f = open(save_location, 'wb') except FileNotFoundError: - os.makedirs(save_directory) + os.makedirs(save_directory, exist_ok = True) f = open(save_location, 'wb') f.write(thumbnail) f.close() + return True def download_thumbnails(save_directory, ids): if not isinstance(ids, (list, tuple)): diff --git a/yt_subscriptions_template.html b/yt_subscriptions_template.html index a6d3e38..8e1434c 100644 --- a/yt_subscriptions_template.html +++ b/yt_subscriptions_template.html @@ -56,6 +56,7 @@ $items
+
-- cgit v1.2.3