aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/processing/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/processing/task.py')
-rw-r--r--mediagoblin/processing/task.py29
1 files changed, 23 insertions, 6 deletions
diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py
index 7f683485..5e0e772d 100644
--- a/mediagoblin/processing/task.py
+++ b/mediagoblin/processing/task.py
@@ -15,8 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
-import urllib
-import urllib2
+
+from six.moves.urllib import request, parse
import celery
from celery.registry import tasks
@@ -42,15 +42,15 @@ def handle_push_urls(feed_url):
hubparameters = {
'hub.mode': 'publish',
'hub.url': feed_url}
- hubdata = urllib.urlencode(hubparameters)
+ hubdata = parse.urlencode(hubparameters)
hubheaders = {
"Content-type": "application/x-www-form-urlencoded",
"Connection": "close"}
for huburl in mgg.app_config["push_urls"]:
- hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
+ hubrequest = request.Request(huburl, hubdata, hubheaders)
try:
- hubresponse = urllib2.urlopen(hubrequest)
- except (urllib2.HTTPError, urllib2.URLError) as exc:
+ hubresponse = request.urlopen(hubrequest)
+ except (request.HTTPError, request.URLError) as exc:
# We retry by default 3 times before failing
_log.info("PuSH url %r gave error %r", huburl, exc)
try:
@@ -74,8 +74,11 @@ class ProcessMedia(celery.Task):
Pass the media entry off to the appropriate processing function
(for now just process_image...)
+ :param media_id: MediaEntry().id
:param feed_url: The feed URL that the PuSH server needs to be
updated for.
+ :param reprocess_action: What particular action should be run. For
+ example, 'initial'.
:param reprocess: A dict containing all of the necessary reprocessing
info for the media_type.
"""
@@ -155,5 +158,19 @@ class ProcessMedia(celery.Task):
entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
json_processing_callback(entry)
+ mgg.database.reset_after_request()
+
+ def after_return(self, *args, **kwargs):
+ """
+ This is called after the task has returned, we should clean up.
+
+ We need to rollback the database to prevent ProgrammingError exceptions
+ from being raised.
+ """
+ # In eager mode we get DetachedInstanceError, we do rollback on_failure
+ # to deal with that case though when in eager mode.
+ if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
+ mgg.database.reset_after_request()
+
tasks.register(ProcessMedia)