Diffstat (limited to 'mediagoblin/processing/__init__.py')
-rw-r--r-- | mediagoblin/processing/__init__.py | 318
1 file changed, 277 insertions, 41 deletions
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index 27d89895..a4744e14 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -14,12 +14,14 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+from collections import OrderedDict
 import logging
 import os
 
-from mediagoblin.db.util import atomic_update
 from mediagoblin import mg_globals as mgg
-
+from mediagoblin.db.util import atomic_update
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools.pluginapi import hook_handle
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 
 _log = logging.getLogger(__name__)
@@ -74,49 +76,89 @@ class FilenameBuilder(object):
                              ext=self.ext)
 
 
-class ProcessingState(object):
-    """
-    The first and only argument to the "processor" of a media type
-
-    This could be thought of as a "request" to the processor
-    function. It has the main info for the request (media entry)
-    and a bunch of tools for the request on it.
-
-    It can get more fancy without impacting old media types.
+class MediaProcessor(object):
+    """A particular processor for this media type.
+
+    While the ProcessingManager handles all types of MediaProcessing
+    possible for a particular media type, a MediaProcessor can be
+    thought of as a *particular* processing action for a media type.
+    For example, you may have separate MediaProcessors for:
+
+    - initial_processing: the initial processing of a media
+    - gen_thumb: generate a thumbnail
+    - resize: resize an image
+    - transcode: transcode a video
+
+    ... etc.
+
+    Some information on producing a new MediaProcessor for your media type:
+
+    - You *must* supply a name attribute. This must be a class level
+      attribute, and a string. This will be used to determine the
+      subcommand of your process.
+    - It's recommended that you supply a class level description
+      attribute.
+    - Supply a media_is_eligible classmethod. This will be used to
+      determine whether or not a media entry is eligible to use this
+      processor type. See the method documentation for details.
+    - To give "./bin/gmg reprocess run" abilities to this media type,
+      supply both generate_parser and args_to_request classmethods.
+    - The process method will be what actually processes your media.
     """
-    def __init__(self, entry):
+    # You MUST override this in the child MediaProcessor!
+    name = None
+
+    # Optional, but will be used in various places to describe the
+    # action this MediaProcessor provides
+    description = None
+
+    def __init__(self, manager, entry):
+        self.manager = manager
         self.entry = entry
+        self.entry_orig_state = entry.state
+
+        # Should be initialized at time of processing, at least
         self.workbench = None
-        self.queued_filename = None
 
-    def set_workbench(self, wb):
-        self.workbench = wb
+    def __enter__(self):
+        self.workbench = mgg.workbench_manager.create()
+        return self
+
+    def __exit__(self, *args):
+        self.workbench.destroy()
+        self.workbench = None
 
-    def get_queued_filename(self):
+    # @with_workbench
+    def process(self, **kwargs):
         """
-        Get the a filename for the original, on local storage
+        Actually process this media entry.
""" - if self.queued_filename is not None: - return self.queued_filename - queued_filepath = self.entry.queued_media_file - queued_filename = self.workbench.localized_file( - mgg.queue_store, queued_filepath, - 'source') - self.queued_filename = queued_filename - return queued_filename - - def copy_original(self, target_name, keyname=u"original"): - self.store_public(keyname, self.get_queued_filename(), target_name) - - def store_public(self, keyname, local_file, target_name=None): - if target_name is None: - target_name = os.path.basename(local_file) - target_filepath = create_pub_filepath(self.entry, target_name) - if keyname in self.entry.media_files: - _log.warn("store_public: keyname %r already used for file %r, " - "replacing with %r", keyname, - self.entry.media_files[keyname], target_filepath) - mgg.public_store.copy_local_to_storage(local_file, target_filepath) - self.entry.media_files[keyname] = target_filepath + raise NotImplementedError + + @classmethod + def media_is_eligible(cls, entry=None, state=None): + raise NotImplementedError + + ############################### + # Command line interface things + ############################### + + @classmethod + def generate_parser(cls): + raise NotImplementedError + + @classmethod + def args_to_request(cls, args): + raise NotImplementedError + + ########################################## + # THE FUTURE: web interface things here :) + ########################################## + + ##################### + # Some common "steps" + ##################### def delete_queue_file(self): # Remove queued media file from storage and database. @@ -124,9 +166,129 @@ class ProcessingState(object): # be removed too, but fail if the directory is not empty to be on # the super-safe side. queued_filepath = self.entry.queued_media_file - mgg.queue_store.delete_file(queued_filepath) # rm file - mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir - self.entry.queued_media_file = [] + if queued_filepath: + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir + self.entry.queued_media_file = [] + + +class ProcessingKeyError(Exception): pass +class ProcessorDoesNotExist(ProcessingKeyError): pass +class ProcessorNotEligible(ProcessingKeyError): pass +class ProcessingManagerDoesNotExist(ProcessingKeyError): pass + + + +class ProcessingManager(object): + """Manages all the processing actions available for a media type + + Specific processing actions, MediaProcessor subclasses, are added + to the ProcessingManager. + """ + def __init__(self): + # Dict of all MediaProcessors of this media type + self.processors = OrderedDict() + + def add_processor(self, processor): + """ + Add a processor class to this media type + """ + name = processor.name + if name is None: + raise AttributeError("Processor class's .name attribute not set") + + self.processors[name] = processor + + def list_eligible_processors(self, entry): + """ + List all processors that this media entry is eligible to be processed + for. + """ + return [ + processor + for processor in self.processors.values() + if processor.media_is_eligible(entry=entry)] + + def list_all_processors_by_state(self, state): + """ + List all processors that this media state is eligible to be processed + for. 
+ """ + return [ + processor + for processor in self.processors.values() + if processor.media_is_eligible(state=state)] + + + def list_all_processors(self): + return self.processors.values() + + def gen_process_request_via_cli(self, subparser): + # Got to figure out what actually goes here before I can write this properly + pass + + def get_processor(self, key, entry=None): + """ + Get the processor with this key. + + If entry supplied, make sure this entry is actually compatible; + otherwise raise error. + """ + try: + processor = self.processors[key] + except KeyError: + import pdb + pdb.set_trace() + raise ProcessorDoesNotExist( + "'%s' processor does not exist for this media type" % key) + + if entry and not processor.media_is_eligible(entry): + raise ProcessorNotEligible( + "This entry is not eligible for processor with name '%s'" % key) + + return processor + + +def request_from_args(args, which_args): + """ + Generate a request from the values of some argparse parsed args + """ + request = {} + for arg in which_args: + request[arg] = getattr(args, arg) + + return request + + +class MediaEntryNotFound(Exception): pass + + +def get_processing_manager_for_type(media_type): + """ + Get the appropriate media manager for this type + """ + manager_class = hook_handle(('reprocess_manager', media_type)) + if not manager_class: + raise ProcessingManagerDoesNotExist( + "A processing manager does not exist for {0}".format(media_type)) + manager = manager_class() + + return manager + + +def get_entry_and_processing_manager(media_id): + """ + Get a MediaEntry, its media type, and its manager all in one go. + + Returns a tuple of: `(entry, media_type, media_manager)` + """ + entry = MediaEntry.query.filter_by(id=media_id).first() + if entry is None: + raise MediaEntryNotFound("Can't find media with id '%s'" % media_id) + + manager = get_processing_manager_for_type(entry.media_type) + + return entry, manager def mark_entry_failed(entry_id, exc): @@ -165,6 +327,66 @@ def mark_entry_failed(entry_id, exc): u'fail_metadata': {}}) +def get_process_filename(entry, workbench, acceptable_files): + """ + Try and get the queued file if available, otherwise return the first file + in the acceptable_files that we have. 
+
+    If no acceptable_files, raise ProcessFileNotFound
+    """
+    filepath = None
+    if entry.queued_media_file:
+        filepath = entry.queued_media_file
+        storage = mgg.queue_store
+    else:
+        for keyname in acceptable_files:
+            if entry.media_files.get(keyname):
+                filepath = entry.media_files[keyname]
+                storage = mgg.public_store
+                break
+
+    if not filepath:
+        raise ProcessFileNotFound()
+
+    filename = workbench.localized_file(
+        storage, filepath,
+        'source')
+
+    if not os.path.exists(filename):
+        raise ProcessFileNotFound()
+
+    return filename
+
+
+def store_public(entry, keyname, local_file, target_name=None,
+                 delete_if_exists=True):
+    if target_name is None:
+        target_name = os.path.basename(local_file)
+    target_filepath = create_pub_filepath(entry, target_name)
+
+    if keyname in entry.media_files:
+        _log.warn("store_public: keyname %r already used for file %r, "
+                  "replacing with %r", keyname,
+                  entry.media_files[keyname], target_filepath)
+        if delete_if_exists:
+            mgg.public_store.delete_file(entry.media_files[keyname])
+
+    try:
+        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+    except:
+        raise PublicStoreFail(keyname=keyname)
+
+    # raise an error if the file failed to copy
+    copied_filepath = mgg.public_store.get_local_path(target_filepath)
+    if not os.path.exists(copied_filepath):
+        raise PublicStoreFail(keyname=keyname)
+
+    entry.media_files[keyname] = target_filepath
+
+
+def copy_original(entry, orig_filename, target_name, keyname=u"original"):
+    store_public(entry, keyname, orig_filename, target_name)
+
+
 class BaseProcessingFail(Exception):
     """
     Base exception that all other processing failure messages should
@@ -184,10 +406,24 @@ class BaseProcessingFail(Exception):
     def __init__(self, **metadata):
         self.metadata = metadata or {}
-
 
 class BadMediaFail(BaseProcessingFail):
     """
     Error that should be raised when an inappropriate file was given
     for the media type specified.
     """
     general_message = _(u'Invalid file given for media type.')
+
+
+class PublicStoreFail(BaseProcessingFail):
+    """
+    Error that should be raised when copying to public store fails
+    """
+    general_message = _('Copying to public storage failed.')
+
+
+class ProcessFileNotFound(BaseProcessingFail):
+    """
+    Error that should be raised when an acceptable file for processing
+    is not found.
+    """
+    general_message = _(u'An acceptable processing file was not found')
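For orientation, here is a minimal sketch of what the MediaProcessor / ProcessingManager API introduced above expects from a media type. Only MediaProcessor, ProcessingManager, add_processor and request_from_args come from this commit; the ResizeImage class, the 'resize' name, the --size option and the manager subclass are hypothetical illustrations, not part of the codebase.

# Hypothetical media-type plugin built on the API added in this commit.
import argparse

from mediagoblin.processing import (
    MediaProcessor, ProcessingManager, request_from_args)


class ResizeImage(MediaProcessor):
    """Illustrative processor that re-resizes an already processed entry."""
    name = 'resize'                      # required: becomes the CLI subcommand
    description = 'Resize an image'      # optional, used for display

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # Only entries that finished initial processing can be resized
        if not state:
            state = entry.state
        return state in ('processed',)

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description, prog=cls.name)
        parser.add_argument('--size', nargs=2, type=int,
                            metavar=('max_width', 'max_height'))
        return parser

    @classmethod
    def args_to_request(cls, args):
        # Turn parsed argparse values into a processing request dict
        return request_from_args(args, ['size'])

    def process(self, size=None):
        # Real work would go here: fetch the file with get_process_filename(),
        # resize it inside self.workbench, then store_public() the result.
        pass


class HypotheticalImageProcessingManager(ProcessingManager):
    def __init__(self):
        super(HypotheticalImageProcessingManager, self).__init__()
        self.add_processor(ResizeImage)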
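And a rough sketch of how a caller, such as the eventual "./bin/gmg reprocess run" path, might drive the new module-level helpers and the workbench context manager. The media id, the 'resize' key and the size value are placeholders, and this assumes a manager like the one above is registered for the entry's media type via the ('reprocess_manager', media_type) hook.

from mediagoblin.processing import get_entry_and_processing_manager

# Look up the entry and its media type's ProcessingManager in one go
entry, manager = get_entry_and_processing_manager(media_id=42)

# Raises ProcessorDoesNotExist / ProcessorNotEligible on a bad key or entry
processor_class = manager.get_processor('resize', entry)

# MediaProcessor.__enter__/__exit__ create and destroy the workbench
with processor_class(manager, entry) as processor:
    processor.process(size=[640, 480])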