diff options
| author | James Taylor <user234683@users.noreply.github.com> | 2019-06-09 16:03:20 -0700 | 
|---|---|---|
| committer | James Taylor <user234683@users.noreply.github.com> | 2019-06-09 16:03:20 -0700 | 
| commit | 27ee2990e97b1e73df41c1512332a1facd56f759 (patch) | |
| tree | 183e892af367061f79a6ff37c64ff9c86e7873ed | |
| parent | f5c76462d722d45645029754b4fc85252cf8212e (diff) | |
| download | yt-local-27ee2990e97b1e73df41c1512332a1facd56f759.tar.lz yt-local-27ee2990e97b1e73df41c1512332a1facd56f759.tar.xz yt-local-27ee2990e97b1e73df41c1512332a1facd56f759.zip | |
Overhaul refresh system, make it asynchronous
| -rw-r--r-- | youtube/subscriptions.py | 144 | ||||
| -rw-r--r-- | youtube/util.py | 54 | ||||
| -rw-r--r-- | yt_subscriptions_template.html | 1 | 
3 files changed, 172 insertions, 27 deletions
| diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index ff74d94..ba27655 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -26,7 +26,7 @@ database_path = os.path.join(settings.data_dir, "subscriptions.sqlite")  def open_database():      if not os.path.exists(settings.data_dir):          os.makedirs(settings.data_dir) -    connection = sqlite3.connect(database_path) +    connection = sqlite3.connect(database_path, check_same_thread=False)      # Create tables if they don't exist      try: @@ -172,17 +172,75 @@ def youtube_timestamp_to_posix(dumb_timestamp):          unit = unit[:-1]    # remove s from end      return now - number*units[unit] + +try: +    existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory)) +except FileNotFoundError: +    existing_thumbnails = set() + + +thumbnails_queue = util.RateLimitedQueue() +check_channels_queue = util.RateLimitedQueue() + +  # Use this to mark a thumbnail acceptable to be retrieved at the request of the browser +# can't simply check if it's in the queue because items are removed when the download starts, not when it finishes  downloading_thumbnails = set() -def download_thumbnails(thumbnails_directory, thumbnails): -    try: -        g = gevent.spawn(util.download_thumbnails, thumbnails_directory, thumbnails) -        g.join() -    finally: -        downloading_thumbnails.difference_update(thumbnails) + +checking_channels = set() + +# Just to use for printing channel checking status to console without opening database +channel_names = dict() + +def download_thumbnail_worker(): +    while True: +        video_id = thumbnails_queue.get() +        try: +            success = util.download_thumbnail(thumbnails_directory, video_id) +            if success: +                existing_thumbnails.add(video_id) +        except Exception: +            traceback.print_exc() +        finally: +            downloading_thumbnails.remove(video_id) + +def check_channel_worker(): +    while True: +        channel_id = check_channels_queue.get() +        try: +            _get_upstream_videos(channel_id) +        finally: +            checking_channels.remove(channel_id) + +for i in range(0,5): +    gevent.spawn(download_thumbnail_worker) +    gevent.spawn(check_channel_worker) + + + + + + +def download_thumbnails_if_necessary(thumbnails): +    for video_id in thumbnails: +        if video_id not in existing_thumbnails and video_id not in downloading_thumbnails: +            downloading_thumbnails.add(video_id) +            thumbnails_queue.put(video_id) + +def check_channels_if_necessary(channel_ids): +    for channel_id in channel_ids: +        if channel_id not in checking_channels: +            checking_channels.add(channel_id) +            check_channels_queue.put(channel_id) +  def _get_upstream_videos(channel_id): +    try: +        print("Checking channel: " + channel_names[channel_id]) +    except KeyError: +        print("Checking channel " + channel_id) +      videos = []      json_channel_videos = channel.get_grid_items(channel.get_channel_tab(channel_id)[1]['response']) @@ -190,23 +248,56 @@ def _get_upstream_videos(channel_id):          info = yt_data_extract.renderer_info(json_video['gridVideoRenderer'])          if 'description' not in info:              info['description'] = '' -        info['time_published'] = youtube_timestamp_to_posix(info['published']) - i  # subtract a few seconds off the videos so they will be in the right order -        videos.append(info) +        try: +            info['time_published'] = youtube_timestamp_to_posix(info['published']) - i  # subtract a few seconds off the videos so they will be in the right order +        except KeyError: +            print(info) +        videos.append((channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description'])) + +    now = time.time() +    download_thumbnails_if_necessary(video[1] for video in videos if (now - video[4]) < 30*24*3600) # Don't download thumbnails from videos older than a month + +    with open_database() as connection: +        with connection as cursor: +            cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) +                                  VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos) +            cursor.execute('''UPDATE subscribed_channels +                              SET time_last_checked = ? +                              WHERE yt_channel_id=?''', [int(time.time()), channel_id]) -    try: -        existing_thumbnails = set(os.path.splitext(name)[0] for name in os.listdir(thumbnails_directory)) -    except FileNotFoundError: -        existing_thumbnails = set() -    missing_thumbnails = set(video['id'] for video in videos) - existing_thumbnails -    downloading_thumbnails.update(missing_thumbnails) -    gevent.spawn(download_thumbnails, thumbnails_directory, missing_thumbnails) -    return videos +def check_all_channels(): +    with open_database() as connection: +        with connection as cursor: +            channel_id_name_list = cursor.execute('''SELECT yt_channel_id, channel_name FROM subscribed_channels''').fetchall() +    channel_names.update(channel_id_name_list) +    check_channels_if_necessary([item[0] for item in channel_id_name_list]) +def check_tags(tags): +    channel_id_name_list = [] +    with open_database() as connection: +        with connection as cursor: +            for tag in tags: +                channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name +                                                          FROM subscribed_channels +                                                          WHERE subscribed_channels.id IN ( +                                                              SELECT tag_associations.sql_channel_id FROM tag_associations WHERE tag=? +                                                          )''', [tag]).fetchall() +    channel_names.update(channel_id_name_list) +    check_channels_if_necessary([item[0] for item in channel_id_name_list]) +def check_specific_channels(channel_ids): +    with open_database() as connection: +        with connection as cursor: +            for channel_id in channel_ids: +                channel_id_name_list += cursor.execute('''SELECT yt_channel_id, channel_name +                                                          FROM subscribed_channels +                                                          WHERE yt_channel_id=?''', [channel_id]).fetchall() +    channel_names.update(channel_id_name_list) +    check_channels_if_necessary(channel_ids) @@ -408,15 +499,18 @@ def post_subscriptions_page(env, start_response):          _unsubscribe(params['channel_id'])      elif action == 'refresh': -        with open_database() as connection: -            with connection as cursor: -                for sql_channel_id, yt_channel_id in cursor.execute('''SELECT id, yt_channel_id FROM subscribed_channels''').fetchall(): -                    db_videos = ( (sql_channel_id, info['id'], info['title'], info['duration'], info['time_published'], info['description']) for info in _get_upstream_videos(yt_channel_id) ) -                    cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) VALUES (?, ?, ?, ?, ?, ?)''', db_videos) - -                cursor.execute('''UPDATE subscribed_channels SET time_last_checked = ?''', ( int(time.time()), ) ) +        type = params['type'][0] +        if type == 'all': +            check_all_channels() +        elif type == 'tag': +            check_tags(params['tag_name']) +        elif type == 'channel': +            check_specific_channels(params['channel_id']) +        else: +            start_response('400 Bad Request', ()) +            return b'400 Bad Request' -        start_response('303 See Other', [('Location', util.URL_ORIGIN + '/subscriptions'),] ) +        start_response('204 No Content', ())          return b''      else:          start_response('400 Bad Request', ()) diff --git a/youtube/util.py b/youtube/util.py index 42d76a3..c4e1aff 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -7,6 +7,8 @@ import re  import time  import os  import gevent +import gevent.queue +import gevent.lock  # The trouble with the requests library: It ships its own certificate bundle via certifi  #  instead of using the system certificate store, meaning self-signed certificates @@ -176,6 +178,53 @@ desktop_ua = (('User-Agent', desktop_user_agent),) +class RateLimitedQueue(gevent.queue.Queue): +    ''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. ''' + +    def __init__(self, initial_burst=30, waiting_period=5, subsequent_bursts=10): +        self.initial_burst = initial_burst +        self.waiting_period = waiting_period +        self.subsequent_bursts = subsequent_bursts + +        self.count_since_last_wait = 0 +        self.surpassed_initial = False + +        self.lock = gevent.lock.BoundedSemaphore(1) +        self.currently_empty = False +        self.empty_start = 0 +        gevent.queue.Queue.__init__(self) + + +    def get(self): +        self.lock.acquire()     # blocks if another greenlet currently has the lock +        if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial: +            gevent.sleep(self.waiting_period) +            self.count_since_last_wait = 0 + +        elif self.count_since_last_wait >= self.initial_burst and not self.surpassed_initial: +            self.surpassed_initial = True +            gevent.sleep(self.waiting_period) +            self.count_since_last_wait = 0 + +        self.count_since_last_wait += 1 + +        if not self.currently_empty and self.empty(): +            self.currently_empty = True +            self.empty_start = time.monotonic() + +        item = gevent.queue.Queue.get(self)     # blocks when nothing left + +        if self.currently_empty: +            if time.monotonic() - self.empty_start >= self.waiting_period: +                self.count_since_last_wait = 0 +                self.surpassed_initial = False + +            self.currently_empty = False + +        self.lock.release() + +        return item +  def download_thumbnail(save_directory, video_id): @@ -185,14 +234,15 @@ def download_thumbnail(save_directory, video_id):          thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)      except urllib.error.HTTPError as e:          print("Failed to download thumbnail for " + video_id + ": " + str(e)) -        return +        return False      try:          f = open(save_location, 'wb')      except FileNotFoundError: -        os.makedirs(save_directory) +        os.makedirs(save_directory, exist_ok = True)          f = open(save_location, 'wb')      f.write(thumbnail)      f.close() +    return True  def download_thumbnails(save_directory, ids):      if not isinstance(ids, (list, tuple)): diff --git a/yt_subscriptions_template.html b/yt_subscriptions_template.html index a6d3e38..8e1434c 100644 --- a/yt_subscriptions_template.html +++ b/yt_subscriptions_template.html @@ -56,6 +56,7 @@ $items                      <form method="POST" class="refresh-all">                          <input type="submit" value="Check All">                          <input type="hidden" name="action" value="refresh"> +                        <input type="hidden" name="type" value="all">                      </form>                  </div> | 
