diff options
Diffstat (limited to 'youtube/subscriptions.py')
-rw-r--r-- | youtube/subscriptions.py | 92 |
1 files changed, 88 insertions, 4 deletions
diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index 64a72f4..60d5531 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -12,6 +12,7 @@ import contextlib import defusedxml.ElementTree import urllib import math +import secrets import flask from flask import request @@ -37,8 +38,8 @@ def open_database(): yt_channel_id text UNIQUE NOT NULL, channel_name text NOT NULL, time_last_checked integer, - muted integer DEFAULT 0, - upload_frequency integer + next_check_time integer, + muted integer DEFAULT 0 )''') cursor.execute('''CREATE TABLE IF NOT EXISTS videos ( id integer PRIMARY KEY, @@ -227,6 +228,11 @@ def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, include_mu return cursor.execute(statement, [tag]).fetchall() +def _schedule_checking(cursor, channel_id, next_check_time): + cursor.execute('''UPDATE subscribed_channels SET next_check_time = ? WHERE yt_channel_id = ?''', [int(next_check_time), channel_id]) + +def _is_muted(cursor, channel_id): + return bool(cursor.execute('''SELECT muted FROM subscribed_channels WHERE yt_channel_id=?''', [channel_id]).fetchone()[0]) units = { 'year': 31536000, # 365*24*3600 @@ -257,6 +263,9 @@ except FileNotFoundError: existing_thumbnails = set() +# --- Manual checking system. Rate limited in order to support very large numbers of channels to be checked --- +# Auto checking system plugs into this for convenience, though it doesn't really need the rate limiting + check_channels_queue = util.RateLimitedQueue() checking_channels = set() @@ -273,8 +282,65 @@ def check_channel_worker(): for i in range(0,5): gevent.spawn(check_channel_worker) +# ---------------------------- + + + +# --- Auto checking system --- + +if settings.autocheck_subscriptions: + # job application format: dict with keys (channel_id, channel_name, next_check_time) + autocheck_job_application = gevent.queue.Queue() # only really meant to hold 1 item, just reusing gevent's wait and timeout machinery + + autocheck_jobs = [] # list of dicts with the keys (channel_id, channel_name, next_check_time). Stores all the channels that need to be autochecked and when to check them + with open_database() as connection: + with connection as cursor: + now = time.time() + for row in cursor.execute('''SELECT yt_channel_id, channel_name, next_check_time FROM subscribed_channels WHERE next_check_time IS NOT NULL AND muted != 1''').fetchall(): + if row[2] < now: # expired, check randomly within the 30 minutes + next_check_time = now + 3600*secrets.randbelow(60)/60 + row = (row[0], row[1], next_check_time) + _schedule_checking(cursor, row[0], next_check_time) + autocheck_jobs.append({'channel_id': row[0], 'channel_name': row[1], 'next_check_time': row[2]}) + + + + def autocheck_dispatcher(): + '''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application''' + while True: + if len(autocheck_jobs) == 0: + new_job = autocheck_job_application.get() + autocheck_jobs.append(new_job) + else: + earliest_job_index = min(range(0, len(autocheck_jobs)), key=lambda index: autocheck_jobs[index]['next_check_time']) # https://stackoverflow.com/a/11825864 + earliest_job = autocheck_jobs[earliest_job_index] + time_until_earliest_job = earliest_job['next_check_time'] - time.time() + + if time_until_earliest_job <= 0: + print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) + next_check_time = time.time() + 3600*secrets.randbelow(60)/60 + with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time) + autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time + continue + + # make sure it's not muted + if with_open_db(_is_muted, earliest_job['channel_id']): + del autocheck_jobs[earliest_job_index] + continue + + try: + new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs + except gevent.queue.Empty: # no new jobs, time to execute the earliest job + channel_names[earliest_job['channel_id']] = earliest_job['channel_name'] + checking_channels.add(earliest_job['channel_id']) + check_channels_queue.put(earliest_job['channel_id']) + del autocheck_jobs[earliest_job_index] + else: # new job, add it to the list + autocheck_jobs.append(new_job) + gevent.spawn(autocheck_dispatcher) +# ---------------------------- @@ -305,13 +371,31 @@ def _get_upstream_videos(channel_id): videos.append((channel_id, video_item['id'], video_item['title'], video_item['duration'], video_item['time_published'], video_item['description'])) + if len(videos) == 0: + average_upload_period = 4*7*24*3600 # assume 1 month for channel with no videos + elif len(videos) < 5: + average_upload_period = int((time.time() - videos[len(videos)-1][4])/len(videos)) + else: + average_upload_period = int((time.time() - videos[4][4])/5) # equivalent to averaging the time between videos for the last 5 videos + + # calculate when to check next for auto checking + # add some quantization and randomness to make pattern analysis by Youtube slightly harder + quantized_upload_period = average_upload_period - (average_upload_period % (4*3600)) + 4*3600 # round up to nearest 4 hours + randomized_upload_period = quantized_upload_period*(1 + secrets.randbelow(50)/50*0.5) # randomly between 1x and 1.5x + next_check_delay = randomized_upload_period/5 # check at 5x the channel posting rate. might want to fine tune this number + next_check_time = int(time.time() + next_check_delay) + with open_database() as connection: with connection as cursor: cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description) VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos) cursor.execute('''UPDATE subscribed_channels - SET time_last_checked = ? - WHERE yt_channel_id=?''', [int(time.time()), channel_id]) + SET time_last_checked = ?, next_check_time = ? + WHERE yt_channel_id=?''', [int(time.time()), next_check_time, channel_id]) + + if settings.autocheck_subscriptions: + if not _is_muted(cursor, channel_id): + autocheck_job_application.put({'channel_id': channel_id, 'channel_name': channel_names[channel_id], 'next_check_time': next_check_time}) def check_all_channels(): |