author     vijeth-aradhya <vijthaaa@gmail.com>   2017-06-13 01:43:43 +0530
committer  vijeth-aradhya <vijthaaa@gmail.com>   2017-06-13 01:43:43 +0530
commit     d77eb56280f57e547294e29e6a1b2b4d46c15ac6 (patch)
tree       95c6889f786d0d9fde1985e23eb7ee7e40105533
parent     bd011c940eeeddd060ccf921ad3519d20d77a015 (diff)
download   mediagoblin-d77eb56280f57e547294e29e6a1b2b4d46c15ac6.tar.lz
           mediagoblin-d77eb56280f57e547294e29e6a1b2b4d46c15ac6.tar.xz
           mediagoblin-d77eb56280f57e547294e29e6a1b2b4d46c15ac6.zip
Celery priority testing with debug statements

It currently errors at this line: `self.entry.set_file_metadata(self.curr_file, **file_metadata)`. Otherwise, the Celery part should work fine.
-rw-r--r--  mediagoblin/init/celery/__init__.py               3
-rw-r--r--  mediagoblin/media_types/ascii/processing.py       3
-rw-r--r--  mediagoblin/media_types/audio/processing.py       3
-rw-r--r--  mediagoblin/media_types/image/processing.py       3
-rw-r--r--  mediagoblin/media_types/pdf/processing.py         3
-rw-r--r--  mediagoblin/media_types/raw_image/processing.py   3
-rw-r--r--  mediagoblin/media_types/stl/processing.py         3
-rw-r--r--  mediagoblin/media_types/video/processing.py       101
-rw-r--r--  mediagoblin/processing/__init__.py                3
-rw-r--r--  mediagoblin/submit/lib.py                         2
10 files changed, 80 insertions, 47 deletions
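A note on the recurring signature change in the hunks below: Celery task arguments have to travel through the broker, so the reworked API passes the database id instead of live `entry`/`manager` objects and re-fetches them inside the task. A minimal sketch of that worker-side pattern, pieced together from the video hunks further down (decorator and class names as in mediagoblin/media_types/video/processing.py; this is a reading aid, not part of the patch):

    from mediagoblin.processing import get_entry_and_processing_manager

    @celery.task()
    def main_task(entry_id, resolution, medium_size, **process_info):
        # The integer id is broker-serializable; the ORM objects are looked
        # up again on the worker instead of being passed as task arguments.
        entry, manager = get_entry_and_processing_manager(entry_id)
        with CommonVideoProcessor(manager, entry) as processor:
            processor.common_setup(resolution)
            # ... transcode / thumbnail steps as in the video diff below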
diff --git a/mediagoblin/init/celery/__init__.py b/mediagoblin/init/celery/__init__.py
index 9a67942c..a3335958 100644
--- a/mediagoblin/init/celery/__init__.py
+++ b/mediagoblin/init/celery/__init__.py
@@ -55,6 +55,9 @@ def get_celery_settings_dict(app_config, global_config,
queue_arguments={'x-max-priority': 10}),
)
+ print "CELERY_ACKS_LATE", celery_conf['CELERY_ACKS_LATE']
+ print "CELERYD_PREFETCH_MULTIPLIER", celery_conf['CELERYD_PREFETCH_MULTIPLIER']
+
celery_settings = {}
# Add all celery settings from config
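The two prints above inspect settings that matter for RabbitMQ priority queues, alongside the `x-max-priority` queue argument visible in the context line. A minimal sketch of the relevant configuration, assuming the Celery 3.x-style setting names used in this file; the queue name and routing key here are illustrative, not taken from the patch:

    from kombu import Queue

    celery_conf = {
        # Prefetch one message at a time so higher-priority tasks are not
        # stuck behind a worker's already-fetched backlog.
        'CELERYD_PREFETCH_MULTIPLIER': 1,
        # Acknowledge after the task completes, not on receipt.
        'CELERY_ACKS_LATE': True,
        # Declare the queue with a maximum priority so the broker can sort
        # by the per-task `priority` set on the signatures further down.
        'CELERY_QUEUES': (
            Queue('default', routing_key='default',
                  queue_arguments={'x-max-priority': 10}),
        ),
    }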
diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py
index 823dc4fd..c9b47fb5 100644
--- a/mediagoblin/media_types/ascii/processing.py
+++ b/mediagoblin/media_types/ascii/processing.py
@@ -274,8 +274,7 @@ class AsciiProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
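Note that in this hunk (and the matching ones for the other media types below) only the signature changes: the body still references `entry`, which is no longer in scope once the parameter is `entry_id`. A hedged sketch of what a consistent body could look like, reusing `get_entry_and_processing_manager` the way the video tasks below do (sketch only; not part of this commit):

    # Assumes `from mediagoblin.processing import get_entry_and_processing_manager`
    # alongside the file's existing imports.
    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
        # Look the entry up from its id so queued_task_id is still reachable.
        entry, _manager = get_entry_and_processing_manager(entry_id)
        ProcessMedia().apply_async(
            [entry_id, feed_url, reprocess_action, reprocess_info], {},
            task_id=entry.queued_task_id)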
diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py
index b74364bc..15d0b0a7 100644
--- a/mediagoblin/media_types/audio/processing.py
+++ b/mediagoblin/media_types/audio/processing.py
@@ -366,8 +366,7 @@ class AudioProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py
index a189fef3..7224a8fd 100644
--- a/mediagoblin/media_types/image/processing.py
+++ b/mediagoblin/media_types/image/processing.py
@@ -431,8 +431,7 @@ class ImageProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(MetadataProcessing)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/pdf/processing.py b/mediagoblin/media_types/pdf/processing.py
index 6a13c8e3..e6e6e0a9 100644
--- a/mediagoblin/media_types/pdf/processing.py
+++ b/mediagoblin/media_types/pdf/processing.py
@@ -471,8 +471,7 @@ class PdfProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/raw_image/processing.py b/mediagoblin/media_types/raw_image/processing.py
index 7f2d155a..4bfd9f3a 100644
--- a/mediagoblin/media_types/raw_image/processing.py
+++ b/mediagoblin/media_types/raw_image/processing.py
@@ -81,8 +81,7 @@ class RawImageProcessingManager(ProcessingManager):
self.add_processor(InitialRawProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py
index 9dd6d49b..cd3ffd8c 100644
--- a/mediagoblin/media_types/stl/processing.py
+++ b/mediagoblin/media_types/stl/processing.py
@@ -369,8 +369,7 @@ class StlProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py
index 64cacb5f..c3257c84 100644
--- a/mediagoblin/media_types/video/processing.py
+++ b/mediagoblin/media_types/video/processing.py
@@ -29,7 +29,7 @@ from mediagoblin.processing import (
ProgressCallback, MediaProcessor,
ProcessingManager, request_from_args,
get_process_filename, store_public,
- copy_original)
+ copy_original, get_entry_and_processing_manager)
from mediagoblin.processing.task import ProcessMedia
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
from mediagoblin.media_types import MissingComponents
@@ -166,27 +166,35 @@ def store_metadata(media_entry, metadata):
@celery.task()
-def main_task(resolution, medium_size, **process_info):
- processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(resolution)
- processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
- vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
- processor.generate_thumb(thumb_size=process_info['thumb_size'])
- processor.store_orig_metadata()
+def main_task(entry_id, resolution, medium_size, **process_info):
+ entry, manager = get_entry_and_processing_manager(entry_id)
+ print "\nEntered main_task\n"
+ with CommonVideoProcessor(manager, entry) as processor:
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
+ vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+ processor.generate_thumb(thumb_size=process_info['thumb_size'])
+ processor.store_orig_metadata()
+ print "\nExited main_task\n"
@celery.task()
-def complimentary_task(resolution, medium_size, **process_info):
- processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(resolution)
- processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
- vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+def complimentary_task(entry_id, resolution, medium_size, **process_info):
+ entry, manager = get_entry_and_processing_manager(entry_id)
+ print "\nEntered complimentary_task\n"
+ with CommonVideoProcessor(manager, entry) as processor:
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
+ vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+ print "\nExited complimentary_task\n"
@celery.task()
-def processing_cleanup(entry, manager):
- processor = CommonVideoProcessor(manager, entry)
- processor.delete_queue_file()
+def processing_cleanup(entry_id):
+ entry, manager = get_entry_and_processing_manager(entry_id)
+ with CommonVideoProcessor(manager, entry) as processor:
+ processor.delete_queue_file()
+ print "\nDeleted queue_file\n"
# =====================
@@ -206,7 +214,7 @@ class CommonVideoProcessor(MediaProcessor):
self.process_filename = get_process_filename(
self.entry, self.workbench, self.acceptable_files)
self.name_builder = FilenameBuilder(self.process_filename)
-
+
self.transcoder = transcoders.VideoTranscoder()
self.did_transcode = False
@@ -218,6 +226,8 @@ class CommonVideoProcessor(MediaProcessor):
self.curr_file = 'webm_video'
self.part_filename = self.name_builder.fill('{basename}.medium.webm')
+ print self.curr_file, ": Done common_setup()"
+
def copy_original(self):
# If we didn't transcode, then we need to keep the original
raise NotImplementedError
@@ -254,6 +264,7 @@ class CommonVideoProcessor(MediaProcessor):
def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
vorbis_quality=None):
+ print self.curr_file, ": Enter transcode"
progress_callback = ProgressCallback(self.entry)
tmp_dst = os.path.join(self.workbench.dir, self.part_filename)
@@ -292,25 +303,34 @@ class CommonVideoProcessor(MediaProcessor):
self.entry.media_files[self.curr_file].delete()
else:
+ print self.curr_file, ": ->1.1"
+ print type(medium_size)
+ medium_size = tuple(medium_size)
+ print type(medium_size)
+ print self.curr_file, ": ->1.2"
self.transcoder.transcode(self.process_filename, tmp_dst,
vp8_quality=vp8_quality,
vp8_threads=vp8_threads,
vorbis_quality=vorbis_quality,
progress_callback=progress_callback,
dimensions=tuple(medium_size))
+ print self.curr_file, ": ->2"
if self.transcoder.dst_data:
+ print self.curr_file, ": ->3"
# Push transcoded video to public storage
_log.debug('Saving medium...')
- store_public(self.entry, 'webm_video', tmp_dst,
- self.name_builder.fill('{basename}.medium.webm'))
+ store_public(self.entry, 'webm_video', tmp_dst, self.part_filename)
_log.debug('Saved medium')
+ print self.curr_file, ": ->4"
# Is this the file_metadata that paroneayea was talking about?
self.entry.set_file_metadata(self.curr_file, **file_metadata)
self.did_transcode = True
+ print self.curr_file, ": Done transcode()"
def generate_thumb(self, thumb_size=None):
+ print self.curr_file, ": Enter generate_thumb()"
# Temporary file for the video thumbnail (cleaned up with workbench)
tmp_thumb = os.path.join(self.workbench.dir,
self.name_builder.fill(
@@ -339,9 +359,10 @@ class CommonVideoProcessor(MediaProcessor):
self.name_builder.fill('{basename}.thumbnail.jpg'))
self.entry.set_file_metadata('thumb', thumb_size=thumb_size)
+ print self.curr_file, ": Done generate_thumb()"
def store_orig_metadata(self):
-
+ print self.curr_file, ": 2"
# Extract metadata and keep a record of it
metadata = transcoders.discover(self.process_filename)
@@ -524,25 +545,41 @@ class VideoProcessingManager(ProcessingManager):
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
- reprocess_info['entry'] = entry
- reprocess_info['manager'] = manager
+ reprocess_info = reprocess_info or {}
+ if 'vp8_quality' not in reprocess_info:
+ reprocess_info['vp8_quality'] = None
+ if 'vorbis_quality' not in reprocess_info:
+ reprocess_info['vorbis_quality'] = None
+ if 'vp8_threads' not in reprocess_info:
+ reprocess_info['vp8_threads'] = None
+ if 'thumb_size' not in reprocess_info:
+ reprocess_info['thumb_size'] = None
- transcoding_tasks = group(
- main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']),
+ transcoding_tasks = group([
+ main_task.signature(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
kwargs=reprocess_info, queue='default',
priority=5, immutable=True),
- complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']),
+ ])
+
+ cleanup_task = processing_cleanup.signature(args=(entry_id,),
+ queue='default', immutable=True)
+
+ """
+ complimentary_task.signature(args=(entry_id, '360p', ACCEPTED_RESOLUTIONS['360p']),
kwargs=reprocess_info, queue='default',
priority=4, immutable=True),
- complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']),
+ complimentary_task.signature(args=(entry_id, '720p', ACCEPTED_RESOLUTIONS['720p']),
kwargs=reprocess_info, queue='default',
priority=3, immutable=True),
- )
+ main_task.apply_async(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
+ kwargs=reprocess_info, queue='default',
+ priority=5, immutable=True)
+ processing_cleanup.apply_async(args=(entry_id,), queue='default', immutable=True)
+ """
- cleanup_task = processing_cleanup.signature(args=(entry, manager),
- queue='default', immutable=True)
-
chord(transcoding_tasks)(cleanup_task)
+
+ # main_task(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p'], **reprocess_info)
+ # processing_cleanup(entry_id)
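The repeated `tuple(medium_size)` conversions above hint at what Celery does to task arguments: with the JSON serializer, a tuple comes back as a list after the broker round trip. A quick illustration of that round trip (plain `json` stands in for Celery's serialization here; this is an assumption about the configured serializer, not MediaGoblin code). The failing `set_file_metadata` call from the commit message sits on this same code path, right after `store_public`.

    import json

    medium_size = (640, 640)
    # Simulate the broker round trip: JSON has no tuple type, so a tuple
    # sent as a task argument is delivered to the worker as a list.
    delivered = json.loads(json.dumps(medium_size))
    assert delivered == [640, 640] and isinstance(delivered, list)
    # Hence the tuple(medium_size) calls before passing dimensions onward.
    dimensions = tuple(delivered)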
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index 76f81faa..98031bbc 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -257,8 +257,7 @@ class ProcessingManager(object):
return processor
- def workflow(self, entry, manager, feed_url, reprocess_action,
- reprocess_info=None):
+ def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
"""
Returns the Celery command needed to proceed with media processing
*This method has to be implemented in all media types*
diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py
index 1c78f73a..f347e715 100644
--- a/mediagoblin/submit/lib.py
+++ b/mediagoblin/submit/lib.py
@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
entry, manager = get_entry_and_processing_manager(entry.id)
try:
- manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
+ manager.workflow(entry.id, feed_url, reprocess_action, reprocess_info)
except BaseException as exc:
# The purpose of this section is because when running in "lazy"
# or always-eager-with-exceptions-propagated celery mode that