diff options
Diffstat (limited to 'mediagoblin')
| -rw-r--r-- | mediagoblin/media_types/image/processing.py | 92 | ||||
| -rw-r--r-- | mediagoblin/media_types/video/processing.py | 12 | ||||
| -rw-r--r-- | mediagoblin/processing/__init__.py | 32 | ||||
| -rw-r--r-- | mediagoblin/tests/test_processing.py | 20 | ||||
| -rw-r--r-- | mediagoblin/tests/test_submission.py | 338 | ||||
| -rw-r--r-- | mediagoblin/tests/test_submission/bigblue.png | bin | 0 -> 3142 bytes | 
6 files changed, 238 insertions, 256 deletions
| diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py index a3bf2c20..c484117b 100644 --- a/mediagoblin/media_types/image/processing.py +++ b/mediagoblin/media_types/image/processing.py @@ -19,10 +19,40 @@ import os  from mediagoblin import mg_globals as mgg  from mediagoblin.processing import BadMediaFail, \ -    create_pub_filepath, THUMB_SIZE, MEDIUM_SIZE +    create_pub_filepath, THUMB_SIZE, MEDIUM_SIZE, FilenameBuilder  from mediagoblin.tools.exif import exif_fix_image_orientation, \      extract_exif, clean_exif, get_gps_data, get_useful +def resize_image(entry, filename, new_path, exif_tags, workdir, new_size, +                 size_limits=(0, 0)): +    """Store a resized version of an image and return its pathname. + +    Arguments: +    entry -- the entry for the image to resize +    filename -- the filename of the original image being resized +    new_path -- public file path for the new resized image +    exif_tags -- EXIF data for the original image +    workdir -- directory path for storing converted image files +    new_size -- 2-tuple size for the resized image +    size_limits (optional) -- image is only resized if it exceeds this size + +    """ +    try: +        resized = Image.open(filename) +    except IOError: +        raise BadMediaFail() +    resized = exif_fix_image_orientation(resized, exif_tags)  # Fix orientation + +    if ((resized.size[0] > size_limits[0]) or +        (resized.size[1] > size_limits[1])): +        resized.thumbnail(new_size, Image.ANTIALIAS) + +    # Copy the new file to the conversion subdir, then remotely. +    tmp_resized_filename = os.path.join(workdir, new_path[-1]) +    with file(tmp_resized_filename, 'w') as resized_file: +        resized.save(resized_file) +    mgg.public_store.copy_local_to_storage(tmp_resized_filename, new_path) +  def process_image(entry):      """      Code to process an image @@ -37,67 +67,33 @@ def process_image(entry):      queued_filename = workbench.localized_file(          mgg.queue_store, queued_filepath,          'source') - -    filename_bits = os.path.splitext(queued_filename) -    basename = os.path.split(filename_bits[0])[1] -    extension = filename_bits[1].lower() +    name_builder = FilenameBuilder(queued_filename)      # EXIF extraction      exif_tags = extract_exif(queued_filename)      gps_data = get_gps_data(exif_tags) -    try: -        thumb = Image.open(queued_filename) -    except IOError: -        raise BadMediaFail() - -    thumb = exif_fix_image_orientation(thumb, exif_tags) - -    thumb.thumbnail(THUMB_SIZE, Image.ANTIALIAS) - -    # Copy the thumb to the conversion subdir, then remotely. -    thumb_filename = 'thumbnail' + extension -    thumb_filepath = create_pub_filepath(entry, thumb_filename) - -    tmp_thumb_filename = os.path.join( -        conversions_subdir, thumb_filename) - -    with file(tmp_thumb_filename, 'w') as thumb_file: -        thumb.save(thumb_file) - -    mgg.public_store.copy_local_to_storage( -        tmp_thumb_filename, thumb_filepath) +    # Always create a small thumbnail +    thumb_filepath = create_pub_filepath( +        entry, name_builder.fill('{basename}.thumbnail{ext}')) +    resize_image(entry, queued_filename, thumb_filepath, +                 exif_tags, conversions_subdir, THUMB_SIZE)      # If the size of the original file exceeds the specified size of a `medium` -    # file, a `medium.jpg` files is created and later associated with the media +    # file, a `.medium.jpg` files is created and later associated with the media      # entry. -    medium = Image.open(queued_filename) - -    # Fix orientation -    medium = exif_fix_image_orientation(medium, exif_tags) - -    if medium.size[0] > MEDIUM_SIZE[0] or medium.size[1] > MEDIUM_SIZE[1]: -        medium.thumbnail(MEDIUM_SIZE, Image.ANTIALIAS) - -    medium_filename = 'medium' + extension -    medium_filepath = create_pub_filepath(entry, medium_filename) - -    tmp_medium_filename = os.path.join( -        conversions_subdir, medium_filename) - -    with file(tmp_medium_filename, 'w') as medium_file: -        medium.save(medium_file) - -    mgg.public_store.copy_local_to_storage( -        tmp_medium_filename, medium_filepath) +    medium_filepath = create_pub_filepath( +        entry, name_builder.fill('{basename}.medium{ext}')) +    resize_image(entry, queued_filename, medium_filepath, +                 exif_tags, conversions_subdir, MEDIUM_SIZE, MEDIUM_SIZE)      # we have to re-read because unlike PIL, not everything reads      # things in string representation :)      queued_file = file(queued_filename, 'rb')      with queued_file: -        #create_pub_filepath(entry, queued_filepath[-1]) -        original_filepath = create_pub_filepath(entry, basename + extension)  +        original_filepath = create_pub_filepath( +            entry, name_builder.fill('{basename}{ext}') )          with mgg.public_store.get_file(original_filepath, 'wb') \              as original_file: diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index 3a479802..24c03648 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -20,7 +20,7 @@ import os  from mediagoblin import mg_globals as mgg  from mediagoblin.processing import mark_entry_failed, \ -    THUMB_SIZE, MEDIUM_SIZE, create_pub_filepath +    THUMB_SIZE, MEDIUM_SIZE, create_pub_filepath, FilenameBuilder  from . import transcoders  logging.basicConfig() @@ -49,17 +49,13 @@ def process_video(entry):      queued_filename = workbench.localized_file(          mgg.queue_store, queued_filepath,          'source') +    name_builder = FilenameBuilder(queued_filename)      medium_filepath = create_pub_filepath( -        entry, -        '{original}-640p.webm'.format( -            original=os.path.splitext( -                queued_filepath[-1])[0]  # Select the  -            )) +        entry, name_builder.fill('{basename}-640p.webm'))      thumbnail_filepath = create_pub_filepath( -        entry, 'thumbnail.jpg') - +        entry, name_builder.fill('{basename}.thumbnail.jpg'))      # Create a temporary file for the video destination      tmp_dst = tempfile.NamedTemporaryFile() diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index ecdfa8a9..9dee3baa 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -15,6 +15,7 @@  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  import logging +import os  from mediagoblin.db.util import atomic_update  from mediagoblin import mg_globals as mgg @@ -33,6 +34,37 @@ def create_pub_filepath(entry, filename):               unicode(entry._id),               filename]) +class FilenameBuilder(object): +    """Easily slice and dice filenames. + +    Initialize this class with an original file path, then use the fill() +    method to create new filenames based on the original. + +    """ +    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length + +    def __init__(self, path): +        """Initialize a builder from an original file path.""" +        self.dirpath, self.basename = os.path.split(path) +        self.basename, self.ext = os.path.splitext(self.basename) +        self.ext = self.ext.lower() + +    def fill(self, fmtstr): +        """Build a new filename based on the original. + +        The fmtstr argument can include the following: +        {basename} -- the original basename, with the extension removed +        {ext} -- the original extension, always lowercase + +        If necessary, {basename} will be truncated so the filename does not +        exceed this class' MAX_FILENAME_LENGTH in length. + +        """ +        basename_len = (self.MAX_FILENAME_LENGTH - +                        len(fmtstr.format(basename='', ext=self.ext))) +        return fmtstr.format(basename=self.basename[:basename_len], +                             ext=self.ext) +  def mark_entry_failed(entry_id, exc):      """ diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py new file mode 100644 index 00000000..417f91f3 --- /dev/null +++ b/mediagoblin/tests/test_processing.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +from nose.tools import assert_equal, assert_true, assert_false + +from mediagoblin import processing + +class TestProcessing(object): +    def run_fill(self, input, format, output=None): +        builder = processing.FilenameBuilder(input) +        result = builder.fill(format) +        if output is None: +            return result +        assert_equal(output, result) +         +    def test_easy_filename_fill(self): +        self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt') + +    def test_long_filename_fill(self): +        self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}', +                      'image-{0}.png'.format('A' * 245)) diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index 1f56779e..ba80ba20 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -15,30 +15,34 @@  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  import urlparse -import pkg_resources +import os  import re +import time  from nose.tools import assert_equal, assert_true, assert_false +from pkg_resources import resource_filename  from mediagoblin.tests.tools import setup_fresh_app, get_test_app, \      fixture_add_user  from mediagoblin import mg_globals +from mediagoblin.processing import create_pub_filepath  from mediagoblin.tools import template, common -GOOD_JPG = pkg_resources.resource_filename( -  'mediagoblin.tests', 'test_submission/good.jpg') -GOOD_PNG = pkg_resources.resource_filename( -  'mediagoblin.tests', 'test_submission/good.png') -EVIL_FILE = pkg_resources.resource_filename( -  'mediagoblin.tests', 'test_submission/evil') -EVIL_JPG = pkg_resources.resource_filename( -  'mediagoblin.tests', 'test_submission/evil.jpg') -EVIL_PNG = pkg_resources.resource_filename( -  'mediagoblin.tests', 'test_submission/evil.png') +def resource(filename): +    return resource_filename('mediagoblin.tests', 'test_submission/' + filename) + +GOOD_JPG = resource('good.jpg') +GOOD_PNG = resource('good.png') +EVIL_FILE = resource('evil') +EVIL_JPG = resource('evil.jpg') +EVIL_PNG = resource('evil.png') +BIG_BLUE = resource('bigblue.png')  GOOD_TAG_STRING = 'yin,yang'  BAD_TAG_STRING = 'rage,' + 'f' * 26 + 'u' * 26 +FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form'] +REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']  class TestSubmission:      def setUp(self): @@ -61,234 +65,168 @@ class TestSubmission:      def logout(self):          self.test_app.get('/auth/logout/') +    def do_post(self, data, *context_keys, **kwargs): +        url = kwargs.pop('url', '/submit/') +        do_follow = kwargs.pop('do_follow', False) +        template.clear_test_template_context() +        response = self.test_app.post(url, data, **kwargs) +        if do_follow: +            response.follow() +        context_data = template.TEMPLATE_TEST_CONTEXT +        for key in context_keys: +            context_data = context_data[key] +        return response, context_data +         +    def upload_data(self, filename): +        return {'upload_files': [('file', filename)]} + +    def check_comments(self, request, media, count): +        comments = request.db.MediaComment.find({'media_entry': media._id}) +        assert_equal(count, len(list(comments))) +      def test_missing_fields(self):          # Test blank form          # --------------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', {}) -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] -        form = context['submit_form'] -        assert form.file.errors == [u'You must provide a file.'] +        response, form = self.do_post({}, *FORM_CONTEXT) +        assert_equal(form.file.errors, [u'You must provide a file.'])          # Test blank file          # --------------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'test title'}) -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] -        form = context['submit_form'] -        assert form.file.errors == [u'You must provide a file.'] +        response, form = self.do_post({'title': 'test title'}, *FORM_CONTEXT) +        assert_equal(form.file.errors, [u'You must provide a file.']) +    def check_url(self, response, path): +        assert_equal(urlparse.urlsplit(response.location)[2], path) -    def test_normal_uploads(self): -        # Test JPG -        # -------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Normal upload 1' -                }, upload_files=[( -                    'file', GOOD_JPG)]) - -        # User should be redirected -        response.follow() -        assert_equal( -            urlparse.urlsplit(response.location)[2], -            '/u/chris/') -        assert template.TEMPLATE_TEST_CONTEXT.has_key( -            'mediagoblin/user_pages/user.html') - +    def check_normal_upload(self, title, filename): +        response, context = self.do_post({'title': title}, do_follow=True, +                                         **self.upload_data(filename)) +        self.check_url(response, '/u/{0}/'.format(self.test_user.username)) +        assert_true(context.has_key('mediagoblin/user_pages/user.html'))          # Make sure the media view is at least reachable, logged in... -        self.test_app.get('/u/chris/m/normal-upload-1/') +        url = '/u/{0}/m/{1}/'.format(self.test_user.username, +                                     title.lower().replace(' ', '-')) +        self.test_app.get(url)          # ... and logged out too.          self.logout() -        self.test_app.get('/u/chris/m/normal-upload-1/') -        # Log back in for the remaining tests. -        self.login() +        self.test_app.get(url) -        # Test PNG -        # -------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Normal upload 2' -                }, upload_files=[( -                    'file', GOOD_PNG)]) - -        response.follow() -        assert_equal( -            urlparse.urlsplit(response.location)[2], -            '/u/chris/') -        assert template.TEMPLATE_TEST_CONTEXT.has_key( -            'mediagoblin/user_pages/user.html') +    def test_normal_jpg(self): +        self.check_normal_upload('Normal upload 1', GOOD_JPG) + +    def test_normal_png(self): +        self.check_normal_upload('Normal upload 2', GOOD_PNG) + +    def check_media(self, request, find_data, count=None): +        media = request.db.MediaEntry.find(find_data) +        if count is not None: +            assert_equal(media.count(), count) +            if count == 0: +                return +        return media[0]      def test_tags(self):          # Good tag string          # -------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Balanced Goblin', -                'tags': GOOD_TAG_STRING -                }, upload_files=[( -                    'file', GOOD_JPG)]) - -        # New media entry with correct tags should be created -        response.follow() -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html'] -        request = context['request'] -        media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] +        response, request = self.do_post({'title': 'Balanced Goblin', +                                          'tags': GOOD_TAG_STRING}, +                                         *REQUEST_CONTEXT, do_follow=True, +                                         **self.upload_data(GOOD_JPG)) +        media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)          assert_equal(media.tags,                       [{'name': u'yin', 'slug': u'yin'}, -                                            {'name': u'yang', 'slug': u'yang'}]) +                      {'name': u'yang', 'slug': u'yang'}])          # Test tags that are too long          # --------------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Balanced Goblin', -                'tags': BAD_TAG_STRING -                }, upload_files=[( -                    'file', GOOD_JPG)]) - -        # Too long error should be raised -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] -        form = context['submit_form'] -        assert form.tags.errors == [ -            u'Tags must be shorter than 50 characters.  Tags that are too long'\ -             ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] +        response, form = self.do_post({'title': 'Balanced Goblin', +                                       'tags': BAD_TAG_STRING}, +                                      *FORM_CONTEXT, +                                      **self.upload_data(GOOD_JPG)) +        assert_equal(form.tags.errors, [ +                u'Tags must be shorter than 50 characters.  ' \ +                    'Tags that are too long: ' \ +                    'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'])      def test_delete(self): -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Balanced Goblin', -                }, upload_files=[( -                    'file', GOOD_JPG)]) - -        # Post image -        response.follow() - -        request = template.TEMPLATE_TEST_CONTEXT[ -            'mediagoblin/user_pages/user.html']['request'] - -        media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - -        # Does media entry exist? -        assert_true(media) +        response, request = self.do_post({'title': 'Balanced Goblin'}, +                                         *REQUEST_CONTEXT, do_follow=True, +                                         **self.upload_data(GOOD_JPG)) +        media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)          # Add a comment, so we can test for its deletion later. -        get_comments = lambda: list( -            request.db.MediaComment.find({'media_entry': media._id})) -        assert_false(get_comments()) -        response = self.test_app.post( -            request.urlgen('mediagoblin.user_pages.media_post_comment', -                           user=self.test_user.username, -                           media=media._id), -            {'comment_content': 'i love this test'}) -        response.follow() -        assert_true(get_comments()) +        self.check_comments(request, media, 0) +        comment_url = request.urlgen( +            'mediagoblin.user_pages.media_post_comment', +            user=self.test_user.username, media=media._id) +        response = self.do_post({'comment_content': 'i love this test'}, +                                url=comment_url, do_follow=True)[0] +        self.check_comments(request, media, 1)          # Do not confirm deletion          # --------------------------------------------------- -        response = self.test_app.post( -            request.urlgen('mediagoblin.user_pages.media_confirm_delete', -                           # No work: user=media.uploader().username, -                           user=self.test_user.username, -                           media=media._id), -            # no value means no confirm -            {}) - -        response.follow() - -        request = template.TEMPLATE_TEST_CONTEXT[ -            'mediagoblin/user_pages/user.html']['request'] - -        media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - -        # Does media entry still exist? -        assert_true(media) +        delete_url = request.urlgen( +            'mediagoblin.user_pages.media_confirm_delete', +            user=self.test_user.username, media=media._id) +        # Empty data means don't confirm +        response = self.do_post({}, do_follow=True, url=delete_url)[0] +        media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)          # Confirm deletion          # --------------------------------------------------- -        response = self.test_app.post( -            request.urlgen('mediagoblin.user_pages.media_confirm_delete', -                           # No work: user=media.uploader().username, -                           user=self.test_user.username, -                           media=media._id), -            {'confirm': 'y'}) - -        response.follow() - -        request = template.TEMPLATE_TEST_CONTEXT[ -            'mediagoblin/user_pages/user.html']['request'] +        response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT, +                                         do_follow=True, url=delete_url) +        self.check_media(request, {'_id': media._id}, 0) +        self.check_comments(request, media, 0) -        # Does media entry still exist? -        assert_false( -            request.db.MediaEntry.find( -                {'_id': media._id}).count()) - -        # How about the comment? -        assert_false(get_comments()) - -    def test_malicious_uploads(self): +    def test_evil_file(self):          # Test non-suppoerted file with non-supported extension          # ----------------------------------------------------- -        template.clear_test_template_context() -        response = self.test_app.post( -            '/submit/', { -                'title': 'Malicious Upload 1' -                }, upload_files=[( -                    'file', EVIL_FILE)]) - -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] -        form = context['submit_form'] -        assert re.match(r'^Could not extract any file extension from ".*?"$', str(form.file.errors[0])) -        assert len(form.file.errors) == 1 - -        # NOTE: The following 2 tests will ultimately fail, but they +        response, form = self.do_post({'title': 'Malicious Upload 1'}, +                                      *FORM_CONTEXT, +                                      **self.upload_data(EVIL_FILE)) +        assert_equal(len(form.file.errors), 1) +        assert_true(re.match( +                r'^Could not extract any file extension from ".*?"$', +                str(form.file.errors[0]))) + +    def check_false_image(self, title, filename): +        # NOTE: These images should ultimately fail, but they          #   *will* pass the initial form submission step.  Instead,          #   they'll be caught as failures during the processing step. +        response, context = self.do_post({'title': title}, do_follow=True, +                                         **self.upload_data(filename)) +        self.check_url(response, '/u/{0}/'.format(self.test_user.username)) +        entry = mg_globals.database.MediaEntry.find_one({'title': title}) +        assert_equal(entry.state, 'failed') +        assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail') +    def test_evil_jpg(self):          # Test non-supported file with .jpg extension          # ------------------------------------------- -        template.clear_test_template_context() -        response = self.test_app.post( -           '/submit/', { -               'title': 'Malicious Upload 2' -               }, upload_files=[( -                   'file', EVIL_JPG)]) -        response.follow() -        assert_equal( -            urlparse.urlsplit(response.location)[2], -            '/u/chris/') - -        entry = mg_globals.database.MediaEntry.find_one( -            {'title': 'Malicious Upload 2'}) -        assert_equal(entry.state, 'failed') -        assert_equal( -            entry.fail_error, -            u'mediagoblin.processing:BadMediaFail') +        self.check_false_image('Malicious Upload 2', EVIL_JPG) +    def test_evil_png(self):          # Test non-supported file with .png extension          # ------------------------------------------- -        template.clear_test_template_context() -        response = self.test_app.post( -           '/submit/', { -               'title': 'Malicious Upload 3' -               }, upload_files=[( -                   'file', EVIL_PNG)]) -        response.follow() -        assert_equal( -            urlparse.urlsplit(response.location)[2], -            '/u/chris/') - -        entry = mg_globals.database.MediaEntry.find_one( -            {'title': 'Malicious Upload 3'}) -        assert_equal(entry.state, 'failed') -        assert_equal( -            entry.fail_error, -            u'mediagoblin.processing:BadMediaFail') +        self.check_false_image('Malicious Upload 3', EVIL_PNG) + +    def test_processing(self): +        data = {'title': 'Big Blue'} +        response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True, +                                         **self.upload_data(BIG_BLUE)) +        media = self.check_media(request, data, 1) +        last_size = 1024 ** 3  # Needs to be larger than bigblue.png +        for key, basename in (('original', 'bigblue.png'), +                              ('medium', 'bigblue.medium.png'), +                              ('thumb', 'bigblue.thumbnail.png')): +            # Does the processed image have a good filename? +            filename = resource_filename( +                'mediagoblin.tests', +                os.path.join('test_user_dev/media/public', +                             *media['media_files'].get(key, []))) +            assert_true(filename.endswith('_' + basename)) +            # Is it smaller than the last processed image we looked at? +            size = os.stat(filename).st_size +            assert_true(last_size > size) +            last_size = size diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.pngBinary files differ new file mode 100644 index 00000000..2b2c2a44 --- /dev/null +++ b/mediagoblin/tests/test_submission/bigblue.png | 
