diff options
Diffstat (limited to 'mediagoblin/processing')
-rw-r--r-- | mediagoblin/processing/__init__.py | 31 | ||||
-rw-r--r-- | mediagoblin/processing/task.py | 29 |
2 files changed, 40 insertions, 20 deletions
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index 102fd5de..29345227 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -24,6 +24,8 @@ except: import logging import os +import six + from mediagoblin import mg_globals as mgg from mediagoblin.db.util import atomic_update from mediagoblin.db.models import MediaEntry @@ -46,7 +48,7 @@ class ProgressCallback(object): def create_pub_filepath(entry, filename): return mgg.public_store.get_unique_filepath( ['media_entries', - unicode(entry.id), + six.text_type(entry.id), filename]) @@ -307,8 +309,8 @@ def mark_entry_failed(entry_id, exc): store extra information that can be useful for users telling them why their media failed to process. - Args: - - entry_id: The id of the media entry + :param entry_id: The id of the media entry + :param exc: An instance of BaseProcessingFail """ # Was this a BaseProcessingFail? In other words, was this a @@ -319,18 +321,17 @@ def mark_entry_failed(entry_id, exc): atomic_update(mgg.database.MediaEntry, {'id': entry_id}, {u'state': u'failed', - u'fail_error': unicode(exc.exception_path), + u'fail_error': six.text_type(exc.exception_path), u'fail_metadata': exc.metadata}) else: _log.warn("No idea what happened here, but it failed: %r", exc) - # Looks like no, so just mark it as failed and don't record a - # failure_error (we'll assume it wasn't handled) and don't record - # metadata (in fact overwrite it if somehow it had previous info - # here) + # Looks like no, let's record it so that admin could ask us about the + # reason atomic_update(mgg.database.MediaEntry, {'id': entry_id}, {u'state': u'failed', - u'fail_error': None, + u'fail_error': u'Unhandled exception: {0}'.format( + six.text_type(exc)), u'fail_metadata': {}}) @@ -376,12 +377,11 @@ def store_public(entry, keyname, local_file, target_name=None, entry.media_files[keyname], target_filepath) if delete_if_exists: mgg.public_store.delete_file(entry.media_files[keyname]) - try: mgg.public_store.copy_local_to_storage(local_file, target_filepath) - except: + except Exception as e: + _log.error(u'Exception happened: {0}'.format(e)) raise PublicStoreFail(keyname=keyname) - # raise an error if the file failed to copy if not mgg.public_store.file_exists(target_filepath): raise PublicStoreFail(keyname=keyname) @@ -409,8 +409,11 @@ class BaseProcessingFail(Exception): return u"%s:%s" % ( self.__class__.__module__, self.__class__.__name__) - def __init__(self, **metadata): - self.metadata = metadata or {} + def __init__(self, message=None, **metadata): + if message is not None: + super(BaseProcessingFail, self).__init__(message) + metadata['message'] = message + self.metadata = metadata class BadMediaFail(BaseProcessingFail): """ diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py index 7f683485..5e0e772d 100644 --- a/mediagoblin/processing/task.py +++ b/mediagoblin/processing/task.py @@ -15,8 +15,8 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import logging -import urllib -import urllib2 + +from six.moves.urllib import request, parse import celery from celery.registry import tasks @@ -42,15 +42,15 @@ def handle_push_urls(feed_url): hubparameters = { 'hub.mode': 'publish', 'hub.url': feed_url} - hubdata = urllib.urlencode(hubparameters) + hubdata = parse.urlencode(hubparameters) hubheaders = { "Content-type": "application/x-www-form-urlencoded", "Connection": "close"} for huburl in mgg.app_config["push_urls"]: - hubrequest = urllib2.Request(huburl, hubdata, hubheaders) + hubrequest = request.Request(huburl, hubdata, hubheaders) try: - hubresponse = urllib2.urlopen(hubrequest) - except (urllib2.HTTPError, urllib2.URLError) as exc: + hubresponse = request.urlopen(hubrequest) + except (request.HTTPError, request.URLError) as exc: # We retry by default 3 times before failing _log.info("PuSH url %r gave error %r", huburl, exc) try: @@ -74,8 +74,11 @@ class ProcessMedia(celery.Task): Pass the media entry off to the appropriate processing function (for now just process_image...) + :param media_id: MediaEntry().id :param feed_url: The feed URL that the PuSH server needs to be updated for. + :param reprocess_action: What particular action should be run. For + example, 'initial'. :param reprocess: A dict containing all of the necessary reprocessing info for the media_type. """ @@ -155,5 +158,19 @@ class ProcessMedia(celery.Task): entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first() json_processing_callback(entry) + mgg.database.reset_after_request() + + def after_return(self, *args, **kwargs): + """ + This is called after the task has returned, we should clean up. + + We need to rollback the database to prevent ProgrammingError exceptions + from being raised. + """ + # In eager mode we get DetachedInstanceError, we do rollback on_failure + # to deal with that case though when in eager mode. + if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']: + mgg.database.reset_after_request() + tasks.register(ProcessMedia) |