aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--settings.py6
-rw-r--r--youtube/subscriptions.py92
2 files changed, 94 insertions, 4 deletions
diff --git a/settings.py b/settings.py
index 4aedd19..9f0861d 100644
--- a/settings.py
+++ b/settings.py
@@ -66,6 +66,12 @@ For security reasons, enabling this is not recommended.''',
1 to sort by newest''',
}),
+ ('autocheck_subscriptions', {
+ 'type': bool,
+ 'default': 0,
+ 'comment': '',
+ }),
+
('gather_googlevideo_domains', {
'type': bool,
'default': False,
diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py
index 64a72f4..60d5531 100644
--- a/youtube/subscriptions.py
+++ b/youtube/subscriptions.py
@@ -12,6 +12,7 @@ import contextlib
import defusedxml.ElementTree
import urllib
import math
+import secrets
import flask
from flask import request
@@ -37,8 +38,8 @@ def open_database():
yt_channel_id text UNIQUE NOT NULL,
channel_name text NOT NULL,
time_last_checked integer,
- muted integer DEFAULT 0,
- upload_frequency integer
+ next_check_time integer,
+ muted integer DEFAULT 0
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS videos (
id integer PRIMARY KEY,
@@ -227,6 +228,11 @@ def _channels_with_tag(cursor, tag, order=False, exclude_muted=False, include_mu
return cursor.execute(statement, [tag]).fetchall()
+def _schedule_checking(cursor, channel_id, next_check_time):
+ cursor.execute('''UPDATE subscribed_channels SET next_check_time = ? WHERE yt_channel_id = ?''', [int(next_check_time), channel_id])
+
+def _is_muted(cursor, channel_id):
+ return bool(cursor.execute('''SELECT muted FROM subscribed_channels WHERE yt_channel_id=?''', [channel_id]).fetchone()[0])
units = {
'year': 31536000, # 365*24*3600
@@ -257,6 +263,9 @@ except FileNotFoundError:
existing_thumbnails = set()
+# --- Manual checking system. Rate limited in order to support very large numbers of channels to be checked ---
+# Auto checking system plugs into this for convenience, though it doesn't really need the rate limiting
+
check_channels_queue = util.RateLimitedQueue()
checking_channels = set()
@@ -273,8 +282,65 @@ def check_channel_worker():
for i in range(0,5):
gevent.spawn(check_channel_worker)
+# ----------------------------
+
+
+
+# --- Auto checking system ---
+
+if settings.autocheck_subscriptions:
+ # job application format: dict with keys (channel_id, channel_name, next_check_time)
+ autocheck_job_application = gevent.queue.Queue() # only really meant to hold 1 item, just reusing gevent's wait and timeout machinery
+
+ autocheck_jobs = [] # list of dicts with the keys (channel_id, channel_name, next_check_time). Stores all the channels that need to be autochecked and when to check them
+ with open_database() as connection:
+ with connection as cursor:
+ now = time.time()
+ for row in cursor.execute('''SELECT yt_channel_id, channel_name, next_check_time FROM subscribed_channels WHERE next_check_time IS NOT NULL AND muted != 1''').fetchall():
+ if row[2] < now: # expired, check randomly within the 30 minutes
+ next_check_time = now + 3600*secrets.randbelow(60)/60
+ row = (row[0], row[1], next_check_time)
+ _schedule_checking(cursor, row[0], next_check_time)
+ autocheck_jobs.append({'channel_id': row[0], 'channel_name': row[1], 'next_check_time': row[2]})
+
+
+
+ def autocheck_dispatcher():
+ '''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application'''
+ while True:
+ if len(autocheck_jobs) == 0:
+ new_job = autocheck_job_application.get()
+ autocheck_jobs.append(new_job)
+ else:
+ earliest_job_index = min(range(0, len(autocheck_jobs)), key=lambda index: autocheck_jobs[index]['next_check_time']) # https://stackoverflow.com/a/11825864
+ earliest_job = autocheck_jobs[earliest_job_index]
+ time_until_earliest_job = earliest_job['next_check_time'] - time.time()
+
+ if time_until_earliest_job <= 0:
+ print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time']))
+ next_check_time = time.time() + 3600*secrets.randbelow(60)/60
+ with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time)
+ autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time
+ continue
+
+ # make sure it's not muted
+ if with_open_db(_is_muted, earliest_job['channel_id']):
+ del autocheck_jobs[earliest_job_index]
+ continue
+
+ try:
+ new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs
+ except gevent.queue.Empty: # no new jobs, time to execute the earliest job
+ channel_names[earliest_job['channel_id']] = earliest_job['channel_name']
+ checking_channels.add(earliest_job['channel_id'])
+ check_channels_queue.put(earliest_job['channel_id'])
+ del autocheck_jobs[earliest_job_index]
+ else: # new job, add it to the list
+ autocheck_jobs.append(new_job)
+ gevent.spawn(autocheck_dispatcher)
+# ----------------------------
@@ -305,13 +371,31 @@ def _get_upstream_videos(channel_id):
videos.append((channel_id, video_item['id'], video_item['title'], video_item['duration'], video_item['time_published'], video_item['description']))
+ if len(videos) == 0:
+ average_upload_period = 4*7*24*3600 # assume 1 month for channel with no videos
+ elif len(videos) < 5:
+ average_upload_period = int((time.time() - videos[len(videos)-1][4])/len(videos))
+ else:
+ average_upload_period = int((time.time() - videos[4][4])/5) # equivalent to averaging the time between videos for the last 5 videos
+
+ # calculate when to check next for auto checking
+ # add some quantization and randomness to make pattern analysis by Youtube slightly harder
+ quantized_upload_period = average_upload_period - (average_upload_period % (4*3600)) + 4*3600 # round up to nearest 4 hours
+ randomized_upload_period = quantized_upload_period*(1 + secrets.randbelow(50)/50*0.5) # randomly between 1x and 1.5x
+ next_check_delay = randomized_upload_period/5 # check at 5x the channel posting rate. might want to fine tune this number
+ next_check_time = int(time.time() + next_check_delay)
+
with open_database() as connection:
with connection as cursor:
cursor.executemany('''INSERT OR IGNORE INTO videos (sql_channel_id, video_id, title, duration, time_published, description)
VALUES ((SELECT id FROM subscribed_channels WHERE yt_channel_id=?), ?, ?, ?, ?, ?)''', videos)
cursor.execute('''UPDATE subscribed_channels
- SET time_last_checked = ?
- WHERE yt_channel_id=?''', [int(time.time()), channel_id])
+ SET time_last_checked = ?, next_check_time = ?
+ WHERE yt_channel_id=?''', [int(time.time()), next_check_time, channel_id])
+
+ if settings.autocheck_subscriptions:
+ if not _is_muted(cursor, channel_id):
+ autocheck_job_application.put({'channel_id': channel_id, 'channel_name': channel_names[channel_id], 'next_check_time': next_check_time})
def check_all_channels():