aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/__init__.py14
-rw-r--r--mediagoblin/tests/test_auth.py1
-rw-r--r--mediagoblin/tests/test_celery_setup.py2
-rw-r--r--mediagoblin/tests/test_csrf_middleware.py5
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini6
-rw-r--r--mediagoblin/tests/test_migrations.py401
-rw-r--r--mediagoblin/tests/test_pluginapi.py158
-rw-r--r--mediagoblin/tests/test_processing.py20
-rw-r--r--mediagoblin/tests/test_submission.py359
-rw-r--r--mediagoblin/tests/test_submission/bigblue.pngbin0 -> 3142 bytes
-rw-r--r--mediagoblin/tests/test_tags.py1
-rw-r--r--mediagoblin/tests/test_workbench.py1
-rw-r--r--mediagoblin/tests/tools.py33
13 files changed, 371 insertions, 630 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py
index 15a5add0..4e84914a 100644
--- a/mediagoblin/tests/__init__.py
+++ b/mediagoblin/tests/__init__.py
@@ -14,10 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin import mg_globals
+import os
+import shutil
+from mediagoblin import mg_globals
from mediagoblin.tests.tools import (
- MEDIAGOBLIN_TEST_DB_NAME, suicide_if_bad_celery_environ)
+ TEST_USER_DEV, suicide_if_bad_celery_environ)
def setup_package():
@@ -25,8 +27,6 @@ def setup_package():
def teardown_package():
- if ((mg_globals.db_connection
- and mg_globals.database.name == MEDIAGOBLIN_TEST_DB_NAME)):
- print "Killing db ..."
- mg_globals.db_connection.drop_database(MEDIAGOBLIN_TEST_DB_NAME)
- print "... done"
+ # Remove and reinstall user_dev directories
+ if os.path.exists(TEST_USER_DEV):
+ shutil.rmtree(TEST_USER_DEV)
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index 3a33c66c..8f988af3 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -269,6 +269,7 @@ def test_register_views(test_app):
## Try using an expired token to change password, shouldn't work
template.clear_test_template_context()
+ new_user = mg_globals.database.User.find_one({'username': 'happygirl'})
real_token_expiration = new_user.fp_token_expire
new_user.fp_token_expire = datetime.datetime.now()
new_user.save()
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
index fd600f56..5530c6f2 100644
--- a/mediagoblin/tests/test_celery_setup.py
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -48,7 +48,7 @@ def test_setup_celery_from_config():
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
assert fake_celery_module.CELERY_IMPORTS == [
- 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing']
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
assert fake_celery_module.CELERY_RESULT_DBURI == (
'sqlite:///' +
diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py
index f49dc94e..ad433fe8 100644
--- a/mediagoblin/tests/test_csrf_middleware.py
+++ b/mediagoblin/tests/test_csrf_middleware.py
@@ -14,11 +14,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
-import datetime
-
-from nose.tools import assert_equal
-
from mediagoblin.tests.tools import setup_fresh_app
from mediagoblin import mg_globals
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 01bf0972..3b979ff7 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -2,7 +2,9 @@
direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
-db_name = __mediagoblin_tests__
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/test_user_dev/mediagoblin.db"
# tag parsing
tags_max_length = 50
@@ -27,3 +29,5 @@ lock_dir = %(here)s/test_user_dev/beaker/cache/lock
[celery]
CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db"
diff --git a/mediagoblin/tests/test_migrations.py b/mediagoblin/tests/test_migrations.py
deleted file mode 100644
index f6e6c1a8..00000000
--- a/mediagoblin/tests/test_migrations.py
+++ /dev/null
@@ -1,401 +0,0 @@
-# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-from nose.tools import assert_raises
-from pymongo import Connection
-
-from mediagoblin.tests.tools import (
- install_fixtures_simple, assert_db_meets_expected)
-from mediagoblin.db.mongo.util import (
- RegisterMigration, MigrationManager, ObjectId,
- MissingCurrentMigration)
-from mediagoblin.db.mongo.migrations import add_table_field
-
-# This one will get filled with local migrations
-TEST_MIGRATION_REGISTRY = {}
-# this one won't get filled
-TEST_EMPTY_MIGRATION_REGISTRY = {}
-
-MIGRATION_DB_NAME = u'__mediagoblin_test_migrations__'
-
-
-######################
-# Fake test migrations
-######################
-
-@RegisterMigration(1, TEST_MIGRATION_REGISTRY)
-def creature_add_magical_powers(database):
- """
- Add lists of magical powers.
-
- This defaults to [], an empty list. Since we haven't declared any
- magical powers, all existing monsters, setting to an empty list is
- fine.
- """
- add_table_field(database, 'creatures', 'magical_powers', [])
-
-
-@RegisterMigration(2, TEST_MIGRATION_REGISTRY)
-def creature_rename_num_legs_to_num_limbs(database):
- """
- It turns out we want to track how many limbs a creature has, not
- just how many legs. We don't care about the ambiguous distinction
- between arms/legs currently.
- """
- # $rename not available till 1.7.2+, Debian Stable only includes
- # 1.4.4... we should do renames manually for now :(
-
- collection = database['creatures']
- target = collection.find(
- {'num_legs': {'$exists': True}})
-
- for document in target:
- # A lame manual renaming.
- document['num_limbs'] = document.pop('num_legs')
- collection.save(document)
-
-
-@RegisterMigration(3, TEST_MIGRATION_REGISTRY)
-def creature_remove_is_demon(database):
- """
- It turns out we don't care much about whether creatures are demons
- or not.
- """
- database['creatures'].update(
- {'is_demon': {'$exists': True}},
- {'$unset': {'is_demon': 1}},
- multi=True)
-
-
-@RegisterMigration(4, TEST_MIGRATION_REGISTRY)
-def level_exits_dict_to_list(database):
- """
- For the sake of the indexes we want to write, and because we
- intend to add more flexible fields, we want to move level exits
- from like:
-
- {'big_door': 'castle_level_id',
- 'trapdoor': 'dungeon_level_id'}
-
- to like:
-
- [{'name': 'big_door',
- 'exits_to': 'castle_level_id'},
- {'name': 'trapdoor',
- 'exits_to': 'dungeon_level_id'}]
- """
- collection = database['levels']
- target = collection.find(
- {'exits': {'$type': 3}})
-
- for level in target:
- new_exits = []
- for exit_name, exits_to in level['exits'].items():
- new_exits.append(
- {'name': exit_name,
- 'exits_to': exits_to})
-
- level['exits'] = new_exits
- collection.save(level)
-
-
-CENTIPEDE_OBJECTID = ObjectId()
-WOLF_OBJECTID = ObjectId()
-WIZARDSNAKE_OBJECTID = ObjectId()
-
-UNMIGRATED_DBDATA = {
- 'creatures': [
- {'_id': CENTIPEDE_OBJECTID,
- 'name': 'centipede',
- 'num_legs': 100,
- 'is_demon': False},
- {'_id': WOLF_OBJECTID,
- 'name': 'wolf',
- 'num_legs': 4,
- 'is_demon': False},
- # don't ask me what a wizardsnake is.
- {'_id': WIZARDSNAKE_OBJECTID,
- 'name': 'wizardsnake',
- 'num_legs': 0,
- 'is_demon': True}],
- 'levels': [
- {'_id': 'necroplex',
- 'name': 'The Necroplex',
- 'description': 'A complex full of pure deathzone.',
- 'exits': {
- 'deathwell': 'evilstorm',
- 'portal': 'central_park'}},
- {'_id': 'evilstorm',
- 'name': 'Evil Storm',
- 'description': 'A storm full of pure evil.',
- 'exits': {}}, # you can't escape the evilstorm
- {'_id': 'central_park',
- 'name': 'Central Park, NY, NY',
- 'description': "New York's friendly Central Park.",
- 'exits': {
- 'portal': 'necroplex'}}]}
-
-
-EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA = {
- 'creatures': [
- {'_id': CENTIPEDE_OBJECTID,
- 'name': 'centipede',
- 'num_limbs': 100,
- 'magical_powers': []},
- {'_id': WOLF_OBJECTID,
- 'name': 'wolf',
- 'num_limbs': 4,
- # kept around namely to check that it *isn't* removed!
- 'magical_powers': []},
- {'_id': WIZARDSNAKE_OBJECTID,
- 'name': 'wizardsnake',
- 'num_limbs': 0,
- 'magical_powers': []}],
- 'levels': [
- {'_id': 'necroplex',
- 'name': 'The Necroplex',
- 'description': 'A complex full of pure deathzone.',
- 'exits': [
- {'name': 'deathwell',
- 'exits_to': 'evilstorm'},
- {'name': 'portal',
- 'exits_to': 'central_park'}]},
- {'_id': 'evilstorm',
- 'name': 'Evil Storm',
- 'description': 'A storm full of pure evil.',
- 'exits': []}, # you can't escape the evilstorm
- {'_id': 'central_park',
- 'name': 'Central Park, NY, NY',
- 'description': "New York's friendly Central Park.",
- 'exits': [
- {'name': 'portal',
- 'exits_to': 'necroplex'}]}]}
-
-# We want to make sure that if we're at migration 3, migration 3
-# doesn't get re-run.
-
-SEMI_MIGRATED_DBDATA = {
- 'creatures': [
- {'_id': CENTIPEDE_OBJECTID,
- 'name': 'centipede',
- 'num_limbs': 100,
- 'magical_powers': []},
- {'_id': WOLF_OBJECTID,
- 'name': 'wolf',
- 'num_limbs': 4,
- # kept around namely to check that it *isn't* removed!
- 'is_demon': False,
- 'magical_powers': [
- 'ice_breath', 'death_stare']},
- {'_id': WIZARDSNAKE_OBJECTID,
- 'name': 'wizardsnake',
- 'num_limbs': 0,
- 'magical_powers': [
- 'death_rattle', 'sneaky_stare',
- 'slithery_smoke', 'treacherous_tremors'],
- 'is_demon': True}],
- 'levels': [
- {'_id': 'necroplex',
- 'name': 'The Necroplex',
- 'description': 'A complex full of pure deathzone.',
- 'exits': {
- 'deathwell': 'evilstorm',
- 'portal': 'central_park'}},
- {'_id': 'evilstorm',
- 'name': 'Evil Storm',
- 'description': 'A storm full of pure evil.',
- 'exits': {}}, # you can't escape the evilstorm
- {'_id': 'central_park',
- 'name': 'Central Park, NY, NY',
- 'description': "New York's friendly Central Park.",
- 'exits': {
- 'portal': 'necroplex'}}]}
-
-
-EXPECTED_POST_MIGRATION_SEMI_MIGRATED_DBDATA = {
- 'creatures': [
- {'_id': CENTIPEDE_OBJECTID,
- 'name': 'centipede',
- 'num_limbs': 100,
- 'magical_powers': []},
- {'_id': WOLF_OBJECTID,
- 'name': 'wolf',
- 'num_limbs': 4,
- # kept around namely to check that it *isn't* removed!
- 'is_demon': False,
- 'magical_powers': [
- 'ice_breath', 'death_stare']},
- {'_id': WIZARDSNAKE_OBJECTID,
- 'name': 'wizardsnake',
- 'num_limbs': 0,
- 'magical_powers': [
- 'death_rattle', 'sneaky_stare',
- 'slithery_smoke', 'treacherous_tremors'],
- 'is_demon': True}],
- 'levels': [
- {'_id': 'necroplex',
- 'name': 'The Necroplex',
- 'description': 'A complex full of pure deathzone.',
- 'exits': [
- {'name': 'deathwell',
- 'exits_to': 'evilstorm'},
- {'name': 'portal',
- 'exits_to': 'central_park'}]},
- {'_id': 'evilstorm',
- 'name': 'Evil Storm',
- 'description': 'A storm full of pure evil.',
- 'exits': []}, # you can't escape the evilstorm
- {'_id': 'central_park',
- 'name': 'Central Park, NY, NY',
- 'description': "New York's friendly Central Park.",
- 'exits': [
- {'name': 'portal',
- 'exits_to': 'necroplex'}]}]}
-
-
-class TestMigrations(object):
- def setUp(self):
- # Set up the connection, drop an existing possible database
- self.connection = Connection()
- self.connection.drop_database(MIGRATION_DB_NAME)
- self.db = Connection()[MIGRATION_DB_NAME]
- self.migration_manager = MigrationManager(
- self.db, TEST_MIGRATION_REGISTRY)
- self.empty_migration_manager = MigrationManager(
- self.db, TEST_EMPTY_MIGRATION_REGISTRY)
- self.run_migrations = []
-
- def tearDown(self):
- self.connection.drop_database(MIGRATION_DB_NAME)
-
- def _record_migration(self, migration_number, migration_func):
- self.run_migrations.append((migration_number, migration_func))
-
- def test_migrations_registered_and_sorted(self):
- """
- Make sure that migrations get registered and are sorted right
- in the migration manager
- """
- assert TEST_MIGRATION_REGISTRY == {
- 1: creature_add_magical_powers,
- 2: creature_rename_num_legs_to_num_limbs,
- 3: creature_remove_is_demon,
- 4: level_exits_dict_to_list}
- assert self.migration_manager.sorted_migrations == [
- (1, creature_add_magical_powers),
- (2, creature_rename_num_legs_to_num_limbs),
- (3, creature_remove_is_demon),
- (4, level_exits_dict_to_list)]
- assert self.empty_migration_manager.sorted_migrations == []
-
- def test_run_full_migrations(self):
- """
- Make sure that running the full migration suite from 0 updates
- everything
- """
- self.migration_manager.set_current_migration(0)
- assert self.migration_manager.database_current_migration() == 0
- install_fixtures_simple(self.db, UNMIGRATED_DBDATA)
- self.migration_manager.migrate_new(post_callback=self._record_migration)
-
- assert self.run_migrations == [
- (1, creature_add_magical_powers),
- (2, creature_rename_num_legs_to_num_limbs),
- (3, creature_remove_is_demon),
- (4, level_exits_dict_to_list)]
-
- assert_db_meets_expected(
- self.db, EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA)
-
- # Make sure the migration is recorded correctly
- assert self.migration_manager.database_current_migration() == 4
-
- # run twice! It should do nothing the second time.
- # ------------------------------------------------
- self.run_migrations = []
- self.migration_manager.migrate_new(post_callback=self._record_migration)
- assert self.run_migrations == []
- assert_db_meets_expected(
- self.db, EXPECTED_POST_MIGRATION_UNMIGRATED_DBDATA)
- assert self.migration_manager.database_current_migration() == 4
-
-
- def test_run_partial_migrations(self):
- """
- Make sure that running full migration suite from 3 only runs
- last migration
- """
- self.migration_manager.set_current_migration(3)
- assert self.migration_manager.database_current_migration() == 3
- install_fixtures_simple(self.db, SEMI_MIGRATED_DBDATA)
- self.migration_manager.migrate_new(post_callback=self._record_migration)
-
- assert self.run_migrations == [
- (4, level_exits_dict_to_list)]
-
- assert_db_meets_expected(
- self.db, EXPECTED_POST_MIGRATION_SEMI_MIGRATED_DBDATA)
-
- # Make sure the migration is recorded correctly
- assert self.migration_manager.database_current_migration() == 4
-
- def test_migrations_recorded_as_latest(self):
- """
- Make sure that if we don't have a migration_status
- pre-recorded it's marked as the latest
- """
- self.migration_manager.install_migration_version_if_missing()
- assert self.migration_manager.database_current_migration() == 4
-
- def test_no_migrations_recorded_as_zero(self):
- """
- Make sure that if we don't have a migration_status
- but there *are* no migrations that it's marked as 0
- """
- self.empty_migration_manager.install_migration_version_if_missing()
- assert self.empty_migration_manager.database_current_migration() == 0
-
- def test_migrations_to_run(self):
- """
- Make sure we get the right list of migrations to run
- """
- self.migration_manager.set_current_migration(0)
-
- assert self.migration_manager.migrations_to_run() == [
- (1, creature_add_magical_powers),
- (2, creature_rename_num_legs_to_num_limbs),
- (3, creature_remove_is_demon),
- (4, level_exits_dict_to_list)]
-
- self.migration_manager.set_current_migration(3)
-
- assert self.migration_manager.migrations_to_run() == [
- (4, level_exits_dict_to_list)]
-
- self.migration_manager.set_current_migration(4)
-
- assert self.migration_manager.migrations_to_run() == []
-
-
- def test_no_migrations_raises_exception(self):
- """
- If we don't have the current migration set in the database,
- this should error out.
- """
- assert_raises(
- MissingCurrentMigration,
- self.migration_manager.migrations_to_run)
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
new file mode 100644
index 00000000..c5c614f6
--- /dev/null
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -0,0 +1,158 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+from configobj import ConfigObj
+from mediagoblin import mg_globals
+from mediagoblin.init.plugins import setup_plugins
+from mediagoblin.tools import pluginapi
+from nose.tools import eq_
+
+
+def with_cleanup(*modules_to_delete):
+ def _with_cleanup(fun):
+ """Wrapper that saves and restores mg_globals"""
+ def _with_cleanup_inner(*args, **kwargs):
+ old_app_config = mg_globals.app_config
+ old_global_config = mg_globals.global_config
+ # Need to delete icky modules before and after so as to make
+ # sure things work correctly.
+ for module in modules_to_delete:
+ try:
+ del sys.modules[module]
+ except KeyError:
+ pass
+ # The plugin cache gets populated as a side-effect of
+ # importing, so it's best to clear it before and after a test.
+ pcache = pluginapi.PluginCache()
+ pcache.clear()
+ try:
+ return fun(*args, **kwargs)
+ finally:
+ mg_globals.app_config = old_app_config
+ mg_globals.global_config = old_global_config
+ # Need to delete icky modules before and after so as to make
+ # sure things work correctly.
+ for module in modules_to_delete:
+ try:
+ del sys.modules[module]
+ except KeyError:
+ pass
+ pcache.clear()
+
+ _with_cleanup_inner.__name__ = fun.__name__
+ return _with_cleanup_inner
+ return _with_cleanup
+
+
+def build_config(sections):
+ """Builds a ConfigObj object with specified data
+
+ :arg sections: list of ``(section_name, section_data,
+ subsection_list)`` tuples where section_data is a dict and
+ subsection_list is a list of ``(section_name, section_data,
+ subsection_list)``, ...
+
+ For example:
+
+ >>> build_config([
+ ... ('mediagoblin', {'key1': 'val1'}, []),
+ ... ('section2', {}, [
+ ... ('subsection1', {}, [])
+ ... ])
+ ... ])
+ """
+ cfg = ConfigObj()
+ cfg.filename = 'foo'
+ def _iter_section(cfg, section_list):
+ for section_name, data, subsection_list in section_list:
+ cfg[section_name] = data
+ _iter_section(cfg[section_name], subsection_list)
+
+ _iter_section(cfg, sections)
+ return cfg
+
+
+@with_cleanup()
+def test_no_plugins():
+ """Run setup_plugins with no plugins in config"""
+ cfg = build_config([('mediagoblin', {}, [])])
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pcache = pluginapi.PluginCache()
+ setup_plugins()
+
+ # Make sure we didn't load anything.
+ eq_(len(pcache.plugin_classes), 0)
+ eq_(len(pcache.plugin_objects), 0)
+
+
+@with_cleanup('mediagoblin.plugins.sampleplugin',
+ 'mediagoblin.plugins.sampleplugin.main')
+def test_one_plugin():
+ """Run setup_plugins with a single working plugin"""
+ cfg = build_config([
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('mediagoblin.plugins.sampleplugin', {}, [])
+ ])
+ ])
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pcache = pluginapi.PluginCache()
+ setup_plugins()
+
+ # Make sure we only found one plugin class
+ eq_(len(pcache.plugin_classes), 1)
+ # Make sure the class is the one we think it is.
+ eq_(pcache.plugin_classes[0].__name__, 'SamplePlugin')
+
+ # Make sure there was one plugin created
+ eq_(len(pcache.plugin_objects), 1)
+ # Make sure we called setup_plugin on SamplePlugin
+ eq_(pcache.plugin_objects[0]._setup_plugin_called, 1)
+
+
+@with_cleanup('mediagoblin.plugins.sampleplugin',
+ 'mediagoblin.plugins.sampleplugin.main')
+def test_same_plugin_twice():
+ """Run setup_plugins with a single working plugin twice"""
+ cfg = build_config([
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('mediagoblin.plugins.sampleplugin', {}, []),
+ ('mediagoblin.plugins.sampleplugin', {}, []),
+ ])
+ ])
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pcache = pluginapi.PluginCache()
+ setup_plugins()
+
+ # Make sure we only found one plugin class
+ eq_(len(pcache.plugin_classes), 1)
+ # Make sure the class is the one we think it is.
+ eq_(pcache.plugin_classes[0].__name__, 'SamplePlugin')
+
+ # Make sure there was one plugin created
+ eq_(len(pcache.plugin_objects), 1)
+ # Make sure we called setup_plugin on SamplePlugin
+ eq_(pcache.plugin_objects[0]._setup_plugin_called, 1)
diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py
new file mode 100644
index 00000000..fe8489aa
--- /dev/null
+++ b/mediagoblin/tests/test_processing.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+from nose.tools import assert_equal
+
+from mediagoblin import processing
+
+class TestProcessing(object):
+ def run_fill(self, input, format, output=None):
+ builder = processing.FilenameBuilder(input)
+ result = builder.fill(format)
+ if output is None:
+ return result
+ assert_equal(output, result)
+
+ def test_easy_filename_fill(self):
+ self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt')
+
+ def test_long_filename_fill(self):
+ self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}',
+ 'image-{0}.png'.format('A' * 245))
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index 1f56779e..bf1b87aa 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -15,30 +15,34 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
-import pkg_resources
-import re
+import os
-from nose.tools import assert_equal, assert_true, assert_false
+from nose.tools import assert_equal, assert_true
+from pkg_resources import resource_filename
-from mediagoblin.tests.tools import setup_fresh_app, get_test_app, \
+from mediagoblin.tests.tools import get_test_app, \
fixture_add_user
from mediagoblin import mg_globals
-from mediagoblin.tools import template, common
-
-GOOD_JPG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/good.jpg')
-GOOD_PNG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/good.png')
-EVIL_FILE = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil')
-EVIL_JPG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil.jpg')
-EVIL_PNG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil.png')
+from mediagoblin.tools import template
+
+
+def resource(filename):
+ return resource_filename('mediagoblin.tests', 'test_submission/' + filename)
+
+
+GOOD_JPG = resource('good.jpg')
+GOOD_PNG = resource('good.png')
+EVIL_FILE = resource('evil')
+EVIL_JPG = resource('evil.jpg')
+EVIL_PNG = resource('evil.png')
+BIG_BLUE = resource('bigblue.png')
GOOD_TAG_STRING = 'yin,yang'
BAD_TAG_STRING = 'rage,' + 'f' * 26 + 'u' * 26
+FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form']
+REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
+
class TestSubmission:
def setUp(self):
@@ -61,234 +65,193 @@ class TestSubmission:
def logout(self):
self.test_app.get('/auth/logout/')
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def upload_data(self, filename):
+ return {'upload_files': [('file', filename)]}
+
+ def check_comments(self, request, media_id, count):
+ comments = request.db.MediaComment.find({'media_entry': media_id})
+ assert_equal(count, len(list(comments)))
+
def test_missing_fields(self):
# Test blank form
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.file.errors == [u'You must provide a file.']
+ response, form = self.do_post({}, *FORM_CONTEXT)
+ assert_equal(form.file.errors, [u'You must provide a file.'])
# Test blank file
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'test title'})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.file.errors == [u'You must provide a file.']
-
-
- def test_normal_uploads(self):
- # Test JPG
- # --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Normal upload 1'
- }, upload_files=[(
- 'file', GOOD_JPG)])
+ response, form = self.do_post({'title': 'test title'}, *FORM_CONTEXT)
+ assert_equal(form.file.errors, [u'You must provide a file.'])
- # User should be redirected
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
- assert template.TEMPLATE_TEST_CONTEXT.has_key(
- 'mediagoblin/user_pages/user.html')
+ def check_url(self, response, path):
+ assert_equal(urlparse.urlsplit(response.location)[2], path)
+ def check_normal_upload(self, title, filename):
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ assert_true('mediagoblin/user_pages/user.html' in context)
# Make sure the media view is at least reachable, logged in...
- self.test_app.get('/u/chris/m/normal-upload-1/')
+ url = '/u/{0}/m/{1}/'.format(self.test_user.username,
+ title.lower().replace(' ', '-'))
+ self.test_app.get(url)
# ... and logged out too.
self.logout()
- self.test_app.get('/u/chris/m/normal-upload-1/')
- # Log back in for the remaining tests.
- self.login()
+ self.test_app.get(url)
- # Test PNG
- # --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Normal upload 2'
- }, upload_files=[(
- 'file', GOOD_PNG)])
+ def test_normal_jpg(self):
+ self.check_normal_upload('Normal upload 1', GOOD_JPG)
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
- assert template.TEMPLATE_TEST_CONTEXT.has_key(
- 'mediagoblin/user_pages/user.html')
+ def test_normal_png(self):
+ self.check_normal_upload('Normal upload 2', GOOD_PNG)
+
+ def check_media(self, request, find_data, count=None):
+ media = request.db.MediaEntry.find(find_data)
+ if count is not None:
+ assert_equal(media.count(), count)
+ if count == 0:
+ return
+ return media[0]
def test_tags(self):
# Good tag string
# --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- 'tags': GOOD_TAG_STRING
- }, upload_files=[(
- 'file', GOOD_JPG)])
+ response, request = self.do_post({'title': 'Balanced Goblin',
+ 'tags': GOOD_TAG_STRING},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
+ assert media.tags[0]['name'] == u'yin'
+ assert media.tags[0]['slug'] == u'yin'
- # New media entry with correct tags should be created
- response.follow()
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html']
- request = context['request']
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
- assert_equal(media.tags,
- [{'name': u'yin', 'slug': u'yin'},
- {'name': u'yang', 'slug': u'yang'}])
+ assert media.tags[1]['name'] == u'yang'
+ assert media.tags[1]['slug'] == u'yang'
# Test tags that are too long
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- 'tags': BAD_TAG_STRING
- }, upload_files=[(
- 'file', GOOD_JPG)])
-
- # Too long error should be raised
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.tags.errors == [
- u'Tags must be shorter than 50 characters. Tags that are too long'\
- ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
+ response, form = self.do_post({'title': 'Balanced Goblin',
+ 'tags': BAD_TAG_STRING},
+ *FORM_CONTEXT,
+ **self.upload_data(GOOD_JPG))
+ assert_equal(form.tags.errors, [
+ u'Tags must be shorter than 50 characters. ' \
+ 'Tags that are too long: ' \
+ 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'])
def test_delete(self):
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- }, upload_files=[(
- 'file', GOOD_JPG)])
-
- # Post image
- response.follow()
-
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
-
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
-
- # Does media entry exist?
- assert_true(media)
+ response, request = self.do_post({'title': 'Balanced Goblin'},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
+ media_id = media.id
# Add a comment, so we can test for its deletion later.
- get_comments = lambda: list(
- request.db.MediaComment.find({'media_entry': media._id}))
- assert_false(get_comments())
- response = self.test_app.post(
- request.urlgen('mediagoblin.user_pages.media_post_comment',
- user=self.test_user.username,
- media=media._id),
- {'comment_content': 'i love this test'})
- response.follow()
- assert_true(get_comments())
+ self.check_comments(request, media_id, 0)
+ comment_url = request.urlgen(
+ 'mediagoblin.user_pages.media_post_comment',
+ user=self.test_user.username, media=media_id)
+ response = self.do_post({'comment_content': 'i love this test'},
+ url=comment_url, do_follow=True)[0]
+ self.check_comments(request, media_id, 1)
# Do not confirm deletion
# ---------------------------------------------------
- response = self.test_app.post(
- request.urlgen('mediagoblin.user_pages.media_confirm_delete',
- # No work: user=media.uploader().username,
- user=self.test_user.username,
- media=media._id),
- # no value means no confirm
- {})
-
- response.follow()
-
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
-
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
-
- # Does media entry still exist?
- assert_true(media)
+ delete_url = request.urlgen(
+ 'mediagoblin.user_pages.media_confirm_delete',
+ user=self.test_user.username, media=media_id)
+ # Empty data means don't confirm
+ response = self.do_post({}, do_follow=True, url=delete_url)[0]
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
+ media_id = media.id
# Confirm deletion
# ---------------------------------------------------
- response = self.test_app.post(
- request.urlgen('mediagoblin.user_pages.media_confirm_delete',
- # No work: user=media.uploader().username,
- user=self.test_user.username,
- media=media._id),
- {'confirm': 'y'})
-
- response.follow()
+ response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT,
+ do_follow=True, url=delete_url)
+ self.check_media(request, {'_id': media_id}, 0)
+ self.check_comments(request, media_id, 0)
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
-
- # Does media entry still exist?
- assert_false(
- request.db.MediaEntry.find(
- {'_id': media._id}).count())
-
- # How about the comment?
- assert_false(get_comments())
-
- def test_malicious_uploads(self):
+ def test_evil_file(self):
# Test non-suppoerted file with non-supported extension
# -----------------------------------------------------
+ response, form = self.do_post({'title': 'Malicious Upload 1'},
+ *FORM_CONTEXT,
+ **self.upload_data(EVIL_FILE))
+ assert_equal(len(form.file.errors), 1)
+ assert 'Sorry, I don\'t support that file type :(' == \
+ str(form.file.errors[0])
+
+ def test_sniffing(self):
+ '''
+ Test sniffing mechanism to assert that regular uploads work as intended
+ '''
template.clear_test_template_context()
response = self.test_app.post(
'/submit/', {
- 'title': 'Malicious Upload 1'
+ 'title': 'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'
}, upload_files=[(
- 'file', EVIL_FILE)])
+ 'file', GOOD_JPG)])
+
+ response.follow()
+
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html']
+
+ request = context['request']
+
+ media = request.db.MediaEntry.find_one({
+ u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert re.match(r'^Could not extract any file extension from ".*?"$', str(form.file.errors[0]))
- assert len(form.file.errors) == 1
+ assert media.media_type == 'mediagoblin.media_types.image'
+ def check_false_image(self, title, filename):
# NOTE: The following 2 tests will ultimately fail, but they
# *will* pass the initial form submission step. Instead,
# they'll be caught as failures during the processing step.
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ assert_equal(entry.state, 'failed')
+ assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail')
+ def test_evil_jpg(self):
# Test non-supported file with .jpg extension
# -------------------------------------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Malicious Upload 2'
- }, upload_files=[(
- 'file', EVIL_JPG)])
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
-
- entry = mg_globals.database.MediaEntry.find_one(
- {'title': 'Malicious Upload 2'})
- assert_equal(entry.state, 'failed')
- assert_equal(
- entry.fail_error,
- u'mediagoblin.processing:BadMediaFail')
+ self.check_false_image('Malicious Upload 2', EVIL_JPG)
+ def test_evil_png(self):
# Test non-supported file with .png extension
# -------------------------------------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Malicious Upload 3'
- }, upload_files=[(
- 'file', EVIL_PNG)])
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
-
- entry = mg_globals.database.MediaEntry.find_one(
- {'title': 'Malicious Upload 3'})
- assert_equal(entry.state, 'failed')
- assert_equal(
- entry.fail_error,
- u'mediagoblin.processing:BadMediaFail')
+ self.check_false_image('Malicious Upload 3', EVIL_PNG)
+
+ def test_processing(self):
+ data = {'title': 'Big Blue'}
+ response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(BIG_BLUE))
+ media = self.check_media(request, data, 1)
+ last_size = 1024 ** 3 # Needs to be larger than bigblue.png
+ for key, basename in (('original', 'bigblue.png'),
+ ('medium', 'bigblue.medium.png'),
+ ('thumb', 'bigblue.thumbnail.png')):
+ # Does the processed image have a good filename?
+ filename = resource_filename(
+ 'mediagoblin.tests',
+ os.path.join('test_user_dev/media/public',
+ *media.media_files.get(key, [])))
+ assert_true(filename.endswith('_' + basename))
+ # Is it smaller than the last processed image we looked at?
+ size = os.stat(filename).st_size
+ assert_true(last_size > size)
+ last_size = size
diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png
new file mode 100644
index 00000000..2b2c2a44
--- /dev/null
+++ b/mediagoblin/tests/test_submission/bigblue.png
Binary files differ
diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py
index 79f925aa..bc657660 100644
--- a/mediagoblin/tests/test_tags.py
+++ b/mediagoblin/tests/test_tags.py
@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mediagoblin.tests.tools import setup_fresh_app
-from mediagoblin import mg_globals
from mediagoblin.tools import text
@setup_fresh_app
diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py
index b5243a9b..04a74653 100644
--- a/mediagoblin/tests/test_workbench.py
+++ b/mediagoblin/tests/test_workbench.py
@@ -17,7 +17,6 @@
import os
import tempfile
-from nose.tools import assert_raises
from mediagoblin import workbench
from mediagoblin.tests.test_storage import get_tmp_filestorage
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 7cf355b0..5b4e3746 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -26,8 +26,11 @@ from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.decorators import _make_safe
from mediagoblin.db.open import setup_connection_and_db_from_config
+from mediagoblin.db.sql.base import Session
from mediagoblin.meddleware import BaseMeddleware
from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.gmg_commands.dbupdate import run_dbupdate
+from mediagoblin.init.celery import setup_celery_app
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
@@ -125,26 +128,19 @@ def get_test_app(dump_old_app=True):
global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
app_config = global_config['mediagoblin']
- # Wipe database
- # @@: For now we're dropping collections, but we could also just
- # collection.remove() ?
- connection, db = setup_connection_and_db_from_config(app_config)
- assert db.name == MEDIAGOBLIN_TEST_DB_NAME
-
- collections_to_wipe = [
- collection
- for collection in db.collection_names()
- if not collection.startswith('system.')]
-
- for collection in collections_to_wipe:
- db.drop_collection(collection)
-
- # TODO: Drop and recreate indexes
+ # Run database setup/migrations
+ run_dbupdate(app_config)
# setup app and return
test_app = loadapp(
'config:' + TEST_SERVER_CONFIG)
+ Session.rollback()
+ Session.remove()
+
+ # Re-setup celery
+ setup_celery_app(app_config, global_config)
+
# Insert the TestingMeddleware, which can do some
# sanity checks on every request/response.
# Doing it this way is probably not the cleanest way.
@@ -216,4 +212,11 @@ def fixture_add_user(username = u'chris', password = 'toast',
test_user.save()
+ # Reload
+ test_user = mg_globals.database.User.find_one({'username': username})
+
+ # ... and detach from session:
+ from mediagoblin.db.sql.base import Session
+ Session.expunge(test_user)
+
return test_user