diff options
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r-- | mediagoblin/tests/__init__.py | 14 | ||||
-rw-r--r-- | mediagoblin/tests/test_auth.py | 1 | ||||
-rw-r--r-- | mediagoblin/tests/test_celery_setup.py | 2 | ||||
-rw-r--r-- | mediagoblin/tests/test_csrf_middleware.py | 5 | ||||
-rw-r--r-- | mediagoblin/tests/test_mgoblin_app.ini | 6 | ||||
-rw-r--r-- | mediagoblin/tests/test_migrations.py | 401 | ||||
-rw-r--r-- | mediagoblin/tests/test_pluginapi.py | 158 | ||||
-rw-r--r-- | mediagoblin/tests/test_processing.py | 20 | ||||
-rw-r--r-- | mediagoblin/tests/test_submission.py | 359 | ||||
-rw-r--r-- | mediagoblin/tests/test_submission/bigblue.png | bin | 0 -> 3142 bytes | |||
-rw-r--r-- | mediagoblin/tests/test_tags.py | 1 | ||||
-rw-r--r-- | mediagoblin/tests/test_workbench.py | 1 | ||||
-rw-r--r-- | mediagoblin/tests/tools.py | 33 |
13 files changed, 371 insertions, 630 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py index 15a5add0..4e84914a 100644 --- a/mediagoblin/tests/__init__.py +++ b/mediagoblin/tests/__init__.py @@ -14,10 +14,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin import mg_globals +import os +import shutil +from mediagoblin import mg_globals from mediagoblin.tests.tools import ( - MEDIAGOBLIN_TEST_DB_NAME, suicide_if_bad_celery_environ) + TEST_USER_DEV, suicide_if_bad_celery_environ) def setup_package(): @@ -25,8 +27,6 @@ def setup_package(): def teardown_package(): - if ((mg_globals.db_connection - and mg_globals.database.name == MEDIAGOBLIN_TEST_DB_NAME)): - print "Killing db ..." - mg_globals.db_connection.drop_database(MEDIAGOBLIN_TEST_DB_NAME) - print "... done" + # Remove and reinstall user_dev directories + if os.path.exists(TEST_USER_DEV): + shutil.rmtree(TEST_USER_DEV) diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index 3a33c66c..8f988af3 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -269,6 +269,7 @@ def test_register_views(test_app): ## Try using an expired token to change password, shouldn't work template.clear_test_template_context() + new_user = mg_globals.database.User.find_one({'username': 'happygirl'}) real_token_expiration = new_user.fp_token_expire new_user.fp_token_expire = datetime.datetime.now() new_user.save() diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py index fd600f56..5530c6f2 100644 --- a/mediagoblin/tests/test_celery_setup.py +++ b/mediagoblin/tests/test_celery_setup.py @@ -48,7 +48,7 @@ def test_setup_celery_from_config(): assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) assert fake_celery_module.CELERY_RESULT_PERSISTENT is True assert fake_celery_module.CELERY_IMPORTS == [ - 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing'] + 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task'] assert fake_celery_module.CELERY_RESULT_BACKEND == 'database' assert fake_celery_module.CELERY_RESULT_DBURI == ( 'sqlite:///' + diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py index f49dc94e..ad433fe8 100644 --- a/mediagoblin/tests/test_csrf_middleware.py +++ b/mediagoblin/tests/test_csrf_middleware.py @@ -14,11 +14,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse -import datetime - -from nose.tools import assert_equal - from mediagoblin.tests.tools import setup_fresh_app from mediagoblin import mg_globals diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index 01bf0972..3b979ff7 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -2,7 +2,9 @@ direct_remote_path = /test_static/ email_sender_address = "notice@mediagoblin.example.org" email_debug_mode = true -db_name = __mediagoblin_tests__ + +# TODO: Switch to using an in-memory database +sql_engine = "sqlite:///%(here)s/test_user_dev/mediagoblin.db" # tag parsing tags_max_length = 50 @@ -27,3 +29,5 @@ lock_dir = %(here)s/test_user_dev/beaker/cache/lock [celery] CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db" diff --git a/mediagoblin/tests/test_migrations.py b/mediagoblin/tests/test_migrations.py deleted file mode 100644 index f6e6c1a8..00000000 --- a/mediagoblin/tests/test_migrations.py +++ /dev/null @@ -1,401 +0,0 @@ -# GNU MediaGoblin -- federated, autonomous media hosting -# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -from nose.tools import assert_raises -from pymongo import Connection - -from mediagoblin.tests.tools import ( - install_fixtures_simple, assert_db_meets_expected) -from mediagoblin.db.mongo.util import ( - RegisterMigration, MigrationManager, ObjectId, - MissingCurrentMigration) -from mediagoblin.db.mongo.migrations import add_table_field - -# This one will get filled with local migrations -TEST_MIGRATION_REGISTRY = {} -# this one won't get filled -TEST_EMPTY_MIGRATION_REGISTRY = {} - -MIGRATION_DB_NAME = u'__mediagoblin_test_migrations__' - - -###################### -# Fake test migrations -###################### - -@RegisterMigration(1, TEST_MIGRATION_REGISTRY) -def creature_add_magical_powers(database): - """ - Add lists of magical powers. - - This defaults to [], an empty list. Since we haven't declared any - magical powers, all existing monsters, setting to an empty list is - fine. - """ - add_table_field(database, 'creatures', 'magical_powers', []) - - -@RegisterMigration(2, TEST_MIGRATION_REGISTRY) -def creature_rename_num_legs_to_num_limbs(database): - """ - It turns out we want to track how many limbs a creature has, not - just how many legs. We don't care about the ambiguous distinction - between arms/legs currently. - """ - # $rename not available till 1.7.2+, Debian Stable only includes - # 1.4.4... we should do renames manually for now :( - - collection = database['creatures'] - target = collection.find( - {'num_legs': {'$exists': True}}) - - for document in target: - # A lame manual renaming. - document['num_limbs'] = document.pop('num_legs') - collection.save(document) - - -@RegisterMigration(3, TEST_MIGRATION_REGISTRY) -def creature_remove_is_demon(database): - """ - It turns out we don't care much about whether creatures are demons - or not. - """ - database['creatures'].update( - {'is_demon': {'$exists': True}}, - {'$unset': {'is_demon': 1}}, - multi=True) - - -@RegisterMigration(4, TEST_MIGRATION_REGISTRY) -def level_exits_dict_to_list(database): - """ - For the sake of the indexes we want to write, and because we - intend to add more flexible fields, we want to move level exits - from like: - - {'big_door': 'castle_level_id', - 'trapdoor': 'dungeon_level_id'} - - to like: - - [{'name': 'big_door', - 'exits_to': 'castle_level_id'}, - {'name': 'trapdoor', - 'exits_to': 'dungeon_level_id'}] - """ - collection = database['levels'] - target = collection.find( - {'exits': {'$type': 3}}) - - for level in target: - new_exits = [] - for exit_name, exits_to in level['exits'].items(): - new_exits.append( - {'name': exit_name, - 'exits_to': exits_to}) - - level['exits'] = new_exits - collection.save(level) - - -CENTIPEDE_OBJECTID = ObjectId() -WOLF_OBJECTID = ObjectId() -WIZARDSNAKE_OBJECTID = ObjectId() - -UNMIGRATED_DBDATA = { - 'creatures': [ - {'_id': CENTIPEDE_OBJECTID, - 'name': 'centipede', - 'num_legs': 100, - 'is_demon': False}, - {'_id': WOLF_OBJECTID, - 'name': 'wolf', - 'num_legs': 4, - 'is_demon': False}, - # don't ask me what a wizardsnake is. - {'_id': WIZARDSNAKE_OBJECTID, - 'name': 'wizardsnake', - 'num_legs': 0, - 'is_demon': True}], - 'levels': [ - {'_id': 'necroplex', - 'name': 'The Necroplex', - 'description': 'A complex full of pure deathzone.', - 'exits': { - 'deathwell': 'evilstorm', - 'portal': 'central_park'}}, - {'_id': 'evilstorm', - 'name': 'Evil Storm', - 'description': 'A storm full of pure evil.', - 'exits': {}}, # you can't escape the evilstorm - {'_id': 'central_park', - 'name': 'Central Park, NY, NY', - 'description': "New York's friendly Central Park.", - 'exits': { - 'portal': 'necroplex'}}]} - - -EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA = { - 'creatures': [ - {'_id': CENTIPEDE_OBJECTID, - 'name': 'centipede', - 'num_limbs': 100, - 'magical_powers': []}, - {'_id': WOLF_OBJECTID, - 'name': 'wolf', - 'num_limbs': 4, - # kept around namely to check that it *isn't* removed! - 'magical_powers': []}, - {'_id': WIZARDSNAKE_OBJECTID, - 'name': 'wizardsnake', - 'num_limbs': 0, - 'magical_powers': []}], - 'levels': [ - {'_id': 'necroplex', - 'name': 'The Necroplex', - 'description': 'A complex full of pure deathzone.', - 'exits': [ - {'name': 'deathwell', - 'exits_to': 'evilstorm'}, - {'name': 'portal', - 'exits_to': 'central_park'}]}, - {'_id': 'evilstorm', - 'name': 'Evil Storm', - 'description': 'A storm full of pure evil.', - 'exits': []}, # you can't escape the evilstorm - {'_id': 'central_park', - 'name': 'Central Park, NY, NY', - 'description': "New York's friendly Central Park.", - 'exits': [ - {'name': 'portal', - 'exits_to': 'necroplex'}]}]} - -# We want to make sure that if we're at migration 3, migration 3 -# doesn't get re-run. - -SEMI_MIGRATED_DBDATA = { - 'creatures': [ - {'_id': CENTIPEDE_OBJECTID, - 'name': 'centipede', - 'num_limbs': 100, - 'magical_powers': []}, - {'_id': WOLF_OBJECTID, - 'name': 'wolf', - 'num_limbs': 4, - # kept around namely to check that it *isn't* removed! - 'is_demon': False, - 'magical_powers': [ - 'ice_breath', 'death_stare']}, - {'_id': WIZARDSNAKE_OBJECTID, - 'name': 'wizardsnake', - 'num_limbs': 0, - 'magical_powers': [ - 'death_rattle', 'sneaky_stare', - 'slithery_smoke', 'treacherous_tremors'], - 'is_demon': True}], - 'levels': [ - {'_id': 'necroplex', - 'name': 'The Necroplex', - 'description': 'A complex full of pure deathzone.', - 'exits': { - 'deathwell': 'evilstorm', - 'portal': 'central_park'}}, - {'_id': 'evilstorm', - 'name': 'Evil Storm', - 'description': 'A storm full of pure evil.', - 'exits': {}}, # you can't escape the evilstorm - {'_id': 'central_park', - 'name': 'Central Park, NY, NY', - 'description': "New York's friendly Central Park.", - 'exits': { - 'portal': 'necroplex'}}]} - - -EXPECTED_POST_MIGRATION_SEMI_MIGRATED_DBDATA = { - 'creatures': [ - {'_id': CENTIPEDE_OBJECTID, - 'name': 'centipede', - 'num_limbs': 100, - 'magical_powers': []}, - {'_id': WOLF_OBJECTID, - 'name': 'wolf', - 'num_limbs': 4, - # kept around namely to check that it *isn't* removed! - 'is_demon': False, - 'magical_powers': [ - 'ice_breath', 'death_stare']}, - {'_id': WIZARDSNAKE_OBJECTID, - 'name': 'wizardsnake', - 'num_limbs': 0, - 'magical_powers': [ - 'death_rattle', 'sneaky_stare', - 'slithery_smoke', 'treacherous_tremors'], - 'is_demon': True}], - 'levels': [ - {'_id': 'necroplex', - 'name': 'The Necroplex', - 'description': 'A complex full of pure deathzone.', - 'exits': [ - {'name': 'deathwell', - 'exits_to': 'evilstorm'}, - {'name': 'portal', - 'exits_to': 'central_park'}]}, - {'_id': 'evilstorm', - 'name': 'Evil Storm', - 'description': 'A storm full of pure evil.', - 'exits': []}, # you can't escape the evilstorm - {'_id': 'central_park', - 'name': 'Central Park, NY, NY', - 'description': "New York's friendly Central Park.", - 'exits': [ - {'name': 'portal', - 'exits_to': 'necroplex'}]}]} - - -class TestMigrations(object): - def setUp(self): - # Set up the connection, drop an existing possible database - self.connection = Connection() - self.connection.drop_database(MIGRATION_DB_NAME) - self.db = Connection()[MIGRATION_DB_NAME] - self.migration_manager = MigrationManager( - self.db, TEST_MIGRATION_REGISTRY) - self.empty_migration_manager = MigrationManager( - self.db, TEST_EMPTY_MIGRATION_REGISTRY) - self.run_migrations = [] - - def tearDown(self): - self.connection.drop_database(MIGRATION_DB_NAME) - - def _record_migration(self, migration_number, migration_func): - self.run_migrations.append((migration_number, migration_func)) - - def test_migrations_registered_and_sorted(self): - """ - Make sure that migrations get registered and are sorted right - in the migration manager - """ - assert TEST_MIGRATION_REGISTRY == { - 1: creature_add_magical_powers, - 2: creature_rename_num_legs_to_num_limbs, - 3: creature_remove_is_demon, - 4: level_exits_dict_to_list} - assert self.migration_manager.sorted_migrations == [ - (1, creature_add_magical_powers), - (2, creature_rename_num_legs_to_num_limbs), - (3, creature_remove_is_demon), - (4, level_exits_dict_to_list)] - assert self.empty_migration_manager.sorted_migrations == [] - - def test_run_full_migrations(self): - """ - Make sure that running the full migration suite from 0 updates - everything - """ - self.migration_manager.set_current_migration(0) - assert self.migration_manager.database_current_migration() == 0 - install_fixtures_simple(self.db, UNMIGRATED_DBDATA) - self.migration_manager.migrate_new(post_callback=self._record_migration) - - assert self.run_migrations == [ - (1, creature_add_magical_powers), - (2, creature_rename_num_legs_to_num_limbs), - (3, creature_remove_is_demon), - (4, level_exits_dict_to_list)] - - assert_db_meets_expected( - self.db, EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA) - - # Make sure the migration is recorded correctly - assert self.migration_manager.database_current_migration() == 4 - - # run twice! It should do nothing the second time. - # ------------------------------------------------ - self.run_migrations = [] - self.migration_manager.migrate_new(post_callback=self._record_migration) - assert self.run_migrations == [] - assert_db_meets_expected( - self.db, EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA) - assert self.migration_manager.database_current_migration() == 4 - - - def test_run_partial_migrations(self): - """ - Make sure that running full migration suite from 3 only runs - last migration - """ - self.migration_manager.set_current_migration(3) - assert self.migration_manager.database_current_migration() == 3 - install_fixtures_simple(self.db, SEMI_MIGRATED_DBDATA) - self.migration_manager.migrate_new(post_callback=self._record_migration) - - assert self.run_migrations == [ - (4, level_exits_dict_to_list)] - - assert_db_meets_expected( - self.db, EXPECTED_POST_MIGRATION_SEMI_MIGRATED_DBDATA) - - # Make sure the migration is recorded correctly - assert self.migration_manager.database_current_migration() == 4 - - def test_migrations_recorded_as_latest(self): - """ - Make sure that if we don't have a migration_status - pre-recorded it's marked as the latest - """ - self.migration_manager.install_migration_version_if_missing() - assert self.migration_manager.database_current_migration() == 4 - - def test_no_migrations_recorded_as_zero(self): - """ - Make sure that if we don't have a migration_status - but there *are* no migrations that it's marked as 0 - """ - self.empty_migration_manager.install_migration_version_if_missing() - assert self.empty_migration_manager.database_current_migration() == 0 - - def test_migrations_to_run(self): - """ - Make sure we get the right list of migrations to run - """ - self.migration_manager.set_current_migration(0) - - assert self.migration_manager.migrations_to_run() == [ - (1, creature_add_magical_powers), - (2, creature_rename_num_legs_to_num_limbs), - (3, creature_remove_is_demon), - (4, level_exits_dict_to_list)] - - self.migration_manager.set_current_migration(3) - - assert self.migration_manager.migrations_to_run() == [ - (4, level_exits_dict_to_list)] - - self.migration_manager.set_current_migration(4) - - assert self.migration_manager.migrations_to_run() == [] - - - def test_no_migrations_raises_exception(self): - """ - If we don't have the current migration set in the database, - this should error out. - """ - assert_raises( - MissingCurrentMigration, - self.migration_manager.migrations_to_run) diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py new file mode 100644 index 00000000..c5c614f6 --- /dev/null +++ b/mediagoblin/tests/test_pluginapi.py @@ -0,0 +1,158 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +from configobj import ConfigObj +from mediagoblin import mg_globals +from mediagoblin.init.plugins import setup_plugins +from mediagoblin.tools import pluginapi +from nose.tools import eq_ + + +def with_cleanup(*modules_to_delete): + def _with_cleanup(fun): + """Wrapper that saves and restores mg_globals""" + def _with_cleanup_inner(*args, **kwargs): + old_app_config = mg_globals.app_config + old_global_config = mg_globals.global_config + # Need to delete icky modules before and after so as to make + # sure things work correctly. + for module in modules_to_delete: + try: + del sys.modules[module] + except KeyError: + pass + # The plugin cache gets populated as a side-effect of + # importing, so it's best to clear it before and after a test. + pcache = pluginapi.PluginCache() + pcache.clear() + try: + return fun(*args, **kwargs) + finally: + mg_globals.app_config = old_app_config + mg_globals.global_config = old_global_config + # Need to delete icky modules before and after so as to make + # sure things work correctly. + for module in modules_to_delete: + try: + del sys.modules[module] + except KeyError: + pass + pcache.clear() + + _with_cleanup_inner.__name__ = fun.__name__ + return _with_cleanup_inner + return _with_cleanup + + +def build_config(sections): + """Builds a ConfigObj object with specified data + + :arg sections: list of ``(section_name, section_data, + subsection_list)`` tuples where section_data is a dict and + subsection_list is a list of ``(section_name, section_data, + subsection_list)``, ... + + For example: + + >>> build_config([ + ... ('mediagoblin', {'key1': 'val1'}, []), + ... ('section2', {}, [ + ... ('subsection1', {}, []) + ... ]) + ... ]) + """ + cfg = ConfigObj() + cfg.filename = 'foo' + def _iter_section(cfg, section_list): + for section_name, data, subsection_list in section_list: + cfg[section_name] = data + _iter_section(cfg[section_name], subsection_list) + + _iter_section(cfg, sections) + return cfg + + +@with_cleanup() +def test_no_plugins(): + """Run setup_plugins with no plugins in config""" + cfg = build_config([('mediagoblin', {}, [])]) + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pcache = pluginapi.PluginCache() + setup_plugins() + + # Make sure we didn't load anything. + eq_(len(pcache.plugin_classes), 0) + eq_(len(pcache.plugin_objects), 0) + + +@with_cleanup('mediagoblin.plugins.sampleplugin', + 'mediagoblin.plugins.sampleplugin.main') +def test_one_plugin(): + """Run setup_plugins with a single working plugin""" + cfg = build_config([ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('mediagoblin.plugins.sampleplugin', {}, []) + ]) + ]) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pcache = pluginapi.PluginCache() + setup_plugins() + + # Make sure we only found one plugin class + eq_(len(pcache.plugin_classes), 1) + # Make sure the class is the one we think it is. + eq_(pcache.plugin_classes[0].__name__, 'SamplePlugin') + + # Make sure there was one plugin created + eq_(len(pcache.plugin_objects), 1) + # Make sure we called setup_plugin on SamplePlugin + eq_(pcache.plugin_objects[0]._setup_plugin_called, 1) + + +@with_cleanup('mediagoblin.plugins.sampleplugin', + 'mediagoblin.plugins.sampleplugin.main') +def test_same_plugin_twice(): + """Run setup_plugins with a single working plugin twice""" + cfg = build_config([ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('mediagoblin.plugins.sampleplugin', {}, []), + ('mediagoblin.plugins.sampleplugin', {}, []), + ]) + ]) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pcache = pluginapi.PluginCache() + setup_plugins() + + # Make sure we only found one plugin class + eq_(len(pcache.plugin_classes), 1) + # Make sure the class is the one we think it is. + eq_(pcache.plugin_classes[0].__name__, 'SamplePlugin') + + # Make sure there was one plugin created + eq_(len(pcache.plugin_objects), 1) + # Make sure we called setup_plugin on SamplePlugin + eq_(pcache.plugin_objects[0]._setup_plugin_called, 1) diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py new file mode 100644 index 00000000..fe8489aa --- /dev/null +++ b/mediagoblin/tests/test_processing.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +from nose.tools import assert_equal + +from mediagoblin import processing + +class TestProcessing(object): + def run_fill(self, input, format, output=None): + builder = processing.FilenameBuilder(input) + result = builder.fill(format) + if output is None: + return result + assert_equal(output, result) + + def test_easy_filename_fill(self): + self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt') + + def test_long_filename_fill(self): + self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}', + 'image-{0}.png'.format('A' * 245)) diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index 1f56779e..bf1b87aa 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -15,30 +15,34 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import urlparse -import pkg_resources -import re +import os -from nose.tools import assert_equal, assert_true, assert_false +from nose.tools import assert_equal, assert_true +from pkg_resources import resource_filename -from mediagoblin.tests.tools import setup_fresh_app, get_test_app, \ +from mediagoblin.tests.tools import get_test_app, \ fixture_add_user from mediagoblin import mg_globals -from mediagoblin.tools import template, common - -GOOD_JPG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/good.jpg') -GOOD_PNG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/good.png') -EVIL_FILE = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil') -EVIL_JPG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil.jpg') -EVIL_PNG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil.png') +from mediagoblin.tools import template + + +def resource(filename): + return resource_filename('mediagoblin.tests', 'test_submission/' + filename) + + +GOOD_JPG = resource('good.jpg') +GOOD_PNG = resource('good.png') +EVIL_FILE = resource('evil') +EVIL_JPG = resource('evil.jpg') +EVIL_PNG = resource('evil.png') +BIG_BLUE = resource('bigblue.png') GOOD_TAG_STRING = 'yin,yang' BAD_TAG_STRING = 'rage,' + 'f' * 26 + 'u' * 26 +FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form'] +REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request'] + class TestSubmission: def setUp(self): @@ -61,234 +65,193 @@ class TestSubmission: def logout(self): self.test_app.get('/auth/logout/') + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def upload_data(self, filename): + return {'upload_files': [('file', filename)]} + + def check_comments(self, request, media_id, count): + comments = request.db.MediaComment.find({'media_entry': media_id}) + assert_equal(count, len(list(comments))) + def test_missing_fields(self): # Test blank form # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', {}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.file.errors == [u'You must provide a file.'] + response, form = self.do_post({}, *FORM_CONTEXT) + assert_equal(form.file.errors, [u'You must provide a file.']) # Test blank file # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'test title'}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.file.errors == [u'You must provide a file.'] - - - def test_normal_uploads(self): - # Test JPG - # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Normal upload 1' - }, upload_files=[( - 'file', GOOD_JPG)]) + response, form = self.do_post({'title': 'test title'}, *FORM_CONTEXT) + assert_equal(form.file.errors, [u'You must provide a file.']) - # User should be redirected - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - assert template.TEMPLATE_TEST_CONTEXT.has_key( - 'mediagoblin/user_pages/user.html') + def check_url(self, response, path): + assert_equal(urlparse.urlsplit(response.location)[2], path) + def check_normal_upload(self, title, filename): + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + assert_true('mediagoblin/user_pages/user.html' in context) # Make sure the media view is at least reachable, logged in... - self.test_app.get('/u/chris/m/normal-upload-1/') + url = '/u/{0}/m/{1}/'.format(self.test_user.username, + title.lower().replace(' ', '-')) + self.test_app.get(url) # ... and logged out too. self.logout() - self.test_app.get('/u/chris/m/normal-upload-1/') - # Log back in for the remaining tests. - self.login() + self.test_app.get(url) - # Test PNG - # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Normal upload 2' - }, upload_files=[( - 'file', GOOD_PNG)]) + def test_normal_jpg(self): + self.check_normal_upload('Normal upload 1', GOOD_JPG) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - assert template.TEMPLATE_TEST_CONTEXT.has_key( - 'mediagoblin/user_pages/user.html') + def test_normal_png(self): + self.check_normal_upload('Normal upload 2', GOOD_PNG) + + def check_media(self, request, find_data, count=None): + media = request.db.MediaEntry.find(find_data) + if count is not None: + assert_equal(media.count(), count) + if count == 0: + return + return media[0] def test_tags(self): # Good tag string # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - 'tags': GOOD_TAG_STRING - }, upload_files=[( - 'file', GOOD_JPG)]) + response, request = self.do_post({'title': 'Balanced Goblin', + 'tags': GOOD_TAG_STRING}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) + assert media.tags[0]['name'] == u'yin' + assert media.tags[0]['slug'] == u'yin' - # New media entry with correct tags should be created - response.follow() - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html'] - request = context['request'] - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - assert_equal(media.tags, - [{'name': u'yin', 'slug': u'yin'}, - {'name': u'yang', 'slug': u'yang'}]) + assert media.tags[1]['name'] == u'yang' + assert media.tags[1]['slug'] == u'yang' # Test tags that are too long # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - 'tags': BAD_TAG_STRING - }, upload_files=[( - 'file', GOOD_JPG)]) - - # Too long error should be raised - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.tags.errors == [ - u'Tags must be shorter than 50 characters. Tags that are too long'\ - ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] + response, form = self.do_post({'title': 'Balanced Goblin', + 'tags': BAD_TAG_STRING}, + *FORM_CONTEXT, + **self.upload_data(GOOD_JPG)) + assert_equal(form.tags.errors, [ + u'Tags must be shorter than 50 characters. ' \ + 'Tags that are too long: ' \ + 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']) def test_delete(self): - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - }, upload_files=[( - 'file', GOOD_JPG)]) - - # Post image - response.follow() - - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] - - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - - # Does media entry exist? - assert_true(media) + response, request = self.do_post({'title': 'Balanced Goblin'}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) + media_id = media.id # Add a comment, so we can test for its deletion later. - get_comments = lambda: list( - request.db.MediaComment.find({'media_entry': media._id})) - assert_false(get_comments()) - response = self.test_app.post( - request.urlgen('mediagoblin.user_pages.media_post_comment', - user=self.test_user.username, - media=media._id), - {'comment_content': 'i love this test'}) - response.follow() - assert_true(get_comments()) + self.check_comments(request, media_id, 0) + comment_url = request.urlgen( + 'mediagoblin.user_pages.media_post_comment', + user=self.test_user.username, media=media_id) + response = self.do_post({'comment_content': 'i love this test'}, + url=comment_url, do_follow=True)[0] + self.check_comments(request, media_id, 1) # Do not confirm deletion # --------------------------------------------------- - response = self.test_app.post( - request.urlgen('mediagoblin.user_pages.media_confirm_delete', - # No work: user=media.uploader().username, - user=self.test_user.username, - media=media._id), - # no value means no confirm - {}) - - response.follow() - - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] - - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - - # Does media entry still exist? - assert_true(media) + delete_url = request.urlgen( + 'mediagoblin.user_pages.media_confirm_delete', + user=self.test_user.username, media=media_id) + # Empty data means don't confirm + response = self.do_post({}, do_follow=True, url=delete_url)[0] + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) + media_id = media.id # Confirm deletion # --------------------------------------------------- - response = self.test_app.post( - request.urlgen('mediagoblin.user_pages.media_confirm_delete', - # No work: user=media.uploader().username, - user=self.test_user.username, - media=media._id), - {'confirm': 'y'}) - - response.follow() + response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT, + do_follow=True, url=delete_url) + self.check_media(request, {'_id': media_id}, 0) + self.check_comments(request, media_id, 0) - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] - - # Does media entry still exist? - assert_false( - request.db.MediaEntry.find( - {'_id': media._id}).count()) - - # How about the comment? - assert_false(get_comments()) - - def test_malicious_uploads(self): + def test_evil_file(self): # Test non-suppoerted file with non-supported extension # ----------------------------------------------------- + response, form = self.do_post({'title': 'Malicious Upload 1'}, + *FORM_CONTEXT, + **self.upload_data(EVIL_FILE)) + assert_equal(len(form.file.errors), 1) + assert 'Sorry, I don\'t support that file type :(' == \ + str(form.file.errors[0]) + + def test_sniffing(self): + ''' + Test sniffing mechanism to assert that regular uploads work as intended + ''' template.clear_test_template_context() response = self.test_app.post( '/submit/', { - 'title': 'Malicious Upload 1' + 'title': 'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE' }, upload_files=[( - 'file', EVIL_FILE)]) + 'file', GOOD_JPG)]) + + response.follow() + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html'] + + request = context['request'] + + media = request.db.MediaEntry.find_one({ + u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert re.match(r'^Could not extract any file extension from ".*?"$', str(form.file.errors[0])) - assert len(form.file.errors) == 1 + assert media.media_type == 'mediagoblin.media_types.image' + def check_false_image(self, title, filename): # NOTE: The following 2 tests will ultimately fail, but they # *will* pass the initial form submission step. Instead, # they'll be caught as failures during the processing step. + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + entry = mg_globals.database.MediaEntry.find_one({'title': title}) + assert_equal(entry.state, 'failed') + assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail') + def test_evil_jpg(self): # Test non-supported file with .jpg extension # ------------------------------------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Malicious Upload 2' - }, upload_files=[( - 'file', EVIL_JPG)]) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - - entry = mg_globals.database.MediaEntry.find_one( - {'title': 'Malicious Upload 2'}) - assert_equal(entry.state, 'failed') - assert_equal( - entry.fail_error, - u'mediagoblin.processing:BadMediaFail') + self.check_false_image('Malicious Upload 2', EVIL_JPG) + def test_evil_png(self): # Test non-supported file with .png extension # ------------------------------------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Malicious Upload 3' - }, upload_files=[( - 'file', EVIL_PNG)]) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - - entry = mg_globals.database.MediaEntry.find_one( - {'title': 'Malicious Upload 3'}) - assert_equal(entry.state, 'failed') - assert_equal( - entry.fail_error, - u'mediagoblin.processing:BadMediaFail') + self.check_false_image('Malicious Upload 3', EVIL_PNG) + + def test_processing(self): + data = {'title': 'Big Blue'} + response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(BIG_BLUE)) + media = self.check_media(request, data, 1) + last_size = 1024 ** 3 # Needs to be larger than bigblue.png + for key, basename in (('original', 'bigblue.png'), + ('medium', 'bigblue.medium.png'), + ('thumb', 'bigblue.thumbnail.png')): + # Does the processed image have a good filename? + filename = resource_filename( + 'mediagoblin.tests', + os.path.join('test_user_dev/media/public', + *media.media_files.get(key, []))) + assert_true(filename.endswith('_' + basename)) + # Is it smaller than the last processed image we looked at? + size = os.stat(filename).st_size + assert_true(last_size > size) + last_size = size diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png Binary files differnew file mode 100644 index 00000000..2b2c2a44 --- /dev/null +++ b/mediagoblin/tests/test_submission/bigblue.png diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py index 79f925aa..bc657660 100644 --- a/mediagoblin/tests/test_tags.py +++ b/mediagoblin/tests/test_tags.py @@ -15,7 +15,6 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from mediagoblin.tests.tools import setup_fresh_app -from mediagoblin import mg_globals from mediagoblin.tools import text @setup_fresh_app diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py index b5243a9b..04a74653 100644 --- a/mediagoblin/tests/test_workbench.py +++ b/mediagoblin/tests/test_workbench.py @@ -17,7 +17,6 @@ import os import tempfile -from nose.tools import assert_raises from mediagoblin import workbench from mediagoblin.tests.test_storage import get_tmp_filestorage diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 7cf355b0..5b4e3746 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -26,8 +26,11 @@ from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config from mediagoblin.decorators import _make_safe from mediagoblin.db.open import setup_connection_and_db_from_config +from mediagoblin.db.sql.base import Session from mediagoblin.meddleware import BaseMeddleware from mediagoblin.auth.lib import bcrypt_gen_password_hash +from mediagoblin.gmg_commands.dbupdate import run_dbupdate +from mediagoblin.init.celery import setup_celery_app MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__' @@ -125,26 +128,19 @@ def get_test_app(dump_old_app=True): global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG) app_config = global_config['mediagoblin'] - # Wipe database - # @@: For now we're dropping collections, but we could also just - # collection.remove() ? - connection, db = setup_connection_and_db_from_config(app_config) - assert db.name == MEDIAGOBLIN_TEST_DB_NAME - - collections_to_wipe = [ - collection - for collection in db.collection_names() - if not collection.startswith('system.')] - - for collection in collections_to_wipe: - db.drop_collection(collection) - - # TODO: Drop and recreate indexes + # Run database setup/migrations + run_dbupdate(app_config) # setup app and return test_app = loadapp( 'config:' + TEST_SERVER_CONFIG) + Session.rollback() + Session.remove() + + # Re-setup celery + setup_celery_app(app_config, global_config) + # Insert the TestingMeddleware, which can do some # sanity checks on every request/response. # Doing it this way is probably not the cleanest way. @@ -216,4 +212,11 @@ def fixture_add_user(username = u'chris', password = 'toast', test_user.save() + # Reload + test_user = mg_globals.database.User.find_one({'username': username}) + + # ... and detach from session: + from mediagoblin.db.sql.base import Session + Session.expunge(test_user) + return test_user |