diff options
Diffstat (limited to 'mediagoblin/processing/task.py')
-rw-r--r-- | mediagoblin/processing/task.py | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py index 7f683485..5e0e772d 100644 --- a/mediagoblin/processing/task.py +++ b/mediagoblin/processing/task.py @@ -15,8 +15,8 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import logging -import urllib -import urllib2 + +from six.moves.urllib import request, parse import celery from celery.registry import tasks @@ -42,15 +42,15 @@ def handle_push_urls(feed_url): hubparameters = { 'hub.mode': 'publish', 'hub.url': feed_url} - hubdata = urllib.urlencode(hubparameters) + hubdata = parse.urlencode(hubparameters) hubheaders = { "Content-type": "application/x-www-form-urlencoded", "Connection": "close"} for huburl in mgg.app_config["push_urls"]: - hubrequest = urllib2.Request(huburl, hubdata, hubheaders) + hubrequest = request.Request(huburl, hubdata, hubheaders) try: - hubresponse = urllib2.urlopen(hubrequest) - except (urllib2.HTTPError, urllib2.URLError) as exc: + hubresponse = request.urlopen(hubrequest) + except (request.HTTPError, request.URLError) as exc: # We retry by default 3 times before failing _log.info("PuSH url %r gave error %r", huburl, exc) try: @@ -74,8 +74,11 @@ class ProcessMedia(celery.Task): Pass the media entry off to the appropriate processing function (for now just process_image...) + :param media_id: MediaEntry().id :param feed_url: The feed URL that the PuSH server needs to be updated for. + :param reprocess_action: What particular action should be run. For + example, 'initial'. :param reprocess: A dict containing all of the necessary reprocessing info for the media_type. """ @@ -155,5 +158,19 @@ class ProcessMedia(celery.Task): entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first() json_processing_callback(entry) + mgg.database.reset_after_request() + + def after_return(self, *args, **kwargs): + """ + This is called after the task has returned, we should clean up. + + We need to rollback the database to prevent ProgrammingError exceptions + from being raised. + """ + # In eager mode we get DetachedInstanceError, we do rollback on_failure + # to deal with that case though when in eager mode. + if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']: + mgg.database.reset_after_request() + tasks.register(ProcessMedia) |