aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/processing
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/processing')
-rw-r--r--mediagoblin/processing/__init__.py31
-rw-r--r--mediagoblin/processing/task.py29
2 files changed, 40 insertions, 20 deletions
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index 102fd5de..29345227 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -24,6 +24,8 @@ except:
import logging
import os
+import six
+
from mediagoblin import mg_globals as mgg
from mediagoblin.db.util import atomic_update
from mediagoblin.db.models import MediaEntry
@@ -46,7 +48,7 @@ class ProgressCallback(object):
def create_pub_filepath(entry, filename):
return mgg.public_store.get_unique_filepath(
['media_entries',
- unicode(entry.id),
+ six.text_type(entry.id),
filename])
@@ -307,8 +309,8 @@ def mark_entry_failed(entry_id, exc):
store extra information that can be useful for users telling them
why their media failed to process.
- Args:
- - entry_id: The id of the media entry
+ :param entry_id: The id of the media entry
+ :param exc: An instance of BaseProcessingFail
"""
# Was this a BaseProcessingFail? In other words, was this a
@@ -319,18 +321,17 @@ def mark_entry_failed(entry_id, exc):
atomic_update(mgg.database.MediaEntry,
{'id': entry_id},
{u'state': u'failed',
- u'fail_error': unicode(exc.exception_path),
+ u'fail_error': six.text_type(exc.exception_path),
u'fail_metadata': exc.metadata})
else:
_log.warn("No idea what happened here, but it failed: %r", exc)
- # Looks like no, so just mark it as failed and don't record a
- # failure_error (we'll assume it wasn't handled) and don't record
- # metadata (in fact overwrite it if somehow it had previous info
- # here)
+ # Looks like no, let's record it so that admin could ask us about the
+ # reason
atomic_update(mgg.database.MediaEntry,
{'id': entry_id},
{u'state': u'failed',
- u'fail_error': None,
+ u'fail_error': u'Unhandled exception: {0}'.format(
+ six.text_type(exc)),
u'fail_metadata': {}})
@@ -376,12 +377,11 @@ def store_public(entry, keyname, local_file, target_name=None,
entry.media_files[keyname], target_filepath)
if delete_if_exists:
mgg.public_store.delete_file(entry.media_files[keyname])
-
try:
mgg.public_store.copy_local_to_storage(local_file, target_filepath)
- except:
+ except Exception as e:
+ _log.error(u'Exception happened: {0}'.format(e))
raise PublicStoreFail(keyname=keyname)
-
# raise an error if the file failed to copy
if not mgg.public_store.file_exists(target_filepath):
raise PublicStoreFail(keyname=keyname)
@@ -409,8 +409,11 @@ class BaseProcessingFail(Exception):
return u"%s:%s" % (
self.__class__.__module__, self.__class__.__name__)
- def __init__(self, **metadata):
- self.metadata = metadata or {}
+ def __init__(self, message=None, **metadata):
+ if message is not None:
+ super(BaseProcessingFail, self).__init__(message)
+ metadata['message'] = message
+ self.metadata = metadata
class BadMediaFail(BaseProcessingFail):
"""
diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py
index 7f683485..5e0e772d 100644
--- a/mediagoblin/processing/task.py
+++ b/mediagoblin/processing/task.py
@@ -15,8 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
-import urllib
-import urllib2
+
+from six.moves.urllib import request, parse
import celery
from celery.registry import tasks
@@ -42,15 +42,15 @@ def handle_push_urls(feed_url):
hubparameters = {
'hub.mode': 'publish',
'hub.url': feed_url}
- hubdata = urllib.urlencode(hubparameters)
+ hubdata = parse.urlencode(hubparameters)
hubheaders = {
"Content-type": "application/x-www-form-urlencoded",
"Connection": "close"}
for huburl in mgg.app_config["push_urls"]:
- hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
+ hubrequest = request.Request(huburl, hubdata, hubheaders)
try:
- hubresponse = urllib2.urlopen(hubrequest)
- except (urllib2.HTTPError, urllib2.URLError) as exc:
+ hubresponse = request.urlopen(hubrequest)
+ except (request.HTTPError, request.URLError) as exc:
# We retry by default 3 times before failing
_log.info("PuSH url %r gave error %r", huburl, exc)
try:
@@ -74,8 +74,11 @@ class ProcessMedia(celery.Task):
Pass the media entry off to the appropriate processing function
(for now just process_image...)
+ :param media_id: MediaEntry().id
:param feed_url: The feed URL that the PuSH server needs to be
updated for.
+ :param reprocess_action: What particular action should be run. For
+ example, 'initial'.
:param reprocess: A dict containing all of the necessary reprocessing
info for the media_type.
"""
@@ -155,5 +158,19 @@ class ProcessMedia(celery.Task):
entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
json_processing_callback(entry)
+ mgg.database.reset_after_request()
+
+ def after_return(self, *args, **kwargs):
+ """
+ This is called after the task has returned, we should clean up.
+
+ We need to rollback the database to prevent ProgrammingError exceptions
+ from being raised.
+ """
+ # In eager mode we get DetachedInstanceError, we do rollback on_failure
+ # to deal with that case though when in eager mode.
+ if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
+ mgg.database.reset_after_request()
+
tasks.register(ProcessMedia)