diff options
author | James Taylor <user234683@users.noreply.github.com> | 2020-03-08 21:01:15 -0700 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2020-03-08 21:01:15 -0700 |
commit | 408a9c79aebdaf3d86df67cb7db53ed006c18d0a (patch) | |
tree | 45b4949337958f1993daba940a47ec9aa3f82534 | |
parent | 56e7751da7f8bb8b8108871ce46e50310dc73a9f (diff) | |
download | yt-local-408a9c79aebdaf3d86df67cb7db53ed006c18d0a.tar.lz yt-local-408a9c79aebdaf3d86df67cb7db53ed006c18d0a.tar.xz yt-local-408a9c79aebdaf3d86df67cb7db53ed006c18d0a.zip |
Correctly start and stop subscriptions autochecker when it is
disabled/enabled in settings.
-rw-r--r-- | settings.py | 15 | ||||
-rw-r--r-- | youtube/subscriptions.py | 97 |
2 files changed, 71 insertions, 41 deletions
diff --git a/settings.py b/settings.py index 5407922..517e785 100644 --- a/settings.py +++ b/settings.py @@ -286,6 +286,14 @@ if route_tor: else: print("Tor routing is OFF - your Youtube activity is NOT anonymous") +hooks = {} +def add_setting_changed_hook(setting, func): + '''Called right before new settings take effect''' + if setting in hooks: + hooks[setting].append(func) + else: + hooks[setting] = [func] + def settings_page(): if request.method == 'GET': @@ -309,6 +317,13 @@ def settings_page(): assert settings_info[setting_name]['type'] is bool, missing_inputs settings[setting_name] = False + # call setting hooks + for setting_name, value in settings.items(): + old_value = globals()[setting_name] + if value != old_value and setting_name in hooks: + for func in hooks[setting_name]: + func(old_value, value) + globals().update(settings) save_settings(settings) return flask.redirect(util.URL_ORIGIN + '/settings', 303) diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index 66490de..63aa090 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -331,8 +331,50 @@ for i in range(0,5): # --- Auto checking system - Spaghetti code --- +def autocheck_dispatcher(): + '''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application''' + while True: + if len(autocheck_jobs) == 0: + new_job = autocheck_job_application.get() + autocheck_jobs.append(new_job) + else: + earliest_job_index = min(range(0, len(autocheck_jobs)), key=lambda index: autocheck_jobs[index]['next_check_time']) # https://stackoverflow.com/a/11825864 + earliest_job = autocheck_jobs[earliest_job_index] + time_until_earliest_job = earliest_job['next_check_time'] - time.time() + + if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow + print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) + next_check_time = time.time() + 3600*secrets.randbelow(60)/60 + with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time) + autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time + continue + + # make sure it's not muted + if with_open_db(_is_muted, earliest_job['channel_id']): + del autocheck_jobs[earliest_job_index] + continue + + if time_until_earliest_job > 0: # it can become less than zero (in the past) when it's set to go off while the dispatcher is doing something else at that moment + try: + new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs + except gevent.queue.Empty: # no new jobs + pass + else: # new job, add it to the list + autocheck_jobs.append(new_job) + continue + + # no new jobs, time to execute the earliest job + channel_names[earliest_job['channel_id']] = earliest_job['channel_name'] + checking_channels.add(earliest_job['channel_id']) + check_channels_queue.put(earliest_job['channel_id']) + del autocheck_jobs[earliest_job_index] + +dispatcher_greenlet = None +def start_autocheck_system(): + global autocheck_job_application + global autocheck_jobs + global dispatcher_greenlet -if settings.autocheck_subscriptions: # job application format: dict with keys (channel_id, channel_name, next_check_time) autocheck_job_application = gevent.queue.Queue() # only really meant to hold 1 item, just reusing gevent's wait and timeout machinery @@ -354,49 +396,22 @@ if settings.autocheck_subscriptions: row = (row[0], row[1], next_check_time) _schedule_checking(cursor, row[0], next_check_time) autocheck_jobs.append({'channel_id': row[0], 'channel_name': row[1], 'next_check_time': next_check_time}) + dispatcher_greenlet = gevent.spawn(autocheck_dispatcher) +def stop_autocheck_system(): + if dispatcher_greenlet is not None: + dispatcher_greenlet.kill() +def autocheck_setting_changed(old_value, new_value): + if new_value: + start_autocheck_system() + else: + stop_autocheck_system() - def autocheck_dispatcher(): - '''Scans the auto_check_list. Sleeps until the earliest job is due, then adds that channel to the checking queue above. Can be sent a new job through autocheck_job_application''' - while True: - if len(autocheck_jobs) == 0: - new_job = autocheck_job_application.get() - autocheck_jobs.append(new_job) - else: - earliest_job_index = min(range(0, len(autocheck_jobs)), key=lambda index: autocheck_jobs[index]['next_check_time']) # https://stackoverflow.com/a/11825864 - earliest_job = autocheck_jobs[earliest_job_index] - time_until_earliest_job = earliest_job['next_check_time'] - time.time() - - if time_until_earliest_job <= -5: # should not happen unless we're running extremely slow - print('ERROR: autocheck_dispatcher got job scheduled in the past, skipping and rescheduling: ' + earliest_job['channel_id'] + ', ' + earliest_job['channel_name'] + ', ' + str(earliest_job['next_check_time'])) - next_check_time = time.time() + 3600*secrets.randbelow(60)/60 - with_open_db(_schedule_checking, earliest_job['channel_id'], next_check_time) - autocheck_jobs[earliest_job_index]['next_check_time'] = next_check_time - continue - - # make sure it's not muted - if with_open_db(_is_muted, earliest_job['channel_id']): - del autocheck_jobs[earliest_job_index] - continue - - if time_until_earliest_job > 0: # it can become less than zero (in the past) when it's set to go off while the dispatcher is doing something else at that moment - try: - new_job = autocheck_job_application.get(timeout = time_until_earliest_job) # sleep for time_until_earliest_job time, but allow to be interrupted by new jobs - except gevent.queue.Empty: # no new jobs - pass - else: # new job, add it to the list - autocheck_jobs.append(new_job) - continue - - # no new jobs, time to execute the earliest job - channel_names[earliest_job['channel_id']] = earliest_job['channel_name'] - checking_channels.add(earliest_job['channel_id']) - check_channels_queue.put(earliest_job['channel_id']) - del autocheck_jobs[earliest_job_index] - - - gevent.spawn(autocheck_dispatcher) +settings.add_setting_changed_hook('autocheck_subscriptions', + autocheck_setting_changed) +if settings.autocheck_subscriptions: + start_autocheck_system() # ---------------------------- |