aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mediagoblin/media_types/ascii/processing.py3
-rw-r--r--mediagoblin/media_types/audio/processing.py3
-rw-r--r--mediagoblin/media_types/image/processing.py3
-rw-r--r--mediagoblin/media_types/pdf/processing.py3
-rw-r--r--mediagoblin/media_types/raw_image/processing.py3
-rw-r--r--mediagoblin/media_types/stl/processing.py3
-rw-r--r--mediagoblin/media_types/video/processing.py40
-rw-r--r--mediagoblin/processing/__init__.py3
-rw-r--r--mediagoblin/submit/lib.py2
9 files changed, 42 insertions, 21 deletions
diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py
index 71ccc86e..823dc4fd 100644
--- a/mediagoblin/media_types/ascii/processing.py
+++ b/mediagoblin/media_types/ascii/processing.py
@@ -274,7 +274,8 @@ class AsciiProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py
index a83d60f7..b74364bc 100644
--- a/mediagoblin/media_types/audio/processing.py
+++ b/mediagoblin/media_types/audio/processing.py
@@ -366,7 +366,8 @@ class AudioProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py
index 42234eff..a189fef3 100644
--- a/mediagoblin/media_types/image/processing.py
+++ b/mediagoblin/media_types/image/processing.py
@@ -431,7 +431,8 @@ class ImageProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(MetadataProcessing)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/pdf/processing.py b/mediagoblin/media_types/pdf/processing.py
index d93b19bb..6a13c8e3 100644
--- a/mediagoblin/media_types/pdf/processing.py
+++ b/mediagoblin/media_types/pdf/processing.py
@@ -471,7 +471,8 @@ class PdfProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/raw_image/processing.py b/mediagoblin/media_types/raw_image/processing.py
index a385d563..7f2d155a 100644
--- a/mediagoblin/media_types/raw_image/processing.py
+++ b/mediagoblin/media_types/raw_image/processing.py
@@ -81,7 +81,8 @@ class RawImageProcessingManager(ProcessingManager):
self.add_processor(InitialRawProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py
index 7f2f350d..9dd6d49b 100644
--- a/mediagoblin/media_types/stl/processing.py
+++ b/mediagoblin/media_types/stl/processing.py
@@ -369,7 +369,8 @@ class StlProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py
index 5cae42f5..d039c24b 100644
--- a/mediagoblin/media_types/video/processing.py
+++ b/mediagoblin/media_types/video/processing.py
@@ -22,6 +22,7 @@ import celery
import six
+from celery import group, chord
from mediagoblin import mg_globals as mgg
from mediagoblin.processing import (
FilenameBuilder, BaseProcessingFail,
@@ -34,7 +35,7 @@ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
from mediagoblin.media_types import MissingComponents
from . import transcoders
-from .util import skip_transcode
+from .util import skip_transcode, ACCEPTED_RESOLUTIONS
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
@@ -165,26 +166,26 @@ def store_metadata(media_entry, metadata):
@celery.task()
-def main_task(**process_info):
+def main_task(resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(process_info['resolution'])
- processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
processor.generate_thumb(thumb_size=process_info['thumb_size'])
processor.store_orig_metadata()
@celery.task()
-def complimentary_task(**process_info):
+def complimentary_task(resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(process_info['resolution'])
- processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
@celery.task()
-def processing_cleanup(**process_info):
- processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
+def processing_cleanup(entry, manager):
+ processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager?
processor.delete_queue_file()
# =====================
@@ -523,7 +524,20 @@ class VideoProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
- ProcessMedia().apply_async(
- [entry.id, feed_url, reprocess_action, reprocess_info], {},
- task_id=entry.queued_task_id)
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
+
+ reprocess_info['entry'] = entry.id # ?
+ reprocess_info['manager'] = manager # can celery serialize this?
+
+ # Add args
+
+ transcoding_tasks = group(
+ main_task.signature(queue='default', priority=5, immutable=True),
+ complimentary_task.signature(queue='default', priority=4, immutable=True),
+ complimentary_task.signature(queue='default', priority=3, immutable=True),
+ complimentary_task.signature(queue='default', priority=2, immutable=True)
+ complimentary_task.signature(queue='default', priority=1, immutable=True)
+ )
+
+ chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True))
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index 4e5853c1..76f81faa 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -257,7 +257,8 @@ class ProcessingManager(object):
return processor
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
"""
Returns the Celery command needed to proceed with media processing
*This method has to be implemented in all media types*
diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py
index 402eb851..1c78f73a 100644
--- a/mediagoblin/submit/lib.py
+++ b/mediagoblin/submit/lib.py
@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
entry, manager = get_entry_and_processing_manager(entry.id)
try:
- manager.workflow(entry, feed_url, reprocess_action, reprocess_info)
+ manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
except BaseException as exc:
# The purpose of this section is because when running in "lazy"
# or always-eager-with-exceptions-propagated celery mode that