From 3a081a9c465828b3491d15b673074b7dbdcfc822 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Sun, 25 Oct 2020 11:15:59 -0700 Subject: Automatically change tor circuit once if ip is blocked Use stem library to send a new identity signal via the tor control port. See #20 --- youtube/util.py | 162 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 117 insertions(+), 45 deletions(-) (limited to 'youtube/util.py') diff --git a/youtube/util.py b/youtube/util.py index 8d0f8ca..ccdcbc1 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -16,6 +16,9 @@ import gevent import gevent.queue import gevent.lock import collections +import stem +import stem.control +import traceback # The trouble with the requests library: It ships its own certificate bundle via certifi # instead of using the system certificate store, meaning self-signed certificates @@ -54,32 +57,81 @@ URL_ORIGIN = "/https://www.youtube.com" connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED') -old_tor_connection_pool = None -tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:' + str(settings.tor_port) + '/', cert_reqs = 'CERT_REQUIRED') +class TorManager: + def __init__(self): + self.old_tor_connection_pool = None + self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( + 'socks5://127.0.0.1:' + str(settings.tor_port) + '/', + cert_reqs = 'CERT_REQUIRED') + self.tor_pool_refresh_time = time.monotonic() -tor_pool_refresh_time = time.monotonic() # prevent problems due to clock changes + self.new_identity_lock = gevent.lock.BoundedSemaphore(1) + self.last_new_identity_time = time.monotonic() - 20 -def get_pool(use_tor): - global old_tor_connection_pool - global tor_connection_pool - global tor_pool_refresh_time + def refresh_tor_connection_pool(self): + self.tor_connection_pool.clear() - if not use_tor: - return connection_pool + # Keep a reference for 5 min to avoid it getting garbage collected + # while sockets still in use + self.old_tor_connection_pool = self.tor_connection_pool - # Tor changes circuits after 10 minutes: https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive - current_time = time.monotonic() - if current_time - tor_pool_refresh_time > 300: # close pool after 5 minutes - tor_connection_pool.clear() + self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager( + 'socks5://127.0.0.1:' + str(settings.tor_port) + '/', + cert_reqs = 'CERT_REQUIRED') + self.tor_pool_refresh_time = time.monotonic() - # Keep a reference for 5 min to avoid it getting garbage collected while sockets still in use - old_tor_connection_pool = tor_connection_pool + def get_tor_connection_pool(self): + # Tor changes circuits after 10 minutes: + # https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive + current_time = time.monotonic() - tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:' + str(settings.tor_port) + '/', cert_reqs = 'CERT_REQUIRED') - tor_pool_refresh_time = current_time + # close pool after 5 minutes + if current_time - self.tor_pool_refresh_time > 300: + self.refresh_tor_connection_pool() - return tor_connection_pool + return self.tor_connection_pool + def new_identity(self, time_failed_request_started): + '''return error, or None if no error and the identity is fresh''' + print('new_identity: new_identity called') + # blocks if another greenlet currently has the lock + self.new_identity_lock.acquire() + print('new_identity: New identity lock acquired') + + try: + # This was caused by a request that failed within a previous, + # stale identity + if time_failed_request_started <= self.last_new_identity_time: + print('new_identity: Cancelling; request was from stale identity') + return None + + delta = time.monotonic() - self.last_new_identity_time + if delta < 20: + print('new_identity: Retried already within last 20 seconds') + return 'Retried with new circuit once (max) within last 20 seconds.' + try: + port = settings.tor_control_port + with stem.control.Controller.from_port(port=port) as controller: + controller.authenticate() + print('new_identity: Getting new identity') + controller.signal(stem.Signal.NEWNYM) + print('new_identity: NEWNYM signal sent') + self.last_new_identity_time = time.monotonic() + self.refresh_tor_connection_pool() + return None + except stem.SocketError: + traceback.print_exc() + return 'Failed to connect to Tor control port.' + finally: + self.new_identity_lock.release() + +tor_manager = TorManager() + + +def get_pool(use_tor): + if not use_tor: + return connection_pool + return tor_manager.get_tor_connection_pool() class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): @@ -103,11 +155,12 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): https_response = http_response class FetchError(Exception): - def __init__(self, code, reason='', ip=None): + def __init__(self, code, reason='', ip=None, error_message=None): Exception.__init__(self, 'HTTP error during request: ' + code + ' ' + reason) self.code = code self.reason = reason self.ip = ip + self.error_message = error_message def decode_content(content, encoding_header): encodings = encoding_header.replace(' ', '').split(',') @@ -184,32 +237,51 @@ def fetch_url_response(url, headers=(), timeout=15, data=None, def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, debug_name=None): - start_time = time.time() - - response, cleanup_func = fetch_url_response( - url, headers, timeout=timeout, - cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive, - use_tor=use_tor) - response_time = time.time() - - content = response.read() - read_finish = time.time() - - cleanup_func(response) # release_connection for urllib3 - content = decode_content( - content, - response.getheader('Content-Encoding', default='identity')) - - if (response.status == 429 - and content.startswith(b'= 400: - raise FetchError(str(response.status), reason=response.reason, ip=None) + while True: + start_time = time.time() + + response, cleanup_func = fetch_url_response( + url, headers, timeout=timeout, + cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive, + use_tor=use_tor) + response_time = time.time() + + content = response.read() + + read_finish = time.time() + + cleanup_func(response) # release_connection for urllib3 + content = decode_content( + content, + response.getheader('Content-Encoding', default='identity')) + + if (response.status == 429 + and content.startswith(b'= 400: + raise FetchError(str(response.status), reason=response.reason, + ip=None) + break if report_text: print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3)) -- cgit v1.2.3