aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/__init__.py22
-rw-r--r--mediagoblin/tests/appconfig_context_modified.ini26
-rw-r--r--mediagoblin/tests/appconfig_plugin_specs.ini21
-rw-r--r--mediagoblin/tests/appconfig_static_plugin.ini26
-rw-r--r--mediagoblin/tests/conftest.py41
-rw-r--r--mediagoblin/tests/fake_carrot_conf_bad.ini14
-rw-r--r--mediagoblin/tests/fake_carrot_conf_empty.ini0
-rw-r--r--mediagoblin/tests/fake_carrot_conf_good.ini13
-rw-r--r--mediagoblin/tests/fake_celery_conf.ini9
-rw-r--r--mediagoblin/tests/fake_celery_module.py15
-rw-r--r--mediagoblin/tests/fake_config_spec.ini10
-rw-r--r--mediagoblin/tests/pytest.ini2
-rw-r--r--mediagoblin/tests/resources.py41
-rw-r--r--mediagoblin/tests/test_api.py92
-rw-r--r--mediagoblin/tests/test_auth.py396
-rw-r--r--mediagoblin/tests/test_celery_setup.py60
-rw-r--r--mediagoblin/tests/test_collections.py32
-rw-r--r--mediagoblin/tests/test_config.py97
-rw-r--r--mediagoblin/tests/test_csrf_middleware.py86
-rw-r--r--mediagoblin/tests/test_edit.py144
-rw-r--r--mediagoblin/tests/test_exif.py431
-rw-r--r--mediagoblin/tests/test_exif/bad.jpg18
-rw-r--r--mediagoblin/tests/test_exif/empty.jpgbin0 -> 26636 bytes
-rw-r--r--mediagoblin/tests/test_exif/good.jpgbin0 -> 207590 bytes
-rw-r--r--mediagoblin/tests/test_exif/has-gps.jpgbin0 -> 141246 bytes
-rw-r--r--mediagoblin/tests/test_globals.py42
-rw-r--r--mediagoblin/tests/test_http_callback.py83
-rw-r--r--mediagoblin/tests/test_messages.py50
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini33
-rw-r--r--mediagoblin/tests/test_misc.py91
-rw-r--r--mediagoblin/tests/test_modelmethods.py167
-rw-r--r--mediagoblin/tests/test_oauth.py222
-rw-r--r--mediagoblin/tests/test_paste.ini40
-rw-r--r--mediagoblin/tests/test_pdf.py39
-rw-r--r--mediagoblin/tests/test_piwigo.py71
-rw-r--r--mediagoblin/tests/test_pluginapi.py466
-rw-r--r--mediagoblin/tests/test_processing.py18
-rw-r--r--mediagoblin/tests/test_session.py30
-rw-r--r--mediagoblin/tests/test_sql_migrations.py896
-rw-r--r--mediagoblin/tests/test_staticdirect.py9
-rw-r--r--mediagoblin/tests/test_storage.py321
-rw-r--r--mediagoblin/tests/test_submission.py294
-rw-r--r--mediagoblin/tests/test_submission/bigblue.pngbin0 -> 3142 bytes
-rwxr-xr-xmediagoblin/tests/test_submission/evil3
-rwxr-xr-xmediagoblin/tests/test_submission/evil.jpg3
-rwxr-xr-xmediagoblin/tests/test_submission/evil.png3
-rw-r--r--mediagoblin/tests/test_submission/good.jpgbin0 -> 10059 bytes
-rw-r--r--mediagoblin/tests/test_submission/good.pdfbin0 -> 194007 bytes
-rw-r--r--mediagoblin/tests/test_submission/good.pngbin0 -> 50598 bytes
-rw-r--r--mediagoblin/tests/test_tags.py39
-rw-r--r--mediagoblin/tests/test_timesince.py57
-rw-r--r--mediagoblin/tests/test_util.py145
-rw-r--r--mediagoblin/tests/test_workbench.py122
-rw-r--r--mediagoblin/tests/testplugins/__init__.py15
-rw-r--r--mediagoblin/tests/testplugins/callables1/__init__.py43
-rw-r--r--mediagoblin/tests/testplugins/callables2/__init__.py41
-rw-r--r--mediagoblin/tests/testplugins/callables3/__init__.py41
-rw-r--r--mediagoblin/tests/testplugins/modify_context/__init__.py55
-rw-r--r--mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html5
-rw-r--r--mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html6
-rw-r--r--mediagoblin/tests/testplugins/modify_context/views.py33
-rw-r--r--mediagoblin/tests/testplugins/pluginspec/__init__.py22
-rw-r--r--mediagoblin/tests/testplugins/pluginspec/config_spec.ini4
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/__init__.py36
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css4
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/views.py28
-rw-r--r--mediagoblin/tests/tools.py233
67 files changed, 5406 insertions, 0 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py
new file mode 100644
index 00000000..cf200791
--- /dev/null
+++ b/mediagoblin/tests/__init__.py
@@ -0,0 +1,22 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+def setup_package():
+
+ import warnings
+ from sqlalchemy.exc import SAWarning
+ warnings.simplefilter("error", SAWarning)
diff --git a/mediagoblin/tests/appconfig_context_modified.ini b/mediagoblin/tests/appconfig_context_modified.ini
new file mode 100644
index 00000000..80ca69b1
--- /dev/null
+++ b/mediagoblin/tests/appconfig_context_modified.ini
@@ -0,0 +1,26 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.tests.testplugins.modify_context]]
diff --git a/mediagoblin/tests/appconfig_plugin_specs.ini b/mediagoblin/tests/appconfig_plugin_specs.ini
new file mode 100644
index 00000000..5511cd97
--- /dev/null
+++ b/mediagoblin/tests/appconfig_plugin_specs.ini
@@ -0,0 +1,21 @@
+[mediagoblin]
+direct_remote_path = /mgoblin_static/
+email_sender_address = "notice@mediagoblin.example.org"
+
+## Uncomment and change to your DB's appropiate setting.
+## Default is a local sqlite db "mediagoblin.db".
+# sql_engine = postgresql:///gmg
+
+# set to false to enable sending notices
+email_debug_mode = true
+
+# Set to false to disable registrations
+allow_registration = true
+
+[plugins]
+[[mediagoblin.tests.testplugins.pluginspec]]
+some_string = "not blork"
+some_int = "not an int"
+
+# this one shouldn't have its own config
+[[mediagoblin.tests.testplugins.callables1]]
diff --git a/mediagoblin/tests/appconfig_static_plugin.ini b/mediagoblin/tests/appconfig_static_plugin.ini
new file mode 100644
index 00000000..dc251171
--- /dev/null
+++ b/mediagoblin/tests/appconfig_static_plugin.ini
@@ -0,0 +1,26 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.tests.testplugins.staticstuff]]
diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py
new file mode 100644
index 00000000..dbb0aa0a
--- /dev/null
+++ b/mediagoblin/tests/conftest.py
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+from mediagoblin.tests import tools
+from mediagoblin.tools.testing import _activate_testing
+
+
+@pytest.fixture()
+def test_app(request):
+ """
+ py.test fixture to pass sandboxed mediagoblin applications into tests that
+ want them.
+
+ You could make a local version of this method for your own tests
+ to override the paste and config files being used by passing them
+ in differently to get_app.
+ """
+ return tools.get_app(request)
+
+
+@pytest.fixture()
+def pt_fixture_enable_testing():
+ """
+ py.test fixture to enable testing mode in tools.
+ """
+ _activate_testing()
diff --git a/mediagoblin/tests/fake_carrot_conf_bad.ini b/mediagoblin/tests/fake_carrot_conf_bad.ini
new file mode 100644
index 00000000..9d8cf518
--- /dev/null
+++ b/mediagoblin/tests/fake_carrot_conf_bad.ini
@@ -0,0 +1,14 @@
+[carrotapp]
+# Whether or not our carrots are going to be turned into cake.
+## These should throw errors
+carrotcake = slobber
+num_carrots = GROSS
+
+# A message encouraging our users to eat their carrots.
+encouragement_phrase = 586956856856 # shouldn't throw error
+
+# Something extra!
+blah_blah = "blah!" # shouldn't throw error either
+
+[celery]
+EAT_CELERY_WITH_CARROTS = pants # yeah that's def an error right there.
diff --git a/mediagoblin/tests/fake_carrot_conf_empty.ini b/mediagoblin/tests/fake_carrot_conf_empty.ini
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mediagoblin/tests/fake_carrot_conf_empty.ini
diff --git a/mediagoblin/tests/fake_carrot_conf_good.ini b/mediagoblin/tests/fake_carrot_conf_good.ini
new file mode 100644
index 00000000..1377907b
--- /dev/null
+++ b/mediagoblin/tests/fake_carrot_conf_good.ini
@@ -0,0 +1,13 @@
+[carrotapp]
+# Whether or not our carrots are going to be turned into cake.
+carrotcake = true
+num_carrots = 88
+
+# A message encouraging our users to eat their carrots.
+encouragement_phrase = "I'd love it if you eat your carrots!"
+
+# Something extra!
+blah_blah = "blah!"
+
+[celery]
+EAT_CELERY_WITH_CARROTS = False
diff --git a/mediagoblin/tests/fake_celery_conf.ini b/mediagoblin/tests/fake_celery_conf.ini
new file mode 100644
index 00000000..67b0cba6
--- /dev/null
+++ b/mediagoblin/tests/fake_celery_conf.ini
@@ -0,0 +1,9 @@
+[mediagoblin]
+# I got nothin' in this file!
+
+[celery]
+SOME_VARIABLE = floop
+MAIL_PORT = 2000
+CELERYD_ETA_SCHEDULER_PRECISION = 1.3
+CELERY_RESULT_PERSISTENT = true
+CELERY_IMPORTS = foo.bar.baz, this.is.an.import
diff --git a/mediagoblin/tests/fake_celery_module.py b/mediagoblin/tests/fake_celery_module.py
new file mode 100644
index 00000000..621845ba
--- /dev/null
+++ b/mediagoblin/tests/fake_celery_module.py
@@ -0,0 +1,15 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/mediagoblin/tests/fake_config_spec.ini b/mediagoblin/tests/fake_config_spec.ini
new file mode 100644
index 00000000..43f2e236
--- /dev/null
+++ b/mediagoblin/tests/fake_config_spec.ini
@@ -0,0 +1,10 @@
+[carrotapp]
+# Whether or not our carrots are going to be turned into cake.
+carrotcake = boolean(default=False)
+num_carrots = integer(default=1)
+
+# A message encouraging our users to eat their carrots.
+encouragement_phrase = string()
+
+[celery]
+EAT_CELERY_WITH_CARROTS = boolean(default=True) \ No newline at end of file
diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini
new file mode 100644
index 00000000..e561c074
--- /dev/null
+++ b/mediagoblin/tests/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+usefixtures = tmpdir pt_fixture_enable_testing
diff --git a/mediagoblin/tests/resources.py b/mediagoblin/tests/resources.py
new file mode 100644
index 00000000..f7b3037d
--- /dev/null
+++ b/mediagoblin/tests/resources.py
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from pkg_resources import resource_filename
+
+
+def resource(filename):
+ return resource_filename('mediagoblin.tests', 'test_submission/' + filename)
+
+
+GOOD_JPG = resource('good.jpg')
+GOOD_PNG = resource('good.png')
+EVIL_FILE = resource('evil')
+EVIL_JPG = resource('evil.jpg')
+EVIL_PNG = resource('evil.png')
+BIG_BLUE = resource('bigblue.png')
+GOOD_PDF = resource('good.pdf')
+
+
+def resource_exif(f):
+ return resource_filename('mediagoblin.tests', 'test_exif/' + f)
+
+
+GOOD_JPG = resource_exif('good.jpg')
+EMPTY_JPG = resource_exif('empty.jpg')
+BAD_JPG = resource_exif('bad.jpg')
+GPS_JPG = resource_exif('has-gps.jpg')
diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py
new file mode 100644
index 00000000..89cf1026
--- /dev/null
+++ b/mediagoblin/tests/test_api.py
@@ -0,0 +1,92 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import logging
+import base64
+
+import pytest
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import template, pluginapi
+from mediagoblin.tests.tools import fixture_add_user
+from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
+ BIG_BLUE
+
+
+_log = logging.getLogger(__name__)
+
+
+class TestAPI(object):
+ def setup(self):
+ self.db = mg_globals.database
+
+ self.user_password = u'4cc355_70k3N'
+ self.user = fixture_add_user(u'joapi', self.user_password)
+
+ def login(self, test_app):
+ test_app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def get_context(self, template_name):
+ return template.TEMPLATE_TEST_CONTEXT[template_name]
+
+ def http_auth_headers(self):
+ return {'Authorization': 'Basic {0}'.format(
+ base64.b64encode(':'.join([
+ self.user.username,
+ self.user_password])))}
+
+ def do_post(self, data, test_app, **kwargs):
+ url = kwargs.pop('url', '/api/submit')
+ do_follow = kwargs.pop('do_follow', False)
+
+ if not 'headers' in kwargs.keys():
+ kwargs['headers'] = self.http_auth_headers()
+
+ response = test_app.post(url, data, **kwargs)
+
+ if do_follow:
+ response.follow()
+
+ return response
+
+ def upload_data(self, filename):
+ return {'upload_files': [('file', filename)]}
+
+ def test_1_test_test_view(self, test_app):
+ self.login(test_app)
+
+ response = test_app.get(
+ '/api/test',
+ headers=self.http_auth_headers())
+
+ assert response.body == \
+ '{"username": "joapi", "email": "joapi@example.com"}'
+
+ def test_2_test_submission(self, test_app):
+ self.login(test_app)
+
+ response = self.do_post(
+ {'title': 'Great JPG!'},
+ test_app,
+ **self.upload_data(GOOD_JPG))
+
+ assert response.status_int == 200
+
+ assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first()
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
new file mode 100644
index 00000000..755727f9
--- /dev/null
+++ b/mediagoblin/tests/test_auth.py
@@ -0,0 +1,396 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import urlparse
+import datetime
+
+from mediagoblin import mg_globals
+from mediagoblin.auth import lib as auth_lib
+from mediagoblin.db.models import User
+from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tools import template, mail
+
+
+########################
+# Test bcrypt auth funcs
+########################
+
+def test_bcrypt_check_password():
+ # Check known 'lollerskates' password against check function
+ assert auth_lib.bcrypt_check_password(
+ 'lollerskates',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ assert not auth_lib.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ # Same thing, but with extra fake salt.
+ assert not auth_lib.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6',
+ '3><7R45417')
+
+
+def test_bcrypt_gen_password_hash():
+ pw = 'youwillneverguessthis'
+
+ # Normal password hash generation, and check on that hash
+ hashed_pw = auth_lib.bcrypt_gen_password_hash(pw)
+ assert auth_lib.bcrypt_check_password(
+ pw, hashed_pw)
+ assert not auth_lib.bcrypt_check_password(
+ 'notthepassword', hashed_pw)
+
+ # Same thing, extra salt.
+ hashed_pw = auth_lib.bcrypt_gen_password_hash(pw, '3><7R45417')
+ assert auth_lib.bcrypt_check_password(
+ pw, hashed_pw, '3><7R45417')
+ assert not auth_lib.bcrypt_check_password(
+ 'notthepassword', hashed_pw, '3><7R45417')
+
+
+def test_register_views(test_app):
+ """
+ Massive test function that all our registration-related views all work.
+ """
+ # Test doing a simple GET on the page
+ # -----------------------------------
+
+ test_app.get('/auth/register/')
+ # Make sure it rendered with the appropriate template
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to register without providing anything, should error
+ # --------------------------------------------------------
+
+ template.clear_test_template_context()
+ test_app.post(
+ '/auth/register/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ form = context['register_form']
+ assert form.username.errors == [u'This field is required.']
+ assert form.password.errors == [u'This field is required.']
+ assert form.email.errors == [u'This field is required.']
+
+ # Try to register with fields that are known to be invalid
+ # --------------------------------------------------------
+
+ ## too short
+ template.clear_test_template_context()
+ test_app.post(
+ '/auth/register/', {
+ 'username': 'l',
+ 'password': 'o',
+ 'email': 'l'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ form = context['register_form']
+
+ assert form.username.errors == [u'Field must be between 3 and 30 characters long.']
+ assert form.password.errors == [u'Field must be between 5 and 1024 characters long.']
+
+ ## bad form
+ template.clear_test_template_context()
+ test_app.post(
+ '/auth/register/', {
+ 'username': '@_@',
+ 'email': 'lollerskates'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ form = context['register_form']
+
+ assert form.username.errors == [u'This field does not take email addresses.']
+ assert form.email.errors == [u'This field requires an email address.']
+
+ ## At this point there should be no users in the database ;)
+ assert User.query.count() == 0
+
+ # Successful register
+ # -------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/register/', {
+ 'username': u'happygirl',
+ 'password': 'iamsohappy',
+ 'email': 'happygrrl@example.org'})
+ response.follow()
+
+ ## Did we redirect to the proper page? Use the right template?
+ assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/'
+ assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Make sure user is in place
+ new_user = mg_globals.database.User.find_one(
+ {'username': u'happygirl'})
+ assert new_user
+ assert new_user.status == u'needs_email_verification'
+ assert new_user.email_verified == False
+
+ ## Make sure user is logged in
+ request = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/user_pages/user.html']['request']
+ assert request.session['user_id'] == unicode(new_user.id)
+
+ ## Make sure we get email confirmation, and try verifying
+ assert len(mail.EMAIL_TEST_INBOX) == 1
+ message = mail.EMAIL_TEST_INBOX.pop()
+ assert message['To'] == 'happygrrl@example.org'
+ email_context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/auth/verification_email.txt']
+ assert email_context['verification_url'] in message.get_payload(decode=True)
+
+ path = urlparse.urlsplit(email_context['verification_url'])[2]
+ get_params = urlparse.urlsplit(email_context['verification_url'])[3]
+ assert path == u'/auth/verify_email/'
+ parsed_get_params = urlparse.parse_qs(get_params)
+
+ ### user should have these same parameters
+ assert parsed_get_params['userid'] == [
+ unicode(new_user.id)]
+ assert parsed_get_params['token'] == [
+ new_user.verification_key]
+
+ ## Try verifying with bs verification key, shouldn't work
+ template.clear_test_template_context()
+ response = test_app.get(
+ "/auth/verify_email/?userid=%s&token=total_bs" % unicode(
+ new_user.id))
+ response.follow()
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/user_pages/user.html']
+ # assert context['verification_successful'] == True
+ # TODO: Would be good to test messages here when we can do so...
+ new_user = mg_globals.database.User.find_one(
+ {'username': u'happygirl'})
+ assert new_user
+ assert new_user.status == u'needs_email_verification'
+ assert new_user.email_verified == False
+
+ ## Verify the email activation works
+ template.clear_test_template_context()
+ response = test_app.get("%s?%s" % (path, get_params))
+ response.follow()
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/user_pages/user.html']
+ # assert context['verification_successful'] == True
+ # TODO: Would be good to test messages here when we can do so...
+ new_user = mg_globals.database.User.find_one(
+ {'username': u'happygirl'})
+ assert new_user
+ assert new_user.status == u'active'
+ assert new_user.email_verified == True
+
+ # Uniqueness checks
+ # -----------------
+ ## We shouldn't be able to register with that user twice
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/register/', {
+ 'username': u'happygirl',
+ 'password': 'iamsohappy2',
+ 'email': 'happygrrl2@example.org'})
+
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/auth/register.html']
+ form = context['register_form']
+ assert form.username.errors == [
+ u'Sorry, a user with that name already exists.']
+
+ ## TODO: Also check for double instances of an email address?
+
+ ### Oops, forgot the password
+ # -------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/forgot_password/',
+ {'username': u'happygirl'})
+ response.follow()
+
+ ## Did we redirect to the proper page? Use the right template?
+ assert urlparse.urlsplit(response.location)[2] == '/auth/login/'
+ assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Make sure link to change password is sent by email
+ assert len(mail.EMAIL_TEST_INBOX) == 1
+ message = mail.EMAIL_TEST_INBOX.pop()
+ assert message['To'] == 'happygrrl@example.org'
+ email_context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/auth/fp_verification_email.txt']
+ #TODO - change the name of verification_url to something forgot-password-ish
+ assert email_context['verification_url'] in message.get_payload(decode=True)
+
+ path = urlparse.urlsplit(email_context['verification_url'])[2]
+ get_params = urlparse.urlsplit(email_context['verification_url'])[3]
+ assert path == u'/auth/forgot_password/verify/'
+ parsed_get_params = urlparse.parse_qs(get_params)
+
+ # user should have matching parameters
+ new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
+ assert parsed_get_params['userid'] == [unicode(new_user.id)]
+ assert parsed_get_params['token'] == [new_user.fp_verification_key]
+
+ ### The forgotten password token should be set to expire in ~ 10 days
+ # A few ticks have expired so there are only 9 full days left...
+ assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9
+
+ ## Try using a bs password-changing verification key, shouldn't work
+ template.clear_test_template_context()
+ response = test_app.get(
+ "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode(
+ new_user.id), status=404)
+ assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
+
+ ## Try using an expired token to change password, shouldn't work
+ template.clear_test_template_context()
+ new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
+ real_token_expiration = new_user.fp_token_expire
+ new_user.fp_token_expire = datetime.datetime.now()
+ new_user.save()
+ response = test_app.get("%s?%s" % (path, get_params), status=404)
+ assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
+ new_user.fp_token_expire = real_token_expiration
+ new_user.save()
+
+ ## Verify step 1 of password-change works -- can see form to change password
+ template.clear_test_template_context()
+ response = test_app.get("%s?%s" % (path, get_params))
+ assert 'mediagoblin/auth/change_fp.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Verify step 2.1 of password-change works -- report success to user
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/forgot_password/verify/', {
+ 'userid': parsed_get_params['userid'],
+ 'password': 'iamveryveryhappy',
+ 'token': parsed_get_params['token']})
+ response.follow()
+ assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Verify step 2.2 of password-change works -- login w/ new password success
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'happygirl',
+ 'password': 'iamveryveryhappy'})
+
+ # User should be redirected
+ response.follow()
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+
+def test_authentication_views(test_app):
+ """
+ Test logging in and logging out
+ """
+ # Make a new user
+ test_user = fixture_add_user(active_user=False)
+
+ # Get login
+ # ---------
+ test_app.get('/auth/login/')
+ assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Failed login - blank form
+ # -------------------------
+ template.clear_test_template_context()
+ response = test_app.post('/auth/login/')
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+ assert form.username.errors == [u'This field is required.']
+ assert form.password.errors == [u'This field is required.']
+
+ # Failed login - blank user
+ # -------------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'password': u'toast'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+ assert form.username.errors == [u'This field is required.']
+
+ # Failed login - blank password
+ # -----------------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'chris'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+ assert form.password.errors == [u'This field is required.']
+
+ # Failed login - bad user
+ # -----------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'steve',
+ 'password': 'toast'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ assert context['login_failed']
+
+ # Failed login - bad password
+ # ---------------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'chris',
+ 'password': 'jam_and_ham'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ assert context['login_failed']
+
+ # Successful login
+ # ----------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'chris',
+ 'password': 'toast'})
+
+ # User should be redirected
+ response.follow()
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == unicode(test_user.id)
+
+ # Successful logout
+ # -----------------
+ template.clear_test_template_context()
+ response = test_app.get('/auth/logout/')
+
+ # Should be redirected to index page
+ response.follow()
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure the user is not in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert 'user_id' not in session
+
+ # User is redirected to custom URL if POST['next'] is set
+ # -------------------------------------------------------
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'chris',
+ 'password': 'toast',
+ 'next' : '/u/chris/'})
+ assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
new file mode 100644
index 00000000..5530c6f2
--- /dev/null
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -0,0 +1,60 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pkg_resources
+
+from mediagoblin.init import celery as celery_setup
+from mediagoblin.init.config import read_mediagoblin_config
+
+
+TEST_CELERY_CONF_NOSPECIALDB = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'fake_celery_conf.ini')
+
+
+def test_setup_celery_from_config():
+ def _wipe_testmodule_clean(module):
+ vars_to_wipe = [
+ var for var in dir(module)
+ if not var.startswith('__') and not var.endswith('__')]
+ for var in vars_to_wipe:
+ delattr(module, var)
+
+ global_config, validation_result = read_mediagoblin_config(
+ TEST_CELERY_CONF_NOSPECIALDB)
+ app_config = global_config['mediagoblin']
+
+ celery_setup.setup_celery_from_config(
+ app_config, global_config,
+ 'mediagoblin.tests.fake_celery_module', set_environ=False)
+
+ from mediagoblin.tests import fake_celery_module
+ assert fake_celery_module.SOME_VARIABLE == 'floop'
+ assert fake_celery_module.MAIL_PORT == 2000
+ assert isinstance(fake_celery_module.MAIL_PORT, int)
+ assert fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION == 1.3
+ assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
+ assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
+ assert fake_celery_module.CELERY_IMPORTS == [
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
+ assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
+ assert fake_celery_module.CELERY_RESULT_DBURI == (
+ 'sqlite:///' +
+ pkg_resources.resource_filename('mediagoblin.tests', 'celery.db'))
+
+ assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy'
+ assert fake_celery_module.BROKER_HOST == (
+ 'sqlite:///' +
+ pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db'))
diff --git a/mediagoblin/tests/test_collections.py b/mediagoblin/tests/test_collections.py
new file mode 100644
index 00000000..87782f30
--- /dev/null
+++ b/mediagoblin/tests/test_collections.py
@@ -0,0 +1,32 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user
+from mediagoblin.db.models import Collection, User
+
+
+def test_user_deletes_collection(test_app):
+ # Setup db.
+ user = fixture_add_user()
+ coll = fixture_add_collection(user=user)
+ # Reload into session:
+ user = User.query.get(user.id)
+
+ cnt1 = Collection.query.count()
+ user.delete()
+ cnt2 = Collection.query.count()
+
+ assert cnt1 == cnt2 + 1
diff --git a/mediagoblin/tests/test_config.py b/mediagoblin/tests/test_config.py
new file mode 100644
index 00000000..b13adae6
--- /dev/null
+++ b/mediagoblin/tests/test_config.py
@@ -0,0 +1,97 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pkg_resources
+
+from mediagoblin.init import config
+
+
+CARROT_CONF_GOOD = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'fake_carrot_conf_good.ini')
+CARROT_CONF_EMPTY = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'fake_carrot_conf_empty.ini')
+CARROT_CONF_BAD = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'fake_carrot_conf_bad.ini')
+FAKE_CONFIG_SPEC = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'fake_config_spec.ini')
+
+
+def test_read_mediagoblin_config():
+ # An empty file
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_EMPTY, FAKE_CONFIG_SPEC)
+
+ assert this_conf['carrotapp']['carrotcake'] == False
+ assert this_conf['carrotapp']['num_carrots'] == 1
+ assert 'encouragement_phrase' not in this_conf['carrotapp']
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == True
+
+ # A good file
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_GOOD, FAKE_CONFIG_SPEC)
+
+ assert this_conf['carrotapp']['carrotcake'] == True
+ assert this_conf['carrotapp']['num_carrots'] == 88
+ assert this_conf['carrotapp']['encouragement_phrase'] == \
+ "I'd love it if you eat your carrots!"
+ assert this_conf['carrotapp']['blah_blah'] == "blah!"
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == False
+
+ # A bad file
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_BAD, FAKE_CONFIG_SPEC)
+
+ # These should still open but will have errors that we'll test for
+ # in test_generate_validation_report()
+ assert this_conf['carrotapp']['carrotcake'] == 'slobber'
+ assert this_conf['carrotapp']['num_carrots'] == 'GROSS'
+ assert this_conf['carrotapp']['encouragement_phrase'] == \
+ "586956856856"
+ assert this_conf['carrotapp']['blah_blah'] == "blah!"
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == "pants"
+
+
+def test_generate_validation_report():
+ # Empty
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_EMPTY, FAKE_CONFIG_SPEC)
+ report = config.generate_validation_report(this_conf, validation_results)
+ assert report is None
+
+ # Good
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_GOOD, FAKE_CONFIG_SPEC)
+ report = config.generate_validation_report(this_conf, validation_results)
+ assert report is None
+
+ # Bad
+ this_conf, validation_results = config.read_mediagoblin_config(
+ CARROT_CONF_BAD, FAKE_CONFIG_SPEC)
+ report = config.generate_validation_report(this_conf, validation_results)
+
+ assert report.startswith("""\
+There were validation problems loading this config file:
+--------------------------------------------------------""")
+
+ expected_warnings = [
+ 'carrotapp:carrotcake = the value "slobber" is of the wrong type.',
+ 'carrotapp:num_carrots = the value "GROSS" is of the wrong type.',
+ 'celery:EAT_CELERY_WITH_CARROTS = the value "pants" is of the wrong type.']
+ warnings = report.splitlines()[2:]
+
+ assert len(warnings) == 3
+ for warning in expected_warnings:
+ assert warning in warnings
diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py
new file mode 100644
index 00000000..a272caf6
--- /dev/null
+++ b/mediagoblin/tests/test_csrf_middleware.py
@@ -0,0 +1,86 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin import mg_globals
+
+
+def test_csrf_cookie_set(test_app):
+ cookie_name = mg_globals.app_config['csrf_cookie_name']
+
+ # get login page
+ response = test_app.get('/auth/login/')
+
+ # assert that the mediagoblin nonce cookie has been set
+ assert 'Set-Cookie' in response.headers
+ assert cookie_name in response.cookies_set
+
+ # assert that we're also sending a vary header
+ assert response.headers.get('Vary', False) == 'Cookie'
+
+
+# We need a fresh app for this test on webtest < 1.3.6.
+# We do not understand why, but it fixes the tests.
+# If we require webtest >= 1.3.6, we can switch to a non fresh app here.
+#
+# ... this comment might be irrelevant post-pytest-fixtures, but I'm not
+# removing it yet in case we move to module-level tests :)
+# -- cwebber
+def test_csrf_token_must_match(test_app):
+
+ # construct a request with no cookie or form token
+ assert test_app.post('/auth/login/',
+ extra_environ={'gmg.verify_csrf': True},
+ expect_errors=True).status_int == 403
+
+ # construct a request with a cookie, but no form token
+ assert test_app.post('/auth/login/',
+ headers={'Cookie': str('%s=foo' %
+ mg_globals.app_config['csrf_cookie_name'])},
+ extra_environ={'gmg.verify_csrf': True},
+ expect_errors=True).status_int == 403
+
+ # if both the cookie and form token are provided, they must match
+ assert test_app.post('/auth/login/',
+ {'csrf_token': 'blarf'},
+ headers={'Cookie': str('%s=foo' %
+ mg_globals.app_config['csrf_cookie_name'])},
+ extra_environ={'gmg.verify_csrf': True},
+ expect_errors=True).\
+ status_int == 403
+
+ assert test_app.post('/auth/login/',
+ {'csrf_token': 'foo'},
+ headers={'Cookie': str('%s=foo' %
+ mg_globals.app_config['csrf_cookie_name'])},
+ extra_environ={'gmg.verify_csrf': True}).\
+ status_int == 200
+
+def test_csrf_exempt(test_app):
+ # monkey with the views to decorate a known endpoint
+ import mediagoblin.auth.views
+ from mediagoblin.meddleware.csrf import csrf_exempt
+
+ mediagoblin.auth.views.login = csrf_exempt(
+ mediagoblin.auth.views.login
+ )
+
+ # construct a request with no cookie or form token
+ assert test_app.post('/auth/login/',
+ extra_environ={'gmg.verify_csrf': True},
+ expect_errors=False).status_int == 200
+
+ # restore the CSRF protection in case other tests expect it
+ mediagoblin.auth.views.login.csrf_enabled = True
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
new file mode 100644
index 00000000..08b4f8cf
--- /dev/null
+++ b/mediagoblin/tests/test_edit.py
@@ -0,0 +1,144 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import urlparse
+import pytest
+
+from mediagoblin import mg_globals
+from mediagoblin.db.models import User
+from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tools import template
+from mediagoblin.auth.lib import bcrypt_check_password
+
+class TestUserEdit(object):
+ def setup(self):
+ # set up new user
+ self.user_password = u'toast'
+ self.user = fixture_add_user(password = self.user_password)
+
+ def login(self, test_app):
+ test_app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+
+ def test_user_deletion(self, test_app):
+ """Delete user via web interface"""
+ self.login(test_app)
+
+ # Make sure user exists
+ assert User.query.filter_by(username=u'chris').first()
+
+ res = test_app.post('/edit/account/delete/', {'confirmed': 'y'})
+
+ # Make sure user has been deleted
+ assert User.query.filter_by(username=u'chris').first() == None
+
+ #TODO: make sure all corresponding items comments etc have been
+ # deleted too. Perhaps in submission test?
+
+ #Restore user at end of test
+ self.user = fixture_add_user(password = self.user_password)
+ self.login(test_app)
+
+
+ def test_change_password(self, test_app):
+ """Test changing password correctly and incorrectly"""
+ self.login(test_app)
+
+ # test that the password can be changed
+ template.clear_test_template_context()
+ res = test_app.post(
+ '/edit/password/', {
+ 'old_password': 'toast',
+ 'new_password': '123456',
+ })
+ res.follow()
+
+ # Did we redirect to the correct page?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+
+ # test_user has to be fetched again in order to have the current values
+ test_user = User.query.filter_by(username=u'chris').first()
+ assert bcrypt_check_password('123456', test_user.pw_hash)
+ # Update current user passwd
+ self.user_password = '123456'
+
+ # test that the password cannot be changed if the given
+ # old_password is wrong
+ template.clear_test_template_context()
+ test_app.post(
+ '/edit/password/', {
+ 'old_password': 'toast',
+ 'new_password': '098765',
+ })
+
+ test_user = User.query.filter_by(username=u'chris').first()
+ assert not bcrypt_check_password('098765', test_user.pw_hash)
+
+
+ def test_change_bio_url(self, test_app):
+ """Test changing bio and URL"""
+ self.login(test_app)
+
+ # Test if legacy profile editing URL redirects correctly
+ res = test_app.post(
+ '/edit/profile/', {
+ 'bio': u'I love toast!',
+ 'url': u'http://dustycloud.org/'}, expect_errors=True)
+
+ # Should redirect to /u/chris/edit/
+ assert res.status_int == 302
+ assert res.headers['Location'].endswith("/u/chris/edit/")
+
+ res = test_app.post(
+ '/u/chris/edit/', {
+ 'bio': u'I love toast!',
+ 'url': u'http://dustycloud.org/'})
+
+ test_user = User.query.filter_by(username=u'chris').first()
+ assert test_user.bio == u'I love toast!'
+ assert test_user.url == u'http://dustycloud.org/'
+
+ # change a different user than the logged in (should fail with 403)
+ fixture_add_user(username=u"foo")
+ res = test_app.post(
+ '/u/foo/edit/', {
+ 'bio': u'I love toast!',
+ 'url': u'http://dustycloud.org/'}, expect_errors=True)
+ assert res.status_int == 403
+
+ # test changing the bio and the URL inproperly
+ too_long_bio = 150 * 'T' + 150 * 'o' + 150 * 'a' + 150 * 's' + 150* 't'
+
+ test_app.post(
+ '/u/chris/edit/', {
+ # more than 500 characters
+ 'bio': too_long_bio,
+ 'url': 'this-is-no-url'})
+
+ # Check form errors
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/edit_profile.html']
+ form = context['form']
+
+ assert form.bio.errors == [
+ u'Field must be between 0 and 500 characters long.']
+ assert form.url.errors == [
+ u'This address contains errors']
+
+# test changing the url inproperly
diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py
new file mode 100644
index 00000000..c07e24ae
--- /dev/null
+++ b/mediagoblin/tests/test_exif.py
@@ -0,0 +1,431 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+try:
+ from PIL import Image
+except ImportError:
+ import Image
+
+from mediagoblin.tools.exif import exif_fix_image_orientation, \
+ extract_exif, clean_exif, get_gps_data, get_useful
+from .resources import GOOD_JPG, EMPTY_JPG, BAD_JPG, GPS_JPG
+
+
+def assert_in(a, b):
+ assert a in b, "%r not in %r" % (a, b)
+
+
+def test_exif_extraction():
+ '''
+ Test EXIF extraction from a good image
+ '''
+ result = extract_exif(GOOD_JPG)
+ clean = clean_exif(result)
+ useful = get_useful(clean)
+ gps = get_gps_data(result)
+
+ # Do we have the result?
+ assert len(result) == 56
+
+ # Do we have clean data?
+ assert len(clean) == 53
+
+ # GPS data?
+ assert gps == {}
+
+ # Do we have the "useful" tags?
+ assert useful == {'EXIF CVAPattern': {'field_length': 8,
+ 'field_offset': 26224,
+ 'field_type': 7,
+ 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]',
+ 'tag': 41730,
+ 'values': [0, 2, 0, 2, 1, 2, 0, 1]},
+ 'EXIF ColorSpace': {'field_length': 2,
+ 'field_offset': 476,
+ 'field_type': 3,
+ 'printable': u'sRGB',
+ 'tag': 40961,
+ 'values': [1]},
+ 'EXIF ComponentsConfiguration': {'field_length': 4,
+ 'field_offset': 308,
+ 'field_type': 7,
+ 'printable': u'YCbCr',
+ 'tag': 37121,
+ 'values': [1, 2, 3, 0]},
+ 'EXIF CompressedBitsPerPixel': {'field_length': 8,
+ 'field_offset': 756,
+ 'field_type': 5,
+ 'printable': u'4',
+ 'tag': 37122,
+ 'values': [[4, 1]]},
+ 'EXIF Contrast': {'field_length': 2,
+ 'field_offset': 656,
+ 'field_type': 3,
+ 'printable': u'Soft',
+ 'tag': 41992,
+ 'values': [1]},
+ 'EXIF CustomRendered': {'field_length': 2,
+ 'field_offset': 572,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41985,
+ 'values': [0]},
+ 'EXIF DateTimeDigitized': {'field_length': 20,
+ 'field_offset': 736,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 36868,
+ 'values': u'2011:06:22 12:20:33'},
+ 'EXIF DateTimeOriginal': {'field_length': 20,
+ 'field_offset': 716,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 36867,
+ 'values': u'2011:06:22 12:20:33'},
+ 'EXIF DigitalZoomRatio': {'field_length': 8,
+ 'field_offset': 26232,
+ 'field_type': 5,
+ 'printable': u'1',
+ 'tag': 41988,
+ 'values': [[1, 1]]},
+ 'EXIF ExifImageLength': {'field_length': 2,
+ 'field_offset': 500,
+ 'field_type': 3,
+ 'printable': u'2592',
+ 'tag': 40963,
+ 'values': [2592]},
+ 'EXIF ExifImageWidth': {'field_length': 2,
+ 'field_offset': 488,
+ 'field_type': 3,
+ 'printable': u'3872',
+ 'tag': 40962,
+ 'values': [3872]},
+ 'EXIF ExifVersion': {'field_length': 4,
+ 'field_offset': 272,
+ 'field_type': 7,
+ 'printable': u'0221',
+ 'tag': 36864,
+ 'values': [48, 50, 50, 49]},
+ 'EXIF ExposureBiasValue': {'field_length': 8,
+ 'field_offset': 764,
+ 'field_type': 10,
+ 'printable': u'0',
+ 'tag': 37380,
+ 'values': [[0, 1]]},
+ 'EXIF ExposureMode': {'field_length': 2,
+ 'field_offset': 584,
+ 'field_type': 3,
+ 'printable': u'Manual Exposure',
+ 'tag': 41986,
+ 'values': [1]},
+ 'EXIF ExposureProgram': {'field_length': 2,
+ 'field_offset': 248,
+ 'field_type': 3,
+ 'printable': u'Manual',
+ 'tag': 34850,
+ 'values': [1]},
+ 'EXIF ExposureTime': {'field_length': 8,
+ 'field_offset': 700,
+ 'field_type': 5,
+ 'printable': u'1/125',
+ 'tag': 33434,
+ 'values': [[1, 125]]},
+ 'EXIF FNumber': {'field_length': 8,
+ 'field_offset': 708,
+ 'field_type': 5,
+ 'printable': u'10',
+ 'tag': 33437,
+ 'values': [[10, 1]]},
+ 'EXIF FileSource': {'field_length': 1,
+ 'field_offset': 536,
+ 'field_type': 7,
+ 'printable': u'Digital Camera',
+ 'tag': 41728,
+ 'values': [3]},
+ 'EXIF Flash': {'field_length': 2,
+ 'field_offset': 380,
+ 'field_type': 3,
+ 'printable': u'Flash did not fire',
+ 'tag': 37385,
+ 'values': [0]},
+ 'EXIF FlashPixVersion': {'field_length': 4,
+ 'field_offset': 464,
+ 'field_type': 7,
+ 'printable': u'0100',
+ 'tag': 40960,
+ 'values': [48, 49, 48, 48]},
+ 'EXIF FocalLength': {'field_length': 8,
+ 'field_offset': 780,
+ 'field_type': 5,
+ 'printable': u'18',
+ 'tag': 37386,
+ 'values': [[18, 1]]},
+ 'EXIF FocalLengthIn35mmFilm': {'field_length': 2,
+ 'field_offset': 620,
+ 'field_type': 3,
+ 'printable': u'27',
+ 'tag': 41989,
+ 'values': [27]},
+ 'EXIF GainControl': {'field_length': 2,
+ 'field_offset': 644,
+ 'field_type': 3,
+ 'printable': u'None',
+ 'tag': 41991,
+ 'values': [0]},
+ 'EXIF ISOSpeedRatings': {'field_length': 2,
+ 'field_offset': 260,
+ 'field_type': 3,
+ 'printable': u'100',
+ 'tag': 34855,
+ 'values': [100]},
+ 'EXIF InteroperabilityOffset': {'field_length': 4,
+ 'field_offset': 512,
+ 'field_type': 4,
+ 'printable': u'26240',
+ 'tag': 40965,
+ 'values': [26240]},
+ 'EXIF LightSource': {'field_length': 2,
+ 'field_offset': 368,
+ 'field_type': 3,
+ 'printable': u'Unknown',
+ 'tag': 37384,
+ 'values': [0]},
+ 'EXIF MaxApertureValue': {'field_length': 8,
+ 'field_offset': 772,
+ 'field_type': 5,
+ 'printable': u'18/5',
+ 'tag': 37381,
+ 'values': [[18, 5]]},
+ 'EXIF MeteringMode': {'field_length': 2,
+ 'field_offset': 356,
+ 'field_type': 3,
+ 'printable': u'Pattern',
+ 'tag': 37383,
+ 'values': [5]},
+ 'EXIF Saturation': {'field_length': 2,
+ 'field_offset': 668,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41993,
+ 'values': [0]},
+ 'EXIF SceneCaptureType': {'field_length': 2,
+ 'field_offset': 632,
+ 'field_type': 3,
+ 'printable': u'Standard',
+ 'tag': 41990,
+ 'values': [0]},
+ 'EXIF SceneType': {'field_length': 1,
+ 'field_offset': 548,
+ 'field_type': 7,
+ 'printable': u'Directly Photographed',
+ 'tag': 41729,
+ 'values': [1]},
+ 'EXIF SensingMethod': {'field_length': 2,
+ 'field_offset': 524,
+ 'field_type': 3,
+ 'printable': u'One-chip color area',
+ 'tag': 41495,
+ 'values': [2]},
+ 'EXIF Sharpness': {'field_length': 2,
+ 'field_offset': 680,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41994,
+ 'values': [0]},
+ 'EXIF SubSecTime': {'field_length': 3,
+ 'field_offset': 428,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37520,
+ 'values': u'10'},
+ 'EXIF SubSecTimeDigitized': {'field_length': 3,
+ 'field_offset': 452,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37522,
+ 'values': u'10'},
+ 'EXIF SubSecTimeOriginal': {'field_length': 3,
+ 'field_offset': 440,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37521,
+ 'values': u'10'},
+ 'EXIF SubjectDistanceRange': {'field_length': 2,
+ 'field_offset': 692,
+ 'field_type': 3,
+ 'printable': u'0',
+ 'tag': 41996,
+ 'values': [0]},
+ 'EXIF WhiteBalance': {'field_length': 2,
+ 'field_offset': 596,
+ 'field_type': 3,
+ 'printable': u'Auto',
+ 'tag': 41987,
+ 'values': [0]},
+ 'Image DateTime': {'field_length': 20,
+ 'field_offset': 194,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 306,
+ 'values': u'2011:06:22 12:20:33'},
+ 'Image ExifOffset': {'field_length': 4,
+ 'field_offset': 126,
+ 'field_type': 4,
+ 'printable': u'214',
+ 'tag': 34665,
+ 'values': [214]},
+ 'Image Make': {'field_length': 18,
+ 'field_offset': 134,
+ 'field_type': 2,
+ 'printable': u'NIKON CORPORATION',
+ 'tag': 271,
+ 'values': u'NIKON CORPORATION'},
+ 'Image Model': {'field_length': 10,
+ 'field_offset': 152,
+ 'field_type': 2,
+ 'printable': u'NIKON D80',
+ 'tag': 272,
+ 'values': u'NIKON D80'},
+ 'Image Orientation': {'field_length': 2,
+ 'field_offset': 42,
+ 'field_type': 3,
+ 'printable': u'Rotated 90 CCW',
+ 'tag': 274,
+ 'values': [6]},
+ 'Image ResolutionUnit': {'field_length': 2,
+ 'field_offset': 78,
+ 'field_type': 3,
+ 'printable': u'Pixels/Inch',
+ 'tag': 296,
+ 'values': [2]},
+ 'Image Software': {'field_length': 15,
+ 'field_offset': 178,
+ 'field_type': 2,
+ 'printable': u'Shotwell 0.9.3',
+ 'tag': 305,
+ 'values': u'Shotwell 0.9.3'},
+ 'Image XResolution': {'field_length': 8,
+ 'field_offset': 162,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 282,
+ 'values': [[300, 1]]},
+ 'Image YCbCrPositioning': {'field_length': 2,
+ 'field_offset': 114,
+ 'field_type': 3,
+ 'printable': u'Co-sited',
+ 'tag': 531,
+ 'values': [2]},
+ 'Image YResolution': {'field_length': 8,
+ 'field_offset': 170,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 283,
+ 'values': [[300, 1]]},
+ 'Thumbnail Compression': {'field_length': 2,
+ 'field_offset': 26280,
+ 'field_type': 3,
+ 'printable': u'JPEG (old-style)',
+ 'tag': 259,
+ 'values': [6]},
+ 'Thumbnail ResolutionUnit': {'field_length': 2,
+ 'field_offset': 26316,
+ 'field_type': 3,
+ 'printable': u'Pixels/Inch',
+ 'tag': 296,
+ 'values': [2]},
+ 'Thumbnail XResolution': {'field_length': 8,
+ 'field_offset': 26360,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 282,
+ 'values': [[300, 1]]},
+ 'Thumbnail YCbCrPositioning': {'field_length': 2,
+ 'field_offset': 26352,
+ 'field_type': 3,
+ 'printable': u'Co-sited',
+ 'tag': 531,
+ 'values': [2]},
+ 'Thumbnail YResolution': {'field_length': 8,
+ 'field_offset': 26368,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 283,
+ 'values': [[300, 1]]}}
+
+
+def test_exif_image_orientation():
+ '''
+ Test image reorientation based on EXIF data
+ '''
+ result = extract_exif(GOOD_JPG)
+
+ image = exif_fix_image_orientation(
+ Image.open(GOOD_JPG),
+ result)
+
+ # Are the dimensions correct?
+ assert image.size == (428, 640)
+
+ # If this pixel looks right, the rest of the image probably will too.
+ assert_in(image.getdata()[10000],
+ ((41, 28, 11), (43, 27, 11))
+ )
+
+
+def test_exif_no_exif():
+ '''
+ Test an image without exif
+ '''
+ result = extract_exif(EMPTY_JPG)
+ clean = clean_exif(result)
+ useful = get_useful(clean)
+ gps = get_gps_data(result)
+
+ assert result == {}
+ assert clean == {}
+ assert gps == {}
+ assert useful == {}
+
+
+def test_exif_bad_image():
+ '''
+ Test EXIF extraction from a faithful, but bad image
+ '''
+ result = extract_exif(BAD_JPG)
+ clean = clean_exif(result)
+ useful = get_useful(clean)
+ gps = get_gps_data(result)
+
+ assert result == {}
+ assert clean == {}
+ assert gps == {}
+ assert useful == {}
+
+
+def test_exif_gps_data():
+ '''
+ Test extractiion of GPS data
+ '''
+ result = extract_exif(GPS_JPG)
+ gps = get_gps_data(result)
+
+ assert gps == {
+ 'latitude': 59.336666666666666,
+ 'direction': 25.674046740467404,
+ 'altitude': 37.64365671641791,
+ 'longitude': 18.016166666666667}
diff --git a/mediagoblin/tests/test_exif/bad.jpg b/mediagoblin/tests/test_exif/bad.jpg
new file mode 100644
index 00000000..4cde23cd
--- /dev/null
+++ b/mediagoblin/tests/test_exif/bad.jpg
@@ -0,0 +1,18 @@
+V2UncmUgbm8gc3RyYW5nZXJzIHRvIGxvdmUKWW91IGtub3cgdGhlIHJ1bGVzIGFuZCBzbyBkbyBJ
+CkEgZnVsbCBjb21taXRtZW50J3Mgd2hhdCBJJ20gdGhpbmtpbicgb2YKWW91IHdvdWxkbid0IGdl
+dCB0aGlzIGZyb20gYW55IG90aGVyIGd1eQpJIGp1c3Qgd2FubmEgdGVsbCB5b3UgaG93IEknbSBm
+ZWVsaW4nCkdvdHRhIG1ha2UgeW91IHVuZGVyc3RhbmQKCihDaG9ydXMpCk5ldmVyIGdvbm5hIGdp
+dmUgeW91IHVwCk5ldmVyIGdvbm5hIGxldCB5b3UgZG93bgpOZXZlciBnb25uYSBydW4gYXJvdW5k
+IGFuZCBkZXNlcnQgeW91Ck5ldmVyIGdvbm5hIG1ha2UgeW91IGNyeQpOZXZlciBnb25uYSBzYXkg
+Z29vZGJ5ZQpOZXZlciBnb25uYSB0ZWxsIGEgbGllIGFuZCBodXJ0IHlvdQoKV2UndmUga25vdyBl
+YWNoIG90aGVyIGZvciBzbyBsb25nCllvdXIgaGVhcnQncyBiZWVuIGFjaGluJyBidXQgeW91J3Jl
+IHRvbyBzaHkgdG8gc2F5IGl0Ckluc2lkZSB3ZSBib3RoIGtub3cgd2hhdCdzIGJlZW4gZ29pbmcg
+b24KV2Uga25vdyB0aGUgZ2FtZSBhbmQgd2UncmUgZ29ubmEgcGxheSBpdApBbmQgaWYgeW91IGFz
+ayBtZSBob3cgSSdtIGZlZWxpbicKRG9uJ3QgdGVsbCBtZSB5b3UncmUgdG9vIGJsaW5kIHRvIHNl
+ZQoKKENob3J1cyB4MikKCihHaXZlIHlvdSB1cCwgZ2l2ZSB5b3UgdXApCk5ldmVyIGdvbm5hIGdp
+dmUsIG5ldmVyIGdvbm5hIGdpdmUKKEdpdmUgeW91IHVwKQpOZXZlciBnb25uYSBnaXZlLCBuZXZl
+ciBnb25uYSBnaXZlCihHaXZlIHlvdSB1cCkKCldlJ3ZlIGtub3cgZWFjaCBvdGhlciBmb3Igc28g
+bG9uZwpZb3VyIGhlYXJ0J3MgYmVlbiBhY2hpbicgYnV0IHlvdSdyZSB0b28gc2h5IHRvIHNheSBp
+dApJbnNpZGUgd2UgYm90aCBrbm93IHdoYXQncyBiZWVuIGdvaW5nIG9uCldlIGtub3cgdGhlIGdh
+bWUgYW5kIHdlJ3JlIGdvbm5hIHBsYXkgaXQKSSBqdXN0IHdhbm5hIHRlbGwgeW91IGhvdyBJJ20g
+ZmVlbGluJwpHb3R0YSBtYWtlIHlvdSB1bmRlcnN0YW5kCgooQ2hvcnVzIHgzKQo=
diff --git a/mediagoblin/tests/test_exif/empty.jpg b/mediagoblin/tests/test_exif/empty.jpg
new file mode 100644
index 00000000..37533af5
--- /dev/null
+++ b/mediagoblin/tests/test_exif/empty.jpg
Binary files differ
diff --git a/mediagoblin/tests/test_exif/good.jpg b/mediagoblin/tests/test_exif/good.jpg
new file mode 100644
index 00000000..0ee956fe
--- /dev/null
+++ b/mediagoblin/tests/test_exif/good.jpg
Binary files differ
diff --git a/mediagoblin/tests/test_exif/has-gps.jpg b/mediagoblin/tests/test_exif/has-gps.jpg
new file mode 100644
index 00000000..f6f39d86
--- /dev/null
+++ b/mediagoblin/tests/test_exif/has-gps.jpg
Binary files differ
diff --git a/mediagoblin/tests/test_globals.py b/mediagoblin/tests/test_globals.py
new file mode 100644
index 00000000..fe3088f8
--- /dev/null
+++ b/mediagoblin/tests/test_globals.py
@@ -0,0 +1,42 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+from mediagoblin import mg_globals
+
+
+class TestGlobals(object):
+ def setup(self):
+ self.old_database = mg_globals.database
+
+ def teardown(self):
+ mg_globals.database = self.old_database
+
+ def test_setup_globals(self):
+ mg_globals.setup_globals(
+ database='my favorite database!',
+ public_store='my favorite public_store!',
+ queue_store='my favorite queue_store!')
+
+ assert mg_globals.database == 'my favorite database!'
+ assert mg_globals.public_store == 'my favorite public_store!'
+ assert mg_globals.queue_store == 'my favorite queue_store!'
+
+ pytest.raises(
+ AssertionError,
+ mg_globals.setup_globals,
+ no_such_global_foo="Dummy")
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
new file mode 100644
index 00000000..a0511af7
--- /dev/null
+++ b/mediagoblin/tests/test_http_callback.py
@@ -0,0 +1,83 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+
+import pytest
+from urlparse import urlparse, parse_qs
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import processing
+from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tests.test_submission import GOOD_PNG
+from mediagoblin.tests import test_oauth as oauth
+
+
+class TestHTTPCallback(object):
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ self.db = mg_globals.database
+
+ self.user_password = u'secret'
+ self.user = fixture_add_user(u'call_back', self.user_password)
+
+ self.login()
+
+ def login(self):
+ self.test_app.post('/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def get_access_token(self, client_id, client_secret, code):
+ response = self.test_app.get('/oauth/access_token', {
+ 'code': code,
+ 'client_id': client_id,
+ 'client_secret': client_secret})
+
+ response_data = json.loads(response.body)
+
+ return response_data['access_token']
+
+ def test_callback(self):
+ ''' Test processing HTTP callback '''
+ self.oauth = oauth.TestOAuth()
+ self.oauth.setup(self.test_app)
+
+ redirect, client_id = self.oauth.test_4_authorize_confidential_client()
+
+ code = parse_qs(urlparse(redirect.location).query)['code'][0]
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ client_secret = client.secret
+
+ access_token = self.get_access_token(client_id, client_secret, code)
+
+ callback_url = 'https://foo.example?secrettestmediagoblinparam'
+
+ self.test_app.post('/api/submit?client_id={0}&access_token={1}\
+&client_secret={2}'.format(
+ client_id,
+ access_token,
+ client_secret), {
+ 'title': 'Test',
+ 'callback_url': callback_url},
+ upload_files=[('file', GOOD_PNG)])
+
+ assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed'
diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py
new file mode 100644
index 00000000..22f9e800
--- /dev/null
+++ b/mediagoblin/tests/test_messages.py
@@ -0,0 +1,50 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin import messages
+from mediagoblin.tools import template
+
+
+def test_messages(test_app):
+ """
+ Added messages should show up in the request.session,
+ fetched messages should be the same as the added ones,
+ and fetching should clear the message list.
+ """
+ # Aquire a request object
+ test_app.get('/')
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ request = context['request']
+
+ # The message queue should be empty
+ assert request.session.get('messages', []) == []
+
+ # First of all, we should clear the messages queue
+ messages.clear_add_message()
+ # Adding a message should modify the session accordingly
+ messages.add_message(request, 'herp_derp', 'First!')
+ test_msg_queue = [{'text': 'First!', 'level': 'herp_derp'}]
+
+ # Alternative tests to the following, test divided in two steps:
+ # assert request.session['messages'] == test_msg_queue
+ # 1. Tests if add_message worked
+ assert messages.ADD_MESSAGE_TEST[-1] == test_msg_queue
+ # 2. Tests if add_message updated session information
+ assert messages.ADD_MESSAGE_TEST[-1] == request.session['messages']
+
+ # fetch_messages should return and empty the queue
+ assert messages.fetch_messages(request) == test_msg_queue
+ assert request.session.get('messages') == []
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
new file mode 100644
index 00000000..0466b53b
--- /dev/null
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -0,0 +1,33 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# tag parsing
+tags_max_length = 50
+
+# So we can start to test attachments:
+allow_attachments = True
+
+media_types = mediagoblin.media_types.image, mediagoblin.media_types.pdf
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.api]]
+[[mediagoblin.plugins.oauth]]
+[[mediagoblin.plugins.httpapiauth]]
+[[mediagoblin.plugins.piwigo]]
diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py
new file mode 100644
index 00000000..755d863f
--- /dev/null
+++ b/mediagoblin/tests/test_misc.py
@@ -0,0 +1,91 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.db.base import Session
+from mediagoblin.db.models import User, MediaEntry, MediaComment
+from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
+
+
+def test_404_for_non_existent(test_app):
+ res = test_app.get('/does-not-exist/', expect_errors=True)
+ assert res.status_int == 404
+
+
+def test_user_deletes_other_comments(test_app):
+ user_a = fixture_add_user(u"chris_a")
+ user_b = fixture_add_user(u"chris_b")
+
+ media_a = fixture_media_entry(uploader=user_a.id, save=False)
+ media_b = fixture_media_entry(uploader=user_b.id, save=False)
+ Session.add(media_a)
+ Session.add(media_b)
+ Session.flush()
+
+ # Create all 4 possible comments:
+ for u_id in (user_a.id, user_b.id):
+ for m_id in (media_a.id, media_b.id):
+ cmt = MediaComment()
+ cmt.media_entry = m_id
+ cmt.author = u_id
+ cmt.content = u"Some Comment"
+ Session.add(cmt)
+
+ Session.flush()
+
+ usr_cnt1 = User.query.count()
+ med_cnt1 = MediaEntry.query.count()
+ cmt_cnt1 = MediaComment.query.count()
+
+ User.query.get(user_a.id).delete(commit=False)
+
+ usr_cnt2 = User.query.count()
+ med_cnt2 = MediaEntry.query.count()
+ cmt_cnt2 = MediaComment.query.count()
+
+ # One user deleted
+ assert usr_cnt2 == usr_cnt1 - 1
+ # One media gone
+ assert med_cnt2 == med_cnt1 - 1
+ # Three of four comments gone.
+ assert cmt_cnt2 == cmt_cnt1 - 3
+
+ User.query.get(user_b.id).delete()
+
+ usr_cnt2 = User.query.count()
+ med_cnt2 = MediaEntry.query.count()
+ cmt_cnt2 = MediaComment.query.count()
+
+ # All users gone
+ assert usr_cnt2 == usr_cnt1 - 2
+ # All media gone
+ assert med_cnt2 == med_cnt1 - 2
+ # All comments gone
+ assert cmt_cnt2 == cmt_cnt1 - 4
+
+
+def test_media_deletes_broken_attachment(test_app):
+ user_a = fixture_add_user(u"chris_a")
+
+ media = fixture_media_entry(uploader=user_a.id, save=False)
+ media.attachment_files.append(dict(
+ name=u"some name",
+ filepath=[u"does", u"not", u"exist"],
+ ))
+ Session.add(media)
+ Session.flush()
+
+ MediaEntry.query.get(media.id).delete()
+ User.query.get(user_a.id).delete()
diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py
new file mode 100644
index 00000000..427aa47c
--- /dev/null
+++ b/mediagoblin/tests/test_modelmethods.py
@@ -0,0 +1,167 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Maybe not every model needs a test, but some models have special
+# methods, and so it makes sense to test them here.
+
+from mediagoblin.db.base import Session
+from mediagoblin.db.models import MediaEntry
+
+from mediagoblin.tests.tools import fixture_add_user
+
+import mock
+
+
+class FakeUUID(object):
+ hex = 'testtest-test-test-test-testtesttest'
+
+UUID_MOCK = mock.Mock(return_value=FakeUUID())
+
+
+class TestMediaEntrySlugs(object):
+ def _setup(self):
+ self.chris_user = fixture_add_user(u'chris')
+ self.emily_user = fixture_add_user(u'emily')
+ self.existing_entry = self._insert_media_entry_fixture(
+ title=u"Beware, I exist!",
+ slug=u"beware-i-exist")
+
+ def _insert_media_entry_fixture(self, title=None, slug=None, this_id=None,
+ uploader=None, save=True):
+ entry = MediaEntry()
+ entry.title = title or u"Some title"
+ entry.slug = slug
+ entry.id = this_id
+ entry.uploader = uploader or self.chris_user.id
+ entry.media_type = u'image'
+
+ if save:
+ entry.save()
+
+ return entry
+
+ def test_unique_slug_from_title(self, test_app):
+ self._setup()
+
+ entry = self._insert_media_entry_fixture(u"Totally unique slug!", save=False)
+ entry.generate_slug()
+ assert entry.slug == u'totally-unique-slug'
+
+ def test_old_good_unique_slug(self, test_app):
+ self._setup()
+
+ entry = self._insert_media_entry_fixture(
+ u"A title here", u"a-different-slug-there", save=False)
+ entry.generate_slug()
+ assert entry.slug == u"a-different-slug-there"
+
+ def test_old_weird_slug(self, test_app):
+ self._setup()
+
+ entry = self._insert_media_entry_fixture(
+ slug=u"wowee!!!!!", save=False)
+ entry.generate_slug()
+ assert entry.slug == u"wowee"
+
+ def test_existing_slug_use_id(self, test_app):
+ self._setup()
+
+ entry = self._insert_media_entry_fixture(
+ u"Beware, I exist!!", this_id=9000, save=False)
+ entry.generate_slug()
+ assert entry.slug == u"beware-i-exist-9000"
+
+ def test_existing_slug_cant_use_id(self, test_app):
+ self._setup()
+
+ # Getting tired of dealing with test_app and this mock.patch
+ # thing conflicting, getting lazy.
+ @mock.patch('uuid.uuid4', UUID_MOCK)
+ def _real_test():
+ # This one grabs the nine thousand slug
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-9000")
+
+ entry = self._insert_media_entry_fixture(
+ u"Beware, I exist!!", this_id=9000, save=False)
+ entry.generate_slug()
+ assert entry.slug == u"beware-i-exist-test"
+
+ _real_test()
+
+ def test_existing_slug_cant_use_id_extra_junk(self, test_app):
+ self._setup()
+
+ # Getting tired of dealing with test_app and this mock.patch
+ # thing conflicting, getting lazy.
+ @mock.patch('uuid.uuid4', UUID_MOCK)
+ def _real_test():
+ # This one grabs the nine thousand slug
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-9000")
+
+ # This one grabs makes sure the annoyance doesn't stop
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-test")
+
+ entry = self._insert_media_entry_fixture(
+ u"Beware, I exist!!", this_id=9000, save=False)
+ entry.generate_slug()
+ assert entry.slug == u"beware-i-exist-testtest"
+
+ _real_test()
+
+ def test_garbage_slug(self, test_app):
+ """
+ Titles that sound totally like Q*Bert shouldn't have slugs at
+ all. We'll just reference them by id.
+
+ ,
+ / \ (@!#?@!)
+ |\,/| ,-, /
+ | |#| ( ")~
+ / \|/ \ L L
+ |\,/|\,/|
+ | |#, |#|
+ / \|/ \|/ \
+ |\,/|\,/|\,/|
+ | |#| |#| |#|
+ / \|/ \|/ \|/ \
+ |\,/|\,/|\,/|\,/|
+ | |#| |#| |#| |#|
+ \|/ \|/ \|/ \|/
+ """
+ self._setup()
+
+ qbert_entry = self._insert_media_entry_fixture(
+ u"@!#?@!", save=False)
+ qbert_entry.generate_slug()
+ assert qbert_entry.slug is None
+
+
+def test_media_data_init(test_app):
+ Session.rollback()
+ Session.remove()
+ media = MediaEntry()
+ media.media_type = u"mediagoblin.media_types.image"
+ assert media.media_data is None
+ media.media_data_init()
+ assert media.media_data is not None
+ obj_in_session = 0
+ for obj in Session():
+ obj_in_session += 1
+ print repr(obj)
+ assert obj_in_session == 0
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py
new file mode 100644
index 00000000..ea3bd798
--- /dev/null
+++ b/mediagoblin/tests/test_oauth.py
@@ -0,0 +1,222 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import logging
+
+import pytest
+from urlparse import parse_qs, urlparse
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import template, pluginapi
+from mediagoblin.tests.tools import fixture_add_user
+
+
+_log = logging.getLogger(__name__)
+
+
+class TestOAuth(object):
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ self.db = mg_globals.database
+
+ self.pman = pluginapi.PluginManager()
+
+ self.user_password = u'4cc355_70k3N'
+ self.user = fixture_add_user(u'joauth', self.user_password)
+
+ self.login()
+
+ def login(self):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def register_client(self, name, client_type, description=None,
+ redirect_uri=''):
+ return self.test_app.post(
+ '/oauth/client/register', {
+ 'name': name,
+ 'description': description,
+ 'type': client_type,
+ 'redirect_uri': redirect_uri})
+
+ def get_context(self, template_name):
+ return template.TEMPLATE_TEST_CONTEXT[template_name]
+
+ def test_1_public_client_registration_without_redirect_uri(self):
+ ''' Test 'public' OAuth client registration without any redirect uri '''
+ response = self.register_client(
+ u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
+
+ ctx = self.get_context('oauth/client/register.html')
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == u'OMGOMGOMG').first()
+
+ assert response.status_int == 200
+
+ # Should display an error
+ assert len(ctx['form'].redirect_uri.errors)
+
+ # Should not pass through
+ assert not client
+
+ def test_2_successful_public_client_registration(self):
+ ''' Successfully register a public client '''
+ uri = 'http://foo.example'
+ self.register_client(
+ u'OMGOMG', 'public', 'OMG!', uri)
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == u'OMGOMG').first()
+
+ # redirect_uri should be set
+ assert client.redirect_uri == uri
+
+ # Client should have been registered
+ assert client
+
+ def test_3_successful_confidential_client_reg(self):
+ ''' Register a confidential OAuth client '''
+ response = self.register_client(
+ u'GMOGMO', 'confidential', 'NO GMO!')
+
+ assert response.status_int == 302
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == u'GMOGMO').first()
+
+ # Client should have been registered
+ assert client
+
+ return client
+
+ def test_4_authorize_confidential_client(self):
+ ''' Authorize a confidential client as a logged in user '''
+ client = self.test_3_successful_confidential_client_reg()
+
+ client_identifier = client.identifier
+
+ redirect_uri = 'https://foo.example'
+ response = self.test_app.get('/oauth/authorize', {
+ 'client_id': client.identifier,
+ 'scope': 'all',
+ 'redirect_uri': redirect_uri})
+
+ # User-agent should NOT be redirected
+ assert response.status_int == 200
+
+ ctx = self.get_context('oauth/authorize.html')
+
+ form = ctx['form']
+
+ # Short for client authorization post reponse
+ capr = self.test_app.post(
+ '/oauth/client/authorize', {
+ 'client_id': form.client_id.data,
+ 'allow': 'Allow',
+ 'next': form.next.data})
+
+ assert capr.status_int == 302
+
+ authorization_response = capr.follow()
+
+ assert authorization_response.location.startswith(redirect_uri)
+
+ return authorization_response, client_identifier
+
+ def get_code_from_redirect_uri(self, uri):
+ ''' Get the value of ?code= from an URI '''
+ return parse_qs(urlparse(uri).query)['code'][0]
+
+ def test_token_endpoint_successful_confidential_request(self):
+ ''' Successful request against token endpoint '''
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
+
+ code = self.get_code_from_redirect_uri(code_redirect.location)
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ token_res = self.test_app.get('/oauth/access_token?client_id={0}&\
+code={1}&client_secret={2}'.format(client_id, code, client.secret))
+
+ assert token_res.status_int == 200
+
+ token_data = json.loads(token_res.body)
+
+ assert not 'error' in token_data
+ assert 'access_token' in token_data
+ assert 'token_type' in token_data
+ assert 'expires_in' in token_data
+ assert type(token_data['expires_in']) == int
+ assert token_data['expires_in'] > 0
+
+ # There should be a refresh token provided in the token data
+ assert len(token_data['refresh_token'])
+
+ return client_id, token_data
+
+ def test_token_endpont_missing_id_confidential_request(self):
+ ''' Unsuccessful request against token endpoint, missing client_id '''
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
+
+ code = self.get_code_from_redirect_uri(code_redirect.location)
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ token_res = self.test_app.get('/oauth/access_token?\
+code={0}&client_secret={1}'.format(code, client.secret))
+
+ assert token_res.status_int == 200
+
+ token_data = json.loads(token_res.body)
+
+ assert 'error' in token_data
+ assert not 'access_token' in token_data
+ assert token_data['error'] == 'invalid_request'
+ assert len(token_data['error_description'])
+
+ def test_refresh_token(self):
+ ''' Try to get a new access token using the refresh token '''
+ # Get an access token and a refresh token
+ client_id, token_data =\
+ self.test_token_endpoint_successful_confidential_request()
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == client_id).first()
+
+ token_res = self.test_app.get('/oauth/access_token',
+ {'refresh_token': token_data['refresh_token'],
+ 'client_id': client_id,
+ 'client_secret': client.secret
+ })
+
+ assert token_res.status_int == 200
+
+ new_token_data = json.loads(token_res.body)
+
+ assert not 'error' in new_token_data
+ assert 'access_token' in new_token_data
+ assert 'token_type' in new_token_data
+ assert 'expires_in' in new_token_data
+ assert type(new_token_data['expires_in']) == int
+ assert new_token_data['expires_in'] > 0
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
new file mode 100644
index 00000000..a9595432
--- /dev/null
+++ b/mediagoblin/tests/test_paste.ini
@@ -0,0 +1,40 @@
+[DEFAULT]
+debug = true
+
+[composite:main]
+use = egg:Paste#urlmap
+/ = mediagoblin
+/mgoblin_media/ = publicstore_serve
+/test_static/ = mediagoblin_static
+/theme_static/ = theme_static
+/plugin_static/ = plugin_static
+
+[app:mediagoblin]
+use = egg:mediagoblin#app
+config = %(here)s/mediagoblin.ini
+
+[app:publicstore_serve]
+use = egg:Paste#static
+document_root = %(here)s/user_dev/media/public
+
+[app:mediagoblin_static]
+use = egg:Paste#static
+document_root = %(here)s/mediagoblin/static/
+
+[app:theme_static]
+use = egg:Paste#static
+document_root = %(here)s/user_dev/theme_static/
+cache_max_age = 86400
+
+[app:plugin_static]
+use = egg:Paste#static
+document_root = %(here)s/user_dev/plugin_static/
+cache_max_age = 86400
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+
+[server:main]
+use = egg:Paste#http
+host = 127.0.0.1
+port = 6543
diff --git a/mediagoblin/tests/test_pdf.py b/mediagoblin/tests/test_pdf.py
new file mode 100644
index 00000000..b4d1940a
--- /dev/null
+++ b/mediagoblin/tests/test_pdf.py
@@ -0,0 +1,39 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import tempfile
+import shutil
+import os
+import pytest
+
+from mediagoblin.media_types.pdf.processing import (
+ pdf_info, check_prerequisites, create_pdf_thumb)
+from .resources import GOOD_PDF as GOOD
+
+
+@pytest.mark.skipif("not check_prerequisites()")
+def test_pdf():
+ good_dict = {'pdf_version_major': 1, 'pdf_title': '',
+ 'pdf_page_size_width': 612, 'pdf_author': '',
+ 'pdf_keywords': '', 'pdf_pages': 10,
+ 'pdf_producer': 'dvips + GNU Ghostscript 7.05',
+ 'pdf_version_minor': 3,
+ 'pdf_creator': 'LaTeX with hyperref package',
+ 'pdf_page_size_height': 792}
+ assert pdf_info(GOOD) == good_dict
+ temp_dir = tempfile.mkdtemp()
+ create_pdf_thumb(GOOD, os.path.join(temp_dir, 'good_256_256.png'), 256, 256)
+ shutil.rmtree(temp_dir)
diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py
new file mode 100644
index 00000000..16ad0111
--- /dev/null
+++ b/mediagoblin/tests/test_piwigo.py
@@ -0,0 +1,71 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+from .tools import fixture_add_user
+
+
+XML_PREFIX = "<?xml version='1.0' encoding='utf-8'?>\n"
+
+
+class Test_PWG(object):
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ fixture_add_user()
+
+ self.username = u"chris"
+ self.password = "toast"
+
+ def do_post(self, method, params):
+ params["method"] = method
+ return self.test_app.post("/api/piwigo/ws.php", params)
+
+ def do_get(self, method, params=None):
+ if params is None:
+ params = {}
+ params["method"] = method
+ return self.test_app.get("/api/piwigo/ws.php", params)
+
+ def test_session(self):
+ resp = self.do_post("pwg.session.login",
+ {"username": u"nouser", "password": "wrong"})
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+
+ resp = self.do_post("pwg.session.login",
+ {"username": self.username, "password": "wrong"})
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>guest</username></rsp>'
+
+ resp = self.do_post("pwg.session.login",
+ {"username": self.username, "password": self.password})
+ assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>'
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>chris</username></rsp>'
+
+ self.do_get("pwg.session.logout")
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>guest</username></rsp>'
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
new file mode 100644
index 00000000..eae0ce15
--- /dev/null
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -0,0 +1,466 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import json
+import sys
+
+from configobj import ConfigObj
+import pytest
+import pkg_resources
+from validate import VdtTypeError
+
+from mediagoblin import mg_globals
+from mediagoblin.init.plugins import setup_plugins
+from mediagoblin.init.config import read_mediagoblin_config
+from mediagoblin.gmg_commands.assetlink import link_plugin_assets
+from mediagoblin.tools import pluginapi
+from mediagoblin.tests.tools import get_app
+from mediagoblin.tools.common import CollectingPrinter
+
+
+def with_cleanup(*modules_to_delete):
+ def _with_cleanup(fun):
+ """Wrapper that saves and restores mg_globals"""
+ def _with_cleanup_inner(*args, **kwargs):
+ old_app_config = mg_globals.app_config
+ old_global_config = mg_globals.global_config
+ # Need to delete icky modules before and after so as to make
+ # sure things work correctly.
+ for module in modules_to_delete:
+ try:
+ del sys.modules[module]
+ except KeyError:
+ pass
+ # The plugin cache gets populated as a side-effect of
+ # importing, so it's best to clear it before and after a test.
+ pman = pluginapi.PluginManager()
+ pman.clear()
+ try:
+ return fun(*args, **kwargs)
+ finally:
+ mg_globals.app_config = old_app_config
+ mg_globals.global_config = old_global_config
+ # Need to delete icky modules before and after so as to make
+ # sure things work correctly.
+ for module in modules_to_delete:
+ try:
+ del sys.modules[module]
+ except KeyError:
+ pass
+ pman.clear()
+
+ _with_cleanup_inner.__name__ = fun.__name__
+ return _with_cleanup_inner
+ return _with_cleanup
+
+
+def build_config(sections):
+ """Builds a ConfigObj object with specified data
+
+ :arg sections: list of ``(section_name, section_data,
+ subsection_list)`` tuples where section_data is a dict and
+ subsection_list is a list of ``(section_name, section_data,
+ subsection_list)``, ...
+
+ For example:
+
+ >>> build_config([
+ ... ('mediagoblin', {'key1': 'val1'}, []),
+ ... ('section2', {}, [
+ ... ('subsection1', {}, [])
+ ... ])
+ ... ])
+ """
+ cfg = ConfigObj()
+ cfg.filename = 'foo'
+ def _iter_section(cfg, section_list):
+ for section_name, data, subsection_list in section_list:
+ cfg[section_name] = data
+ _iter_section(cfg[section_name], subsection_list)
+
+ _iter_section(cfg, sections)
+ return cfg
+
+
+@with_cleanup()
+def test_no_plugins():
+ """Run setup_plugins with no plugins in config"""
+ cfg = build_config([('mediagoblin', {}, [])])
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pman = pluginapi.PluginManager()
+ setup_plugins()
+
+ # Make sure we didn't load anything.
+ assert len(pman.plugins) == 0
+
+
+@with_cleanup('mediagoblin.plugins.sampleplugin')
+def test_one_plugin():
+ """Run setup_plugins with a single working plugin"""
+ cfg = build_config([
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('mediagoblin.plugins.sampleplugin', {}, [])
+ ])
+ ])
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pman = pluginapi.PluginManager()
+ setup_plugins()
+
+ # Make sure we only found one plugin
+ assert len(pman.plugins) == 1
+ # Make sure the plugin is the one we think it is.
+ assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin'
+ # Make sure there was one hook registered
+ assert len(pman.hooks) == 1
+ # Make sure _setup_plugin_called was called once
+ import mediagoblin.plugins.sampleplugin
+ assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1
+
+
+@with_cleanup('mediagoblin.plugins.sampleplugin')
+def test_same_plugin_twice():
+ """Run setup_plugins with a single working plugin twice"""
+ cfg = build_config([
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('mediagoblin.plugins.sampleplugin', {}, []),
+ ('mediagoblin.plugins.sampleplugin', {}, []),
+ ])
+ ])
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pman = pluginapi.PluginManager()
+ setup_plugins()
+
+ # Make sure we only found one plugin
+ assert len(pman.plugins) == 1
+ # Make sure the plugin is the one we think it is.
+ assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin'
+ # Make sure there was one hook registered
+ assert len(pman.hooks) == 1
+ # Make sure _setup_plugin_called was called once
+ import mediagoblin.plugins.sampleplugin
+ assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1
+
+
+@with_cleanup()
+def test_disabled_plugin():
+ """Run setup_plugins with a single working plugin twice"""
+ cfg = build_config([
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('-mediagoblin.plugins.sampleplugin', {}, []),
+ ])
+ ])
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ pman = pluginapi.PluginManager()
+ setup_plugins()
+
+ # Make sure we didn't load the plugin
+ assert len(pman.plugins) == 0
+
+
+CONFIG_ALL_CALLABLES = [
+ ('mediagoblin', {}, []),
+ ('plugins', {}, [
+ ('mediagoblin.tests.testplugins.callables1', {}, []),
+ ('mediagoblin.tests.testplugins.callables2', {}, []),
+ ('mediagoblin.tests.testplugins.callables3', {}, []),
+ ])
+ ]
+
+
+@with_cleanup()
+def test_hook_handle():
+ """
+ Test the hook_handle method
+ """
+ cfg = build_config(CONFIG_ALL_CALLABLES)
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ setup_plugins()
+
+ # Just one hook provided
+ call_log = []
+ assert pluginapi.hook_handle(
+ "just_one", call_log) == "Called just once"
+ assert call_log == ["expect this one call"]
+
+ # Nothing provided and unhandled not okay
+ call_log = []
+ pluginapi.hook_handle(
+ "nothing_handling", call_log) == None
+ assert call_log == []
+
+ # Nothing provided and unhandled okay
+ call_log = []
+ assert pluginapi.hook_handle(
+ "nothing_handling", call_log, unhandled_okay=True) is None
+ assert call_log == []
+
+ # Multiple provided, go with the first!
+ call_log = []
+ assert pluginapi.hook_handle(
+ "multi_handle", call_log) == "the first returns"
+ assert call_log == ["Hi, I'm the first"]
+
+ # Multiple provided, one has CantHandleIt
+ call_log = []
+ assert pluginapi.hook_handle(
+ "multi_handle_with_canthandle",
+ call_log) == "the second returns"
+ assert call_log == ["Hi, I'm the second"]
+
+
+@with_cleanup()
+def test_hook_runall():
+ """
+ Test the hook_runall method
+ """
+ cfg = build_config(CONFIG_ALL_CALLABLES)
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ setup_plugins()
+
+ # Just one hook, check results
+ call_log = []
+ assert pluginapi.hook_runall(
+ "just_one", call_log) == ["Called just once"]
+ assert call_log == ["expect this one call"]
+
+ # None provided, check results
+ call_log = []
+ assert pluginapi.hook_runall(
+ "nothing_handling", call_log) == []
+ assert call_log == []
+
+ # Multiple provided, check results
+ call_log = []
+ assert pluginapi.hook_runall(
+ "multi_handle", call_log) == [
+ "the first returns",
+ "the second returns",
+ "the third returns",
+ ]
+ assert call_log == [
+ "Hi, I'm the first",
+ "Hi, I'm the second",
+ "Hi, I'm the third"]
+
+ # Multiple provided, one has CantHandleIt, check results
+ call_log = []
+ assert pluginapi.hook_runall(
+ "multi_handle_with_canthandle", call_log) == [
+ "the second returns",
+ "the third returns",
+ ]
+ assert call_log == [
+ "Hi, I'm the second",
+ "Hi, I'm the third"]
+
+
+@with_cleanup()
+def test_hook_transform():
+ """
+ Test the hook_transform method
+ """
+ cfg = build_config(CONFIG_ALL_CALLABLES)
+
+ mg_globals.app_config = cfg['mediagoblin']
+ mg_globals.global_config = cfg
+
+ setup_plugins()
+
+ assert pluginapi.hook_transform(
+ "expand_tuple", (-1, 0)) == (-1, 0, 1, 2, 3)
+
+
+def test_plugin_config():
+ """
+ Make sure plugins can set up their own config
+ """
+ config, validation_result = read_mediagoblin_config(
+ pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_plugin_specs.ini'))
+
+ pluginspec_section = config['plugins'][
+ 'mediagoblin.tests.testplugins.pluginspec']
+ assert pluginspec_section['some_string'] == 'not blork'
+ assert pluginspec_section['dont_change_me'] == 'still the default'
+
+ # Make sure validation works... this should be an error
+ assert isinstance(
+ validation_result[
+ 'plugins'][
+ 'mediagoblin.tests.testplugins.pluginspec'][
+ 'some_int'],
+ VdtTypeError)
+
+ # the callables thing shouldn't really have anything though.
+ assert len(config['plugins'][
+ 'mediagoblin.tests.testplugins.callables1']) == 0
+
+
+@pytest.fixture()
+def context_modified_app(request):
+ """
+ Get a MediaGoblin app fixture using appconfig_context_modified.ini
+ """
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_context_modified.ini'))
+
+
+def test_modify_context(context_modified_app):
+ """
+ Test that we can modify both the view/template specific and
+ global contexts for templates.
+ """
+ # Specific thing passed into a page
+ result = context_modified_app.get("/modify_context/specific/")
+ assert result.body.strip() == """Specific page!
+
+specific thing: in yer specificpage
+global thing: globally appended!
+something: orother
+doubleme: happyhappy"""
+
+ # General test, should have global context variable only
+ result = context_modified_app.get("/modify_context/")
+ assert result.body.strip() == """General page!
+
+global thing: globally appended!
+lol: cats
+doubleme: joyjoy"""
+
+
+@pytest.fixture()
+def static_plugin_app(request):
+ """
+ Get a MediaGoblin app fixture using appconfig_static_plugin.ini
+ """
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_static_plugin.ini'))
+
+
+def test_plugin_assetlink(static_plugin_app):
+ """
+ Test that the assetlink command works correctly
+ """
+ linked_assets_dir = mg_globals.app_config['plugin_linked_assets_dir']
+ plugin_link_dir = os.path.join(
+ linked_assets_dir.rstrip(os.path.sep),
+ 'staticstuff')
+
+ plugin_statics = pluginapi.hook_runall("static_setup")
+ assert len(plugin_statics) == 1
+ plugin_static = plugin_statics[0]
+
+ def run_assetlink():
+ printer = CollectingPrinter()
+
+ link_plugin_assets(
+ plugin_static, linked_assets_dir, printer)
+
+ return printer
+
+ # it shouldn't exist yet
+ assert not os.path.lexists(plugin_link_dir)
+
+ # link dir doesn't exist, link it
+ result = run_assetlink().collection[0]
+ assert result == \
+ 'Linked asset directory for plugin "staticstuff":\n %s\nto:\n %s\n' % (
+ plugin_static.file_path.rstrip(os.path.sep),
+ plugin_link_dir)
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, leave it alone
+ # (and it should exist still since we just ran it..)
+ result = run_assetlink().collection[0]
+ assert result == 'Skipping "staticstuff"; already set up.\n'
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, is a symlink to somewhere else (re-link)
+ junk_file_path = os.path.join(
+ linked_assets_dir.rstrip(os.path.sep),
+ 'junk.txt')
+ with file(junk_file_path, 'w') as junk_file:
+ junk_file.write('barf')
+
+ os.unlink(plugin_link_dir)
+ os.symlink(junk_file_path, plugin_link_dir)
+
+ result = run_assetlink().combined_string
+ assert result == """Old link found for "staticstuff"; removing.
+Linked asset directory for plugin "staticstuff":
+ %s
+to:
+ %s
+""" % (plugin_static.file_path.rstrip(os.path.sep), plugin_link_dir)
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, but is a non-symlink
+ os.unlink(plugin_link_dir)
+ with file(plugin_link_dir, 'w') as clobber_file:
+ clobber_file.write('clobbered!')
+
+ result = run_assetlink().collection[0]
+ assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % (
+ plugin_link_dir)
+
+ with file(plugin_link_dir, 'r') as clobber_file:
+ assert clobber_file.read() == 'clobbered!'
+
+
+def test_plugin_staticdirect(static_plugin_app):
+ """
+ Test that the staticdirect utilities pull up the right things
+ """
+ result = json.loads(
+ static_plugin_app.get('/staticstuff/').body)
+
+ assert len(result) == 2
+
+ assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png'
+ assert result['plugin_bunny_css'] == \
+ '/plugin_static/staticstuff/css/bunnify.css'
+
diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py
new file mode 100644
index 00000000..591add96
--- /dev/null
+++ b/mediagoblin/tests/test_processing.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+from mediagoblin import processing
+
+class TestProcessing(object):
+ def run_fill(self, input, format, output=None):
+ builder = processing.FilenameBuilder(input)
+ result = builder.fill(format)
+ if output is None:
+ return result
+ assert output == result
+
+ def test_easy_filename_fill(self):
+ self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt')
+
+ def test_long_filename_fill(self):
+ self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}',
+ 'image-{0}.png'.format('A' * 245))
diff --git a/mediagoblin/tests/test_session.py b/mediagoblin/tests/test_session.py
new file mode 100644
index 00000000..78d790eb
--- /dev/null
+++ b/mediagoblin/tests/test_session.py
@@ -0,0 +1,30 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools import session
+
+def test_session():
+ sess = session.Session()
+ assert not sess
+ assert not sess.is_updated()
+ sess['user_id'] = 27
+ assert sess
+ assert not sess.is_updated()
+ sess.save()
+ assert sess.is_updated()
+ sess.delete()
+ assert not sess
+ assert sess.is_updated()
diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py
new file mode 100644
index 00000000..2fc4c043
--- /dev/null
+++ b/mediagoblin/tests/test_sql_migrations.py
@@ -0,0 +1,896 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2012, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import copy
+
+from sqlalchemy import (
+ Table, Column, MetaData, Index,
+ Integer, Float, Unicode, UnicodeText, DateTime, Boolean,
+ ForeignKey, UniqueConstraint, PickleType, VARCHAR)
+from sqlalchemy.orm import sessionmaker, relationship
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.sql import select, insert
+from migrate import changeset
+
+from mediagoblin.db.base import GMGTableBase
+from mediagoblin.db.migration_tools import MigrationManager, RegisterMigration
+from mediagoblin.tools.common import CollectingPrinter
+
+
+# This one will get filled with local migrations
+FULL_MIGRATIONS = {}
+
+
+#######################################################
+# Migration set 1: Define initial models, no migrations
+#######################################################
+
+Base1 = declarative_base(cls=GMGTableBase)
+
+class Creature1(Base1):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_legs = Column(Integer, nullable=False)
+ is_demon = Column(Boolean)
+
+class Level1(Base1):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ exits = Column(PickleType)
+
+SET1_MODELS = [Creature1, Level1]
+
+SET1_MIGRATIONS = {}
+
+#######################################################
+# Migration set 2: A few migrations and new model
+#######################################################
+
+Base2 = declarative_base(cls=GMGTableBase)
+
+class Creature2(Base2):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_legs = Column(Integer, nullable=False)
+ magical_powers = relationship("CreaturePower2")
+
+class CreaturePower2(Base2):
+ __tablename__ = "creature_power"
+
+ id = Column(Integer, primary_key=True)
+ creature = Column(
+ Integer, ForeignKey('creature.id'), nullable=False)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ hitpower = Column(Integer, nullable=False)
+
+class Level2(Base2):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+
+class LevelExit2(Base2):
+ __tablename__ = "level_exit"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode)
+ from_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False)
+ to_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False)
+
+SET2_MODELS = [Creature2, CreaturePower2, Level2, LevelExit2]
+
+
+@RegisterMigration(1, FULL_MIGRATIONS)
+def creature_remove_is_demon(db_conn):
+ """
+ Remove the is_demon field from the creature model. We don't need
+ it!
+ """
+ # :( Commented out 'cuz of:
+ # http://code.google.com/p/sqlalchemy-migrate/issues/detail?id=143&thanks=143&ts=1327882242
+
+ # metadata = MetaData(bind=db_conn.bind)
+ # creature_table = Table(
+ # 'creature', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # creature_table.drop_column('is_demon')
+ pass
+
+
+@RegisterMigration(2, FULL_MIGRATIONS)
+def creature_powers_new_table(db_conn):
+ """
+ Add a new table for creature powers. Nothing needs to go in it
+ yet though as there wasn't anything that previously held this
+ information
+ """
+ metadata = MetaData(bind=db_conn.bind)
+
+ # We have to access the creature table so sqlalchemy can make the
+ # foreign key relationship
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+
+ creature_powers = Table(
+ 'creature_power', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('creature',
+ Integer, ForeignKey('creature.id'), nullable=False),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('hitpower', Integer, nullable=False))
+ metadata.create_all(db_conn.bind)
+
+
+@RegisterMigration(3, FULL_MIGRATIONS)
+def level_exits_new_table(db_conn):
+ """
+ Make a new table for level exits and move the previously pickled
+ stuff over to here (then drop the old unneeded table)
+ """
+ # First, create the table
+ # -----------------------
+ metadata = MetaData(bind=db_conn.bind)
+
+ # Minimal representation of level table.
+ # Not auto-introspecting here because of pickle table. I'm not
+ # sure sqlalchemy can auto-introspect pickle columns.
+ levels = Table(
+ 'level', metadata,
+ Column('id', Unicode, primary_key=True),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('exits', PickleType))
+
+ level_exits = Table(
+ 'level_exit', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', Unicode),
+ Column('from_level',
+ Unicode, ForeignKey('level.id'), nullable=False),
+ Column('to_level',
+ Unicode, ForeignKey('level.id'), nullable=False))
+ metadata.create_all(db_conn.bind)
+
+ # And now, convert all the old exit pickles to new level exits
+ # ------------------------------------------------------------
+
+ # query over and insert
+ result = db_conn.execute(
+ select([levels], levels.c.exits!=None))
+
+ for level in result:
+
+ for exit_name, to_level in level['exits'].iteritems():
+ # Insert the level exit
+ db_conn.execute(
+ level_exits.insert().values(
+ name=exit_name,
+ from_level=level.id,
+ to_level=to_level))
+
+ # Finally, drop the old level exits pickle table
+ # ----------------------------------------------
+ levels.drop_column('exits')
+
+
+# A hack! At this point we freeze-fame and get just a partial list of
+# migrations
+
+SET2_MIGRATIONS = copy.copy(FULL_MIGRATIONS)
+
+#######################################################
+# Migration set 3: Final migrations
+#######################################################
+
+Base3 = declarative_base(cls=GMGTableBase)
+
+class Creature3(Base3):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_limbs= Column(Integer, nullable=False)
+ magical_powers = relationship("CreaturePower3")
+
+class CreaturePower3(Base3):
+ __tablename__ = "creature_power"
+
+ id = Column(Integer, primary_key=True)
+ creature = Column(
+ Integer, ForeignKey('creature.id'), nullable=False, index=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ hitpower = Column(Float, nullable=False)
+
+class Level3(Base3):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+
+class LevelExit3(Base3):
+ __tablename__ = "level_exit"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode)
+ from_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False, index=True)
+ to_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False, index=True)
+
+
+SET3_MODELS = [Creature3, CreaturePower3, Level3, LevelExit3]
+SET3_MIGRATIONS = FULL_MIGRATIONS
+
+
+@RegisterMigration(4, FULL_MIGRATIONS)
+def creature_num_legs_to_num_limbs(db_conn):
+ """
+ Turns out we're tracking all sorts of limbs, not "legs"
+ specifically. Humans would be 4 here, for instance. So we
+ renamed the column.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ creature_table.c.num_legs.alter(name=u"num_limbs")
+
+
+@RegisterMigration(5, FULL_MIGRATIONS)
+def level_exit_index_from_and_to_level(db_conn):
+ """
+ Index the from and to levels of the level exit table.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ level_exit = Table(
+ 'level_exit', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ Index('ix_level_exit_from_level',
+ level_exit.c.from_level).create(db_conn.bind)
+ Index('ix_level_exit_to_level',
+ level_exit.c.to_level).create(db_conn.bind)
+
+
+@RegisterMigration(6, FULL_MIGRATIONS)
+def creature_power_index_creature(db_conn):
+ """
+ Index our foreign key relationship to the creatures
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ creature_power = Table(
+ 'creature_power', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ Index('ix_creature_power_creature',
+ creature_power.c.creature).create(db_conn.bind)
+
+
+@RegisterMigration(7, FULL_MIGRATIONS)
+def creature_power_hitpower_to_float(db_conn):
+ """
+ Convert hitpower column on creature power table from integer to
+ float.
+
+ Turns out we want super precise values of how much hitpower there
+ really is.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+
+ # We have to access the creature table so sqlalchemy can make the
+ # foreign key relationship
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+
+ creature_power = Table(
+ 'creature_power', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('creature', Integer,
+ ForeignKey('creature.id'), nullable=False,
+ index=True),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('hitpower', Integer, nullable=False))
+
+ creature_power.c.hitpower.alter(type=Float)
+
+
+@RegisterMigration(8, FULL_MIGRATIONS)
+def creature_power_name_creature_unique(db_conn):
+ """
+ Add a unique constraint to name and creature on creature_power.
+
+ We don't want multiple creature powers with the same name per creature!
+ """
+ # Note: We don't actually check to see if this constraint is set
+ # up because at present there's no way to do so in sqlalchemy :\
+
+ metadata = MetaData(bind=db_conn.bind)
+
+ creature_power = Table(
+ 'creature_power', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+
+ cons = changeset.constraint.UniqueConstraint(
+ 'name', 'creature', table=creature_power)
+
+ cons.create()
+
+
+def _insert_migration1_objects(session):
+ """
+ Test objects to insert for the first set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature1(name=u'centipede',
+ num_legs=100,
+ is_demon=False),
+ Creature1(name=u'wolf',
+ num_legs=4,
+ is_demon=False),
+ # don't ask me what a wizardsnake is.
+ Creature1(name=u'wizardsnake',
+ num_legs=0,
+ is_demon=True)])
+
+ # Insert levels
+ session.add_all(
+ [Level1(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.',
+ exits={
+ u'deathwell': u'evilstorm',
+ u'portal': u'central_park'}),
+ Level1(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits={}), # you can't escape the evilstorm
+ Level1(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.",
+ exits={
+ u'portal': u'necroplex'})])
+
+ session.commit()
+
+
+def _insert_migration2_objects(session):
+ """
+ Test objects to insert for the second set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature2(
+ name=u'centipede',
+ num_legs=100),
+ Creature2(
+ name=u'wolf',
+ num_legs=4,
+ magical_powers = [
+ CreaturePower2(
+ name=u"ice breath",
+ description=u"A blast of icy breath!",
+ hitpower=20),
+ CreaturePower2(
+ name=u"death stare",
+ description=u"A frightening stare, for sure!",
+ hitpower=45)]),
+ Creature2(
+ name=u'wizardsnake',
+ num_legs=0,
+ magical_powers=[
+ CreaturePower2(
+ name=u'death_rattle',
+ description=u'A rattle... of DEATH!',
+ hitpower=1000),
+ CreaturePower2(
+ name=u'sneaky_stare',
+ description=u"The sneakiest stare you've ever seen!",
+ hitpower=300),
+ CreaturePower2(
+ name=u'slithery_smoke',
+ description=u"A blast of slithery, slithery smoke.",
+ hitpower=10),
+ CreaturePower2(
+ name=u'treacherous_tremors',
+ description=u"The ground shakes beneath footed animals!",
+ hitpower=0)])])
+
+ # Insert levels
+ session.add_all(
+ [Level2(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.'),
+ Level2(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits=[]), # you can't escape the evilstorm
+ Level2(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.")])
+
+ # necroplex exits
+ session.add_all(
+ [LevelExit2(name=u'deathwell',
+ from_level=u'necroplex',
+ to_level=u'evilstorm'),
+ LevelExit2(name=u'portal',
+ from_level=u'necroplex',
+ to_level=u'central_park')])
+
+ # there are no evilstorm exits because there is no exit from the
+ # evilstorm
+
+ # central park exits
+ session.add_all(
+ [LevelExit2(name=u'portal',
+ from_level=u'central_park',
+ to_level=u'necroplex')])
+
+ session.commit()
+
+
+def _insert_migration3_objects(session):
+ """
+ Test objects to insert for the third set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature3(
+ name=u'centipede',
+ num_limbs=100),
+ Creature3(
+ name=u'wolf',
+ num_limbs=4,
+ magical_powers = [
+ CreaturePower3(
+ name=u"ice breath",
+ description=u"A blast of icy breath!",
+ hitpower=20.0),
+ CreaturePower3(
+ name=u"death stare",
+ description=u"A frightening stare, for sure!",
+ hitpower=45.0)]),
+ Creature3(
+ name=u'wizardsnake',
+ num_limbs=0,
+ magical_powers=[
+ CreaturePower3(
+ name=u'death_rattle',
+ description=u'A rattle... of DEATH!',
+ hitpower=1000.0),
+ CreaturePower3(
+ name=u'sneaky_stare',
+ description=u"The sneakiest stare you've ever seen!",
+ hitpower=300.0),
+ CreaturePower3(
+ name=u'slithery_smoke',
+ description=u"A blast of slithery, slithery smoke.",
+ hitpower=10.0),
+ CreaturePower3(
+ name=u'treacherous_tremors',
+ description=u"The ground shakes beneath footed animals!",
+ hitpower=0.0)])],
+ # annnnnd one more to test a floating point hitpower
+ Creature3(
+ name=u'deity',
+ numb_limbs=30,
+ magical_powers=[
+ CreaturePower3(
+ name=u'smite',
+ description=u'Smitten by holy wrath!',
+ hitpower=9999.9)]))
+
+ # Insert levels
+ session.add_all(
+ [Level3(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.'),
+ Level3(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits=[]), # you can't escape the evilstorm
+ Level3(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.")])
+
+ # necroplex exits
+ session.add_all(
+ [LevelExit3(name=u'deathwell',
+ from_level=u'necroplex',
+ to_level=u'evilstorm'),
+ LevelExit3(name=u'portal',
+ from_level=u'necroplex',
+ to_level=u'central_park')])
+
+ # there are no evilstorm exits because there is no exit from the
+ # evilstorm
+
+ # central park exits
+ session.add_all(
+ [LevelExit3(name=u'portal',
+ from_level=u'central_park',
+ to_level=u'necroplex')])
+
+ session.commit()
+
+
+def create_test_engine():
+ from sqlalchemy import create_engine
+ engine = create_engine('sqlite:///:memory:', echo=False)
+ Session = sessionmaker(bind=engine)
+ return engine, Session
+
+
+def assert_col_type(column, this_class):
+ assert isinstance(column.type, this_class)
+
+
+def _get_level3_exits(session, level):
+ return dict(
+ [(level_exit.name, level_exit.to_level)
+ for level_exit in
+ session.query(LevelExit3).filter_by(from_level=level.id)])
+
+
+def test_set1_to_set3():
+ # Create / connect to database
+ # ----------------------------
+
+ engine, Session = create_test_engine()
+
+ # Create tables by migrating on empty initial set
+ # -----------------------------------------------
+
+ printer = CollectingPrinter()
+ migration_manager = MigrationManager(
+ u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ printer)
+
+ # Check latest migration and database current migration
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == None
+
+ result = migration_manager.init_or_migrate()
+
+ # Make sure output was "inited"
+ assert result == u'inited'
+ # Check output
+ assert printer.combined_string == (
+ "-> Initializing main mediagoblin tables... done.\n")
+ # Check version in database
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == 0
+
+ # Install the initial set
+ # -----------------------
+
+ _insert_migration1_objects(Session())
+
+ # Try to "re-migrate" with same manager settings... nothing should happen
+ migration_manager = MigrationManager(
+ u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ printer)
+ assert migration_manager.init_or_migrate() == None
+
+ # Check version in database
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == 0
+
+ # Sanity check a few things in the database...
+ metadata = MetaData(bind=engine)
+
+ # Check the structure of the creature table
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(creature_table.c.keys()) == set(
+ ['id', 'name', 'num_legs', 'is_demon'])
+ assert_col_type(creature_table.c.id, Integer)
+ assert_col_type(creature_table.c.name, VARCHAR)
+ assert creature_table.c.name.nullable is False
+ #assert creature_table.c.name.index is True
+ #assert creature_table.c.name.unique is True
+ assert_col_type(creature_table.c.num_legs, Integer)
+ assert creature_table.c.num_legs.nullable is False
+ assert_col_type(creature_table.c.is_demon, Boolean)
+
+ # Check the structure of the level table
+ level_table = Table(
+ 'level', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_table.c.keys()) == set(
+ ['id', 'name', 'description', 'exits'])
+ assert_col_type(level_table.c.id, VARCHAR)
+ assert level_table.c.id.primary_key is True
+ assert_col_type(level_table.c.name, VARCHAR)
+ assert_col_type(level_table.c.description, VARCHAR)
+ # Skipping exits... Not sure if we can detect pickletype, not a
+ # big deal regardless.
+
+ # Now check to see if stuff seems to be in there.
+ session = Session()
+
+ creature = session.query(Creature1).filter_by(
+ name=u'centipede').one()
+ assert creature.num_legs == 100
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'wolf').one()
+ assert creature.num_legs == 4
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'wizardsnake').one()
+ assert creature.num_legs == 0
+ assert creature.is_demon == True
+
+ level = session.query(Level1).filter_by(
+ id=u'necroplex').one()
+ assert level.name == u'The Necroplex'
+ assert level.description == u'A complex full of pure deathzone.'
+ assert level.exits == {
+ 'deathwell': 'evilstorm',
+ 'portal': 'central_park'}
+
+ level = session.query(Level1).filter_by(
+ id=u'evilstorm').one()
+ assert level.name == u'Evil Storm'
+ assert level.description == u'A storm full of pure evil.'
+ assert level.exits == {} # You still can't escape the evilstorm!
+
+ level = session.query(Level1).filter_by(
+ id=u'central_park').one()
+ assert level.name == u'Central Park, NY, NY'
+ assert level.description == u"New York's friendly Central Park."
+ assert level.exits == {
+ 'portal': 'necroplex'}
+
+ # Create new migration manager, but make sure the db migration
+ # isn't said to be updated yet
+ printer = CollectingPrinter()
+ migration_manager = MigrationManager(
+ u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ printer)
+
+ assert migration_manager.latest_migration == 8
+ assert migration_manager.database_current_migration == 0
+
+ # Migrate
+ result = migration_manager.init_or_migrate()
+
+ # Make sure result was "migrated"
+ assert result == u'migrated'
+
+ # TODO: Check output to user
+ assert printer.combined_string == """\
+-> Updating main mediagoblin tables:
+ + Running migration 1, "creature_remove_is_demon"... done.
+ + Running migration 2, "creature_powers_new_table"... done.
+ + Running migration 3, "level_exits_new_table"... done.
+ + Running migration 4, "creature_num_legs_to_num_limbs"... done.
+ + Running migration 5, "level_exit_index_from_and_to_level"... done.
+ + Running migration 6, "creature_power_index_creature"... done.
+ + Running migration 7, "creature_power_hitpower_to_float"... done.
+ + Running migration 8, "creature_power_name_creature_unique"... done.
+"""
+
+ # Make sure version matches expected
+ migration_manager = MigrationManager(
+ u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ printer)
+ assert migration_manager.latest_migration == 8
+ assert migration_manager.database_current_migration == 8
+
+ # Check all things in database match expected
+
+ # Check the creature table
+ metadata = MetaData(bind=engine)
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=engine)
+ # assert set(creature_table.c.keys()) == set(
+ # ['id', 'name', 'num_limbs'])
+ assert set(creature_table.c.keys()) == set(
+ [u'id', 'name', u'num_limbs', u'is_demon'])
+ assert_col_type(creature_table.c.id, Integer)
+ assert_col_type(creature_table.c.name, VARCHAR)
+ assert creature_table.c.name.nullable is False
+ #assert creature_table.c.name.index is True
+ #assert creature_table.c.name.unique is True
+ assert_col_type(creature_table.c.num_limbs, Integer)
+ assert creature_table.c.num_limbs.nullable is False
+
+ # Check the CreaturePower table
+ creature_power_table = Table(
+ 'creature_power', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(creature_power_table.c.keys()) == set(
+ ['id', 'creature', 'name', 'description', 'hitpower'])
+ assert_col_type(creature_power_table.c.id, Integer)
+ assert_col_type(creature_power_table.c.creature, Integer)
+ assert creature_power_table.c.creature.nullable is False
+ assert_col_type(creature_power_table.c.name, VARCHAR)
+ assert_col_type(creature_power_table.c.description, VARCHAR)
+ assert_col_type(creature_power_table.c.hitpower, Float)
+ assert creature_power_table.c.hitpower.nullable is False
+
+ # Check the structure of the level table
+ level_table = Table(
+ 'level', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_table.c.keys()) == set(
+ ['id', 'name', 'description'])
+ assert_col_type(level_table.c.id, VARCHAR)
+ assert level_table.c.id.primary_key is True
+ assert_col_type(level_table.c.name, VARCHAR)
+ assert_col_type(level_table.c.description, VARCHAR)
+
+ # Check the structure of the level_exits table
+ level_exit_table = Table(
+ 'level_exit', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_exit_table.c.keys()) == set(
+ ['id', 'name', 'from_level', 'to_level'])
+ assert_col_type(level_exit_table.c.id, Integer)
+ assert_col_type(level_exit_table.c.name, VARCHAR)
+ assert_col_type(level_exit_table.c.from_level, VARCHAR)
+ assert level_exit_table.c.from_level.nullable is False
+ #assert level_exit_table.c.from_level.index is True
+ assert_col_type(level_exit_table.c.to_level, VARCHAR)
+ assert level_exit_table.c.to_level.nullable is False
+ #assert level_exit_table.c.to_level.index is True
+
+ # Now check to see if stuff seems to be in there.
+ session = Session()
+ creature = session.query(Creature3).filter_by(
+ name=u'centipede').one()
+ assert creature.num_limbs == 100.0
+ assert creature.magical_powers == []
+
+ creature = session.query(Creature3).filter_by(
+ name=u'wolf').one()
+ assert creature.num_limbs == 4.0
+ assert creature.magical_powers == []
+
+ creature = session.query(Creature3).filter_by(
+ name=u'wizardsnake').one()
+ assert creature.num_limbs == 0.0
+ assert creature.magical_powers == []
+
+ level = session.query(Level3).filter_by(
+ id=u'necroplex').one()
+ assert level.name == u'The Necroplex'
+ assert level.description == u'A complex full of pure deathzone.'
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {
+ u'deathwell': u'evilstorm',
+ u'portal': u'central_park'}
+
+ level = session.query(Level3).filter_by(
+ id=u'evilstorm').one()
+ assert level.name == u'Evil Storm'
+ assert level.description == u'A storm full of pure evil.'
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {} # You still can't escape the evilstorm!
+
+ level = session.query(Level3).filter_by(
+ id=u'central_park').one()
+ assert level.name == u'Central Park, NY, NY'
+ assert level.description == u"New York's friendly Central Park."
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {
+ 'portal': 'necroplex'}
+
+
+#def test_set2_to_set3():
+ # Create / connect to database
+ # Create tables by migrating on empty initial set
+
+ # Install the initial set
+ # Check version in database
+ # Sanity check a few things in the database
+
+ # Migrate
+ # Make sure version matches expected
+ # Check all things in database match expected
+ # pass
+
+
+#def test_set1_to_set2_to_set3():
+ # Create / connect to database
+ # Create tables by migrating on empty initial set
+
+ # Install the initial set
+ # Check version in database
+ # Sanity check a few things in the database
+
+ # Migrate
+ # Make sure version matches expected
+ # Check all things in database match expected
+
+ # Migrate again
+ # Make sure version matches expected again
+ # Check all things in database match expected again
+
+ ##### Set2
+ # creature_table = Table(
+ # 'creature', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(creature_table.c.keys()) == set(
+ # ['id', 'name', 'num_legs'])
+ # assert_col_type(creature_table.c.id, Integer)
+ # assert_col_type(creature_table.c.name, VARCHAR)
+ # assert creature_table.c.name.nullable is False
+ # assert creature_table.c.name.index is True
+ # assert creature_table.c.name.unique is True
+ # assert_col_type(creature_table.c.num_legs, Integer)
+ # assert creature_table.c.num_legs.nullable is False
+
+ # # Check the CreaturePower table
+ # creature_power_table = Table(
+ # 'creature_power', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(creature_power_table.c.keys()) == set(
+ # ['id', 'creature', 'name', 'description', 'hitpower'])
+ # assert_col_type(creature_power_table.c.id, Integer)
+ # assert_col_type(creature_power_table.c.creature, Integer)
+ # assert creature_power_table.c.creature.nullable is False
+ # assert_col_type(creature_power_table.c.name, VARCHAR)
+ # assert_col_type(creature_power_table.c.description, VARCHAR)
+ # assert_col_type(creature_power_table.c.hitpower, Integer)
+ # assert creature_power_table.c.hitpower.nullable is False
+
+ # # Check the structure of the level table
+ # level_table = Table(
+ # 'level', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(level_table.c.keys()) == set(
+ # ['id', 'name', 'description'])
+ # assert_col_type(level_table.c.id, VARCHAR)
+ # assert level_table.c.id.primary_key is True
+ # assert_col_type(level_table.c.name, VARCHAR)
+ # assert_col_type(level_table.c.description, VARCHAR)
+
+ # # Check the structure of the level_exits table
+ # level_exit_table = Table(
+ # 'level_exit', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(level_exit_table.c.keys()) == set(
+ # ['id', 'name', 'from_level', 'to_level'])
+ # assert_col_type(level_exit_table.c.id, Integer)
+ # assert_col_type(level_exit_table.c.name, VARCHAR)
+ # assert_col_type(level_exit_table.c.from_level, VARCHAR)
+ # assert level_exit_table.c.from_level.nullable is False
+ # assert_col_type(level_exit_table.c.to_level, VARCHAR)
+
+ # pass
diff --git a/mediagoblin/tests/test_staticdirect.py b/mediagoblin/tests/test_staticdirect.py
new file mode 100644
index 00000000..3a9e2fd9
--- /dev/null
+++ b/mediagoblin/tests/test_staticdirect.py
@@ -0,0 +1,9 @@
+from mediagoblin.tools import staticdirect
+
+def test_staticdirect():
+ sdirect = staticdirect.StaticDirect(
+ {None: "/static/",
+ "theme": "http://example.org/themestatic"})
+ assert sdirect("css/monkeys.css") == "/static/css/monkeys.css"
+ assert sdirect("images/lollerskate.png", "theme") == \
+ "http://example.org/themestatic/images/lollerskate.png"
diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py
new file mode 100644
index 00000000..f6f1d18f
--- /dev/null
+++ b/mediagoblin/tests/test_storage.py
@@ -0,0 +1,321 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+import tempfile
+
+import pytest
+from werkzeug.utils import secure_filename
+
+from mediagoblin import storage
+
+
+################
+# Test utilities
+################
+
+def test_clean_listy_filepath():
+ expected = [u'dir1', u'dir2', u'linooks.jpg']
+ assert storage.clean_listy_filepath(
+ ['dir1', 'dir2', 'linooks.jpg']) == expected
+
+ expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg']
+ assert storage.clean_listy_filepath(
+ ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected
+
+ expected = [u'etc', u'passwd']
+ assert storage.clean_listy_filepath(
+ ['../../../etc/', 'passwd']) == expected
+
+ with pytest.raises(storage.InvalidFilepath):
+ storage.clean_listy_filepath(['../../', 'linooks.jpg'])
+
+
+class FakeStorageSystem():
+ def __init__(self, foobie, blech, **kwargs):
+ self.foobie = foobie
+ self.blech = blech
+
+class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
+ # Theoretically despite this, all the methods should work but it
+ # should force copying to the workbench
+ local_storage = False
+
+ def copy_local_to_storage(self, *args, **kwargs):
+ return storage.StorageInterface.copy_local_to_storage(
+ self, *args, **kwargs)
+
+
+def test_storage_system_from_config():
+ this_storage = storage.storage_system_from_config(
+ {'base_url': 'http://example.org/moodia/',
+ 'base_dir': '/tmp/',
+ 'garbage_arg': 'garbage_arg',
+ 'garbage_arg': 'trash'})
+ assert this_storage.base_url == 'http://example.org/moodia/'
+ assert this_storage.base_dir == '/tmp/'
+ assert this_storage.__class__ is storage.filestorage.BasicFileStorage
+
+ this_storage = storage.storage_system_from_config(
+ {'foobie': 'eiboof',
+ 'blech': 'hcelb',
+ 'garbage_arg': 'garbage_arg',
+ 'storage_class':
+ 'mediagoblin.tests.test_storage:FakeStorageSystem'})
+ assert this_storage.foobie == 'eiboof'
+ assert this_storage.blech == 'hcelb'
+ assert unicode(this_storage.__class__) == \
+ u'mediagoblin.tests.test_storage.FakeStorageSystem'
+
+
+##########################
+# Basic file storage tests
+##########################
+
+def get_tmp_filestorage(mount_url=None, fake_remote=False):
+ tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
+ if fake_remote:
+ this_storage = FakeRemoteStorage(tmpdir, mount_url)
+ else:
+ this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
+ return tmpdir, this_storage
+
+
+def cleanup_storage(this_storage, tmpdir, *paths):
+ for p in paths:
+ while p:
+ assert this_storage.delete_dir(p) == True
+ p.pop(-1)
+ os.rmdir(tmpdir)
+
+
+def test_basic_storage__resolve_filepath():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
+ assert result == os.path.join(
+ tmpdir, 'dir1/dir2/filename.jpg')
+
+ result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
+ assert result == os.path.join(
+ tmpdir, 'etc/passwd')
+
+ pytest.raises(
+ storage.InvalidFilepath,
+ this_storage._resolve_filepath,
+ ['../../', 'etc', 'passwd'])
+
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_file_exists():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+ filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+ with open(filename, 'w') as ourfile:
+ ourfile.write("I'm having a lovely day!")
+
+ assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
+ assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
+ assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
+
+ this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_unique_filepath():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ # write something that exists
+ os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+ filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+ with open(filename, 'w') as ourfile:
+ ourfile.write("I'm having a lovely day!")
+
+ # now we want something new, with the same name!
+ new_filepath = this_storage.get_unique_filepath(
+ ['dir1', 'dir2', 'filename.txt'])
+ assert new_filepath[:-1] == [u'dir1', u'dir2']
+
+ new_filename = new_filepath[-1]
+ assert new_filename.endswith('filename.txt')
+ assert len(new_filename) > len('filename.txt')
+ assert new_filename == secure_filename(new_filename)
+
+ os.remove(filename)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_file():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ # Write a brand new file
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('First file')
+ with this_storage.get_file(filepath, 'r') as our_file:
+ assert our_file.read() == 'First file'
+ assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+ with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
+ assert our_file.read() == 'First file'
+
+ # Write to the same path but try to get a unique file.
+ new_filepath = this_storage.get_unique_filepath(filepath)
+ assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
+
+ with this_storage.get_file(new_filepath, 'w') as our_file:
+ our_file.write('Second file')
+ with this_storage.get_file(new_filepath, 'r') as our_file:
+ assert our_file.read() == 'Second file'
+ assert os.path.exists(os.path.join(tmpdir, *new_filepath))
+ with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
+ assert our_file.read() == 'Second file'
+
+ # Read from an existing file
+ manually_written_file = os.makedirs(
+ os.path.join(tmpdir, 'testydir'))
+ with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
+ testyfile.write('testy file! so testy.')
+
+ with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
+ assert testyfile.read() == 'testy file! so testy.'
+
+ this_storage.delete_file(filepath)
+ this_storage.delete_file(new_filepath)
+ this_storage.delete_file(['testydir', 'testyfile.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
+
+
+def test_basic_storage_delete_file():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ assert not os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Testing this file')
+
+ assert os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ assert this_storage.delete_dir(['dir1', 'dir2']) == False
+ this_storage.delete_file(filepath)
+ assert this_storage.delete_dir(['dir1', 'dir2']) == True
+
+ assert not os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ cleanup_storage(this_storage, tmpdir, ['dir1'])
+
+
+def test_basic_storage_url_for_file():
+ # Not supplying a base_url should actually just bork.
+ tmpdir, this_storage = get_tmp_filestorage()
+ pytest.raises(
+ storage.NoWebServing,
+ this_storage.file_url,
+ ['dir1', 'dir2', 'filename.txt'])
+ cleanup_storage(this_storage, tmpdir)
+
+ # base_url without domain
+ tmpdir, this_storage = get_tmp_filestorage('/media/')
+ result = this_storage.file_url(
+ ['dir1', 'dir2', 'filename.txt'])
+ expected = '/media/dir1/dir2/filename.txt'
+ assert result == expected
+ cleanup_storage(this_storage, tmpdir)
+
+ # base_url with domain
+ tmpdir, this_storage = get_tmp_filestorage(
+ 'http://media.example.org/ourmedia/')
+ result = this_storage.file_url(
+ ['dir1', 'dir2', 'filename.txt'])
+ expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
+ assert result == expected
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_get_local_path():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ result = this_storage.get_local_path(
+ ['dir1', 'dir2', 'filename.txt'])
+
+ expected = os.path.join(
+ tmpdir, 'dir1/dir2/filename.txt')
+
+ assert result == expected
+
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_is_local():
+ tmpdir, this_storage = get_tmp_filestorage()
+ assert this_storage.local_storage is True
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_copy_locally():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ dest_tmpdir = tempfile.mkdtemp()
+
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Testing this file')
+
+ new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
+
+ this_storage.copy_locally(filepath, new_file_dest)
+ this_storage.delete_file(filepath)
+
+ assert file(new_file_dest).read() == 'Testing this file'
+
+ os.remove(new_file_dest)
+ os.rmdir(dest_tmpdir)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def _test_copy_local_to_storage_works(tmpdir, this_storage):
+ local_filename = tempfile.mktemp()
+ with file(local_filename, 'w') as tmpfile:
+ tmpfile.write('haha')
+
+ this_storage.copy_local_to_storage(
+ local_filename, ['dir1', 'dir2', 'copiedto.txt'])
+
+ os.remove(local_filename)
+
+ assert file(
+ os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
+ 'r').read() == 'haha'
+
+ this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_copy_local_to_storage():
+ tmpdir, this_storage = get_tmp_filestorage()
+ _test_copy_local_to_storage_works(tmpdir, this_storage)
+
+
+def test_general_storage_copy_local_to_storage():
+ tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
+ _test_copy_local_to_storage_works(tmpdir, this_storage)
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
new file mode 100644
index 00000000..162b2d19
--- /dev/null
+++ b/mediagoblin/tests/test_submission.py
@@ -0,0 +1,294 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+reload(sys)
+sys.setdefaultencoding('utf-8')
+
+import urlparse
+import os
+import pytest
+
+from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin import mg_globals
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools import template
+from mediagoblin.media_types.image import MEDIA_MANAGER as img_MEDIA_MANAGER
+from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites
+
+from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
+ BIG_BLUE, GOOD_PDF, GPS_JPG
+
+GOOD_TAG_STRING = u'yin,yang'
+BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26)
+
+FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form']
+REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
+
+
+class TestSubmission:
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ test_user = fixture_add_user()
+
+ self.test_user = test_user
+
+ self.login()
+
+ def login(self):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': u'chris',
+ 'password': 'toast'})
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def upload_data(self, filename):
+ return {'upload_files': [('file', filename)]}
+
+ def check_comments(self, request, media_id, count):
+ comments = request.db.MediaComment.find({'media_entry': media_id})
+ assert count == len(list(comments))
+
+ def test_missing_fields(self):
+ # Test blank form
+ # ---------------
+ response, form = self.do_post({}, *FORM_CONTEXT)
+ assert form.file.errors == [u'You must provide a file.']
+
+ # Test blank file
+ # ---------------
+ response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT)
+ assert form.file.errors == [u'You must provide a file.']
+
+ def check_url(self, response, path):
+ assert urlparse.urlsplit(response.location)[2] == path
+
+ def check_normal_upload(self, title, filename):
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ assert 'mediagoblin/user_pages/user.html' in context
+ # Make sure the media view is at least reachable, logged in...
+ url = '/u/{0}/m/{1}/'.format(self.test_user.username,
+ title.lower().replace(' ', '-'))
+ self.test_app.get(url)
+ # ... and logged out too.
+ self.logout()
+ self.test_app.get(url)
+
+ def test_normal_jpg(self):
+ self.check_normal_upload(u'Normal upload 1', GOOD_JPG)
+
+ def test_normal_png(self):
+ self.check_normal_upload(u'Normal upload 2', GOOD_PNG)
+
+ @pytest.mark.skipif("not pdf_check_prerequisites()")
+ def test_normal_pdf(self):
+ response, context = self.do_post({'title': u'Normal upload 3 (pdf)'},
+ do_follow=True,
+ **self.upload_data(GOOD_PDF))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ assert 'mediagoblin/user_pages/user.html' in context
+
+ def check_media(self, request, find_data, count=None):
+ media = MediaEntry.find(find_data)
+ if count is not None:
+ assert media.count() == count
+ if count == 0:
+ return
+ return media[0]
+
+ def test_tags(self):
+ # Good tag string
+ # --------
+ response, request = self.do_post({'title': u'Balanced Goblin 2',
+ 'tags': GOOD_TAG_STRING},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': u'Balanced Goblin 2'}, 1)
+ assert media.tags[0]['name'] == u'yin'
+ assert media.tags[0]['slug'] == u'yin'
+
+ assert media.tags[1]['name'] == u'yang'
+ assert media.tags[1]['slug'] == u'yang'
+
+ # Test tags that are too long
+ # ---------------
+ response, form = self.do_post({'title': u'Balanced Goblin 2',
+ 'tags': BAD_TAG_STRING},
+ *FORM_CONTEXT,
+ **self.upload_data(GOOD_JPG))
+ assert form.tags.errors == [
+ u'Tags must be shorter than 50 characters. ' \
+ 'Tags that are too long: ' \
+ 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
+
+ def test_delete(self):
+ response, request = self.do_post({'title': u'Balanced Goblin'},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
+ media_id = media.id
+
+ # render and post to the edit page.
+ edit_url = request.urlgen(
+ 'mediagoblin.edit.edit_media',
+ user=self.test_user.username, media_id=media_id)
+ self.test_app.get(edit_url)
+ self.test_app.post(edit_url,
+ {'title': u'Balanced Goblin',
+ 'slug': u"Balanced=Goblin",
+ 'tags': u''})
+ media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
+ assert media.slug == u"balanced-goblin"
+
+ # Add a comment, so we can test for its deletion later.
+ self.check_comments(request, media_id, 0)
+ comment_url = request.urlgen(
+ 'mediagoblin.user_pages.media_post_comment',
+ user=self.test_user.username, media_id=media_id)
+ response = self.do_post({'comment_content': 'i love this test'},
+ url=comment_url, do_follow=True)[0]
+ self.check_comments(request, media_id, 1)
+
+ # Do not confirm deletion
+ # ---------------------------------------------------
+ delete_url = request.urlgen(
+ 'mediagoblin.user_pages.media_confirm_delete',
+ user=self.test_user.username, media_id=media_id)
+ # Empty data means don't confirm
+ response = self.do_post({}, do_follow=True, url=delete_url)[0]
+ media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
+ media_id = media.id
+
+ # Confirm deletion
+ # ---------------------------------------------------
+ response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT,
+ do_follow=True, url=delete_url)
+ self.check_media(request, {'id': media_id}, 0)
+ self.check_comments(request, media_id, 0)
+
+ def test_evil_file(self):
+ # Test non-suppoerted file with non-supported extension
+ # -----------------------------------------------------
+ response, form = self.do_post({'title': u'Malicious Upload 1'},
+ *FORM_CONTEXT,
+ **self.upload_data(EVIL_FILE))
+ assert len(form.file.errors) == 1
+ assert 'Sorry, I don\'t support that file type :(' == \
+ str(form.file.errors[0])
+
+
+ def test_get_media_manager(self):
+ """Test if the get_media_manger function returns sensible things
+ """
+ response, request = self.do_post({'title': u'Balanced Goblin'},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
+
+ assert media.media_type == u'mediagoblin.media_types.image'
+ assert isinstance(media.media_manager, img_MEDIA_MANAGER)
+ assert media.media_manager.entry == media
+
+
+ def test_sniffing(self):
+ '''
+ Test sniffing mechanism to assert that regular uploads work as intended
+ '''
+ template.clear_test_template_context()
+ response = self.test_app.post(
+ '/submit/', {
+ 'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'
+ }, upload_files=[(
+ 'file', GOOD_JPG)])
+
+ response.follow()
+
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html']
+
+ request = context['request']
+
+ media = request.db.MediaEntry.find_one({
+ u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
+
+ assert media.media_type == 'mediagoblin.media_types.image'
+
+ def check_false_image(self, title, filename):
+ # NOTE: The following 2 tests will ultimately fail, but they
+ # *will* pass the initial form submission step. Instead,
+ # they'll be caught as failures during the processing step.
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ assert entry.state == 'failed'
+ assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
+
+ def test_evil_jpg(self):
+ # Test non-supported file with .jpg extension
+ # -------------------------------------------
+ self.check_false_image(u'Malicious Upload 2', EVIL_JPG)
+
+ def test_evil_png(self):
+ # Test non-supported file with .png extension
+ # -------------------------------------------
+ self.check_false_image(u'Malicious Upload 3', EVIL_PNG)
+
+ def test_media_data(self):
+ self.check_normal_upload(u"With GPS data", GPS_JPG)
+ media = self.check_media(None, {"title": u"With GPS data"}, 1)
+ assert media.media_data.gps_latitude == 59.336666666666666
+
+ def test_processing(self):
+ public_store_dir = mg_globals.global_config[
+ 'storage:publicstore']['base_dir']
+
+ data = {'title': u'Big Blue'}
+ response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(BIG_BLUE))
+ media = self.check_media(request, data, 1)
+ last_size = 1024 ** 3 # Needs to be larger than bigblue.png
+ for key, basename in (('original', 'bigblue.png'),
+ ('medium', 'bigblue.medium.png'),
+ ('thumb', 'bigblue.thumbnail.png')):
+ # Does the processed image have a good filename?
+ filename = os.path.join(
+ public_store_dir,
+ *media.media_files[key])
+ assert filename.endswith('_' + basename)
+ # Is it smaller than the last processed image we looked at?
+ size = os.stat(filename).st_size
+ assert last_size > size
+ last_size = size
diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png
new file mode 100644
index 00000000..2b2c2a44
--- /dev/null
+++ b/mediagoblin/tests/test_submission/bigblue.png
Binary files differ
diff --git a/mediagoblin/tests/test_submission/evil b/mediagoblin/tests/test_submission/evil
new file mode 100755
index 00000000..2c850e29
--- /dev/null
+++ b/mediagoblin/tests/test_submission/evil
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+echo "In yer base, doin spooky things" \ No newline at end of file
diff --git a/mediagoblin/tests/test_submission/evil.jpg b/mediagoblin/tests/test_submission/evil.jpg
new file mode 100755
index 00000000..2c850e29
--- /dev/null
+++ b/mediagoblin/tests/test_submission/evil.jpg
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+echo "In yer base, doin spooky things" \ No newline at end of file
diff --git a/mediagoblin/tests/test_submission/evil.png b/mediagoblin/tests/test_submission/evil.png
new file mode 100755
index 00000000..2c850e29
--- /dev/null
+++ b/mediagoblin/tests/test_submission/evil.png
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+echo "In yer base, doin spooky things" \ No newline at end of file
diff --git a/mediagoblin/tests/test_submission/good.jpg b/mediagoblin/tests/test_submission/good.jpg
new file mode 100644
index 00000000..936458e9
--- /dev/null
+++ b/mediagoblin/tests/test_submission/good.jpg
Binary files differ
diff --git a/mediagoblin/tests/test_submission/good.pdf b/mediagoblin/tests/test_submission/good.pdf
new file mode 100644
index 00000000..ab5db006
--- /dev/null
+++ b/mediagoblin/tests/test_submission/good.pdf
Binary files differ
diff --git a/mediagoblin/tests/test_submission/good.png b/mediagoblin/tests/test_submission/good.png
new file mode 100644
index 00000000..c1eadf9c
--- /dev/null
+++ b/mediagoblin/tests/test_submission/good.png
Binary files differ
diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py
new file mode 100644
index 00000000..e25cc283
--- /dev/null
+++ b/mediagoblin/tests/test_tags.py
@@ -0,0 +1,39 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools import text
+
+def test_list_of_dicts_conversion(test_app):
+ """
+ When the user adds tags to a media entry, the string from the form is
+ converted into a list of tags, where each tag is stored in the database
+ as a dict. Each tag dict should contain the tag's name and slug. Another
+ function performs the reverse operation when populating a form to edit tags.
+ """
+ # Leading, trailing, and internal whitespace should be removed and slugified
+ assert text.convert_to_tag_list_of_dicts('sleep , 6 AM, chainsaw! ') == [
+ {'name': u'sleep', 'slug': u'sleep'},
+ {'name': u'6 AM', 'slug': u'6-am'},
+ {'name': u'chainsaw!', 'slug': u'chainsaw'}]
+
+ # If the user enters two identical tags, record only one of them
+ assert text.convert_to_tag_list_of_dicts('echo,echo') == [{'name': u'echo',
+ 'slug': u'echo'}]
+
+ # Make sure converting the list of dicts to a string works
+ assert text.media_tags_as_string([{'name': u'yin', 'slug': u'yin'},
+ {'name': u'yang', 'slug': u'yang'}]) == \
+ u'yin, yang'
diff --git a/mediagoblin/tests/test_timesince.py b/mediagoblin/tests/test_timesince.py
new file mode 100644
index 00000000..6579eb09
--- /dev/null
+++ b/mediagoblin/tests/test_timesince.py
@@ -0,0 +1,57 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from datetime import datetime, timedelta
+
+from mediagoblin.tools.timesince import is_aware, timesince
+
+
+def test_timesince():
+ test_time = datetime.now()
+
+ # it should ignore second and microseconds
+ assert timesince(test_time, test_time + timedelta(microseconds=1)) == "0 minutes"
+ assert timesince(test_time, test_time + timedelta(seconds=1)) == "0 minutes"
+
+ # test minutes, hours, days, weeks, months and years (singular and plural)
+ assert timesince(test_time, test_time + timedelta(minutes=1)) == "1 minute"
+ assert timesince(test_time, test_time + timedelta(minutes=2)) == "2 minutes"
+
+ assert timesince(test_time, test_time + timedelta(hours=1)) == "1 hour"
+ assert timesince(test_time, test_time + timedelta(hours=2)) == "2 hours"
+
+ assert timesince(test_time, test_time + timedelta(days=1)) == "1 day"
+ assert timesince(test_time, test_time + timedelta(days=2)) == "2 days"
+
+ assert timesince(test_time, test_time + timedelta(days=7)) == "1 week"
+ assert timesince(test_time, test_time + timedelta(days=14)) == "2 weeks"
+
+ assert timesince(test_time, test_time + timedelta(days=30)) == "1 month"
+ assert timesince(test_time, test_time + timedelta(days=60)) == "2 months"
+
+ assert timesince(test_time, test_time + timedelta(days=365)) == "1 year"
+ assert timesince(test_time, test_time + timedelta(days=730)) == "2 years"
+
+ # okay now we want to test combinations
+ # e.g. 1 hour, 5 days
+ assert timesince(test_time, test_time + timedelta(days=5, hours=1)) == "5 days, 1 hour"
+
+ assert timesince(test_time, test_time + timedelta(days=15)) == "2 weeks, 1 day"
+
+ assert timesince(test_time, test_time + timedelta(days=97)) == "3 months, 1 week"
+
+ assert timesince(test_time, test_time + timedelta(days=2250)) == "6 years, 2 months"
+
diff --git a/mediagoblin/tests/test_util.py b/mediagoblin/tests/test_util.py
new file mode 100644
index 00000000..bc14f528
--- /dev/null
+++ b/mediagoblin/tests/test_util.py
@@ -0,0 +1,145 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import email
+
+from mediagoblin.tools import common, url, translate, mail, text, testing
+
+testing._activate_testing()
+
+
+def _import_component_testing_method(silly_string):
+ # Just for the sake of testing that our component importer works.
+ return u"'%s' is the silliest string I've ever seen" % silly_string
+
+
+def test_import_component():
+ imported_func = common.import_component(
+ 'mediagoblin.tests.test_util:_import_component_testing_method')
+ result = imported_func('hooobaladoobala')
+ expected = u"'hooobaladoobala' is the silliest string I've ever seen"
+ assert result == expected
+
+
+def test_send_email():
+ mail._clear_test_inboxes()
+
+ # send the email
+ mail.send_email(
+ "sender@mediagoblin.example.org",
+ ["amanda@example.org", "akila@example.org"],
+ "Testing is so much fun!",
+ """HAYYY GUYS!
+
+I hope you like unit tests JUST AS MUCH AS I DO!""")
+
+ # check the main inbox
+ assert len(mail.EMAIL_TEST_INBOX) == 1
+ message = mail.EMAIL_TEST_INBOX.pop()
+ assert message['From'] == "sender@mediagoblin.example.org"
+ assert message['To'] == "amanda@example.org, akila@example.org"
+ assert message['Subject'] == "Testing is so much fun!"
+ assert message.get_payload(decode=True) == """HAYYY GUYS!
+
+I hope you like unit tests JUST AS MUCH AS I DO!"""
+
+ # Check everything that the FakeMhost.sendmail() method got is correct
+ assert len(mail.EMAIL_TEST_MBOX_INBOX) == 1
+ mbox_dict = mail.EMAIL_TEST_MBOX_INBOX.pop()
+ assert mbox_dict['from'] == "sender@mediagoblin.example.org"
+ assert mbox_dict['to'] == ["amanda@example.org", "akila@example.org"]
+ mbox_message = email.message_from_string(mbox_dict['message'])
+ assert mbox_message['From'] == "sender@mediagoblin.example.org"
+ assert mbox_message['To'] == "amanda@example.org, akila@example.org"
+ assert mbox_message['Subject'] == "Testing is so much fun!"
+ assert mbox_message.get_payload(decode=True) == """HAYYY GUYS!
+
+I hope you like unit tests JUST AS MUCH AS I DO!"""
+
+def test_slugify():
+ assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park'
+ assert url.slugify(u'A Walk in the Park') == u'a-walk-in-the-park'
+ assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park'
+ assert url.slugify(u'a walk in-the-park') == u'a-walk-in-the-park'
+ assert url.slugify(u'a w@lk in the park?') == u'a-w-lk-in-the-park'
+ assert url.slugify(u'a walk in the par\u0107') == u'a-walk-in-the-parc'
+ assert url.slugify(u'\u00E0\u0042\u00E7\u010F\u00EB\u0066') == u'abcdef'
+
+def test_locale_to_lower_upper():
+ """
+ Test cc.i18n.util.locale_to_lower_upper()
+ """
+ assert translate.locale_to_lower_upper('en') == 'en'
+ assert translate.locale_to_lower_upper('en_US') == 'en_US'
+ assert translate.locale_to_lower_upper('en-us') == 'en_US'
+
+ # crazy renditions. Useful?
+ assert translate.locale_to_lower_upper('en-US') == 'en_US'
+ assert translate.locale_to_lower_upper('en_us') == 'en_US'
+
+
+def test_locale_to_lower_lower():
+ """
+ Test cc.i18n.util.locale_to_lower_lower()
+ """
+ assert translate.locale_to_lower_lower('en') == 'en'
+ assert translate.locale_to_lower_lower('en_US') == 'en-us'
+ assert translate.locale_to_lower_lower('en-us') == 'en-us'
+
+ # crazy renditions. Useful?
+ assert translate.locale_to_lower_lower('en-US') == 'en-us'
+ assert translate.locale_to_lower_lower('en_us') == 'en-us'
+
+
+def test_gettext_lazy_proxy():
+ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
+ from mediagoblin.tools.translate import pass_to_ugettext, set_thread_locale
+ proxy = _(u"Password")
+ orig = u"Password"
+
+ set_thread_locale("es")
+ p1 = unicode(proxy)
+ p1_should = pass_to_ugettext(orig)
+ assert p1_should != orig, "Test useless, string not translated"
+ assert p1 == p1_should
+
+ set_thread_locale("sv")
+ p2 = unicode(proxy)
+ p2_should = pass_to_ugettext(orig)
+ assert p2_should != orig, "Test broken, string not translated"
+ assert p2 == p2_should
+
+ assert p1_should != p2_should, "Test broken, same translated string"
+ assert p1 != p2
+
+
+def test_html_cleaner():
+ # Remove images
+ result = text.clean_html(
+ '<p>Hi everybody! '
+ '<img src="http://example.org/huge-purple-barney.png" /></p>\n'
+ '<p>:)</p>')
+ assert result == (
+ '<div>'
+ '<p>Hi everybody! </p>\n'
+ '<p>:)</p>'
+ '</div>')
+
+ # Remove evil javascript
+ result = text.clean_html(
+ '<p><a href="javascript:nasty_surprise">innocent link!</a></p>')
+ assert result == (
+ '<p><a href="">innocent link!</a></p>')
diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py
new file mode 100644
index 00000000..6695618b
--- /dev/null
+++ b/mediagoblin/tests/test_workbench.py
@@ -0,0 +1,122 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+
+
+from mediagoblin.tools import workbench
+from mediagoblin.mg_globals import setup_globals
+from mediagoblin.decorators import get_workbench
+from mediagoblin.tests.test_storage import get_tmp_filestorage, cleanup_storage
+
+
+class TestWorkbench(object):
+ def setup(self):
+ self.workbench_base = tempfile.mkdtemp(prefix='gmg_workbench_testing')
+ self.workbench_manager = workbench.WorkbenchManager(
+ self.workbench_base)
+
+ def teardown(self):
+ # If the workbench is empty, this should work.
+ os.rmdir(self.workbench_base)
+
+ def test_create_workbench(self):
+ workbench = self.workbench_manager.create()
+ assert os.path.isdir(workbench.dir)
+ assert workbench.dir.startswith(self.workbench_manager.base_workbench_dir)
+ workbench.destroy()
+
+ def test_joinpath(self):
+ this_workbench = self.workbench_manager.create()
+ tmpname = this_workbench.joinpath('temp.txt')
+ assert tmpname == os.path.join(this_workbench.dir, 'temp.txt')
+ this_workbench.destroy()
+
+ def test_destroy_workbench(self):
+ # kill a workbench
+ this_workbench = self.workbench_manager.create()
+ tmpfile_name = this_workbench.joinpath('temp.txt')
+ tmpfile = file(tmpfile_name, 'w')
+ with tmpfile:
+ tmpfile.write('lollerskates')
+
+ assert os.path.exists(tmpfile_name)
+
+ wb_dir = this_workbench.dir
+ this_workbench.destroy()
+ assert not os.path.exists(tmpfile_name)
+ assert not os.path.exists(wb_dir)
+
+ def test_localized_file(self):
+ tmpdir, this_storage = get_tmp_filestorage()
+ this_workbench = self.workbench_manager.create()
+
+ # Write a brand new file
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Our file')
+
+ # with a local file storage
+ filename = this_workbench.localized_file(this_storage, filepath)
+ assert filename == os.path.join(
+ tmpdir, 'dir1/dir2/ourfile.txt')
+ this_storage.delete_file(filepath)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+ # with a fake remote file storage
+ tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
+
+ # ... write a brand new file, again ;)
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Our file')
+
+ filename = this_workbench.localized_file(this_storage, filepath)
+ assert filename == os.path.join(
+ this_workbench.dir, 'ourfile.txt')
+
+ # fake remote file storage, filename_if_copying set
+ filename = this_workbench.localized_file(
+ this_storage, filepath, 'thisfile')
+ assert filename == os.path.join(
+ this_workbench.dir, 'thisfile.txt')
+
+ # fake remote file storage, filename_if_copying set,
+ # keep_extension_if_copying set to false
+ filename = this_workbench.localized_file(
+ this_storage, filepath, 'thisfile.text', False)
+ assert filename == os.path.join(
+ this_workbench.dir, 'thisfile.text')
+
+ this_storage.delete_file(filepath)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+ this_workbench.destroy()
+
+ def test_workbench_decorator(self):
+ """Test @get_workbench decorator and automatic cleanup"""
+ # The decorator needs mg_globals.workbench_manager
+ setup_globals(workbench_manager=self.workbench_manager)
+
+ @get_workbench
+ def create_it(workbench=None):
+ # workbench dir exists?
+ assert os.path.isdir(workbench.dir)
+ return workbench.dir
+
+ benchdir = create_it()
+ # workbench dir has been cleaned up automatically?
+ assert not os.path.isdir(benchdir)
diff --git a/mediagoblin/tests/testplugins/__init__.py b/mediagoblin/tests/testplugins/__init__.py
new file mode 100644
index 00000000..621845ba
--- /dev/null
+++ b/mediagoblin/tests/testplugins/__init__.py
@@ -0,0 +1,15 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/mediagoblin/tests/testplugins/callables1/__init__.py b/mediagoblin/tests/testplugins/callables1/__init__.py
new file mode 100644
index 00000000..fe801a01
--- /dev/null
+++ b/mediagoblin/tests/testplugins/callables1/__init__.py
@@ -0,0 +1,43 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+def setup_plugin():
+ pass
+
+
+def just_one(call_log):
+ call_log.append("expect this one call")
+ return "Called just once"
+
+
+def multi_handle(call_log):
+ call_log.append("Hi, I'm the first")
+ return "the first returns"
+
+def multi_handle_with_canthandle(call_log):
+ return None
+
+
+def expand_tuple(this_tuple):
+ return this_tuple + (1,)
+
+hooks = {
+ 'setup': setup_plugin,
+ 'just_one': just_one,
+ 'multi_handle': multi_handle,
+ 'multi_handle_with_canthandle': multi_handle_with_canthandle,
+ 'expand_tuple': expand_tuple,
+ }
diff --git a/mediagoblin/tests/testplugins/callables2/__init__.py b/mediagoblin/tests/testplugins/callables2/__init__.py
new file mode 100644
index 00000000..9d5cf950
--- /dev/null
+++ b/mediagoblin/tests/testplugins/callables2/__init__.py
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+def setup_plugin():
+ pass
+
+
+def just_one(call_log):
+ assert "SHOULD NOT HAPPEN"
+
+def multi_handle(call_log):
+ call_log.append("Hi, I'm the second")
+ return "the second returns"
+
+def multi_handle_with_canthandle(call_log):
+ call_log.append("Hi, I'm the second")
+ return "the second returns"
+
+def expand_tuple(this_tuple):
+ return this_tuple + (2,)
+
+hooks = {
+ 'setup': setup_plugin,
+ 'just_one': just_one,
+ 'multi_handle': multi_handle,
+ 'multi_handle_with_canthandle': multi_handle_with_canthandle,
+ 'expand_tuple': expand_tuple,
+ }
diff --git a/mediagoblin/tests/testplugins/callables3/__init__.py b/mediagoblin/tests/testplugins/callables3/__init__.py
new file mode 100644
index 00000000..04efc8fc
--- /dev/null
+++ b/mediagoblin/tests/testplugins/callables3/__init__.py
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+def setup_plugin():
+ pass
+
+
+def just_one(call_log):
+ assert "SHOULD NOT HAPPEN"
+
+def multi_handle(call_log):
+ call_log.append("Hi, I'm the third")
+ return "the third returns"
+
+def multi_handle_with_canthandle(call_log):
+ call_log.append("Hi, I'm the third")
+ return "the third returns"
+
+def expand_tuple(this_tuple):
+ return this_tuple + (3,)
+
+hooks = {
+ 'setup': setup_plugin,
+ 'just_one': just_one,
+ 'multi_handle': multi_handle,
+ 'multi_handle_with_canthandle': multi_handle_with_canthandle,
+ 'expand_tuple': expand_tuple,
+ }
diff --git a/mediagoblin/tests/testplugins/modify_context/__init__.py b/mediagoblin/tests/testplugins/modify_context/__init__.py
new file mode 100644
index 00000000..164e66c1
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/__init__.py
@@ -0,0 +1,55 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools import pluginapi
+import pkg_resources
+
+
+def append_to_specific_context(context):
+ context['specific_page_append'] = 'in yer specificpage'
+ return context
+
+def append_to_global_context(context):
+ context['global_append'] = 'globally appended!'
+ return context
+
+def double_doubleme(context):
+ if 'doubleme' in context:
+ context['doubleme'] = context['doubleme'] * 2
+ return context
+
+
+def setup_plugin():
+ routes = [
+ ('modify_context.specific_page',
+ '/modify_context/specific/',
+ 'mediagoblin.tests.testplugins.modify_context.views:specific'),
+ ('modify_context.general_page',
+ '/modify_context/',
+ 'mediagoblin.tests.testplugins.modify_context.views:general')]
+
+ pluginapi.register_routes(routes)
+ pluginapi.register_template_path(
+ pkg_resources.resource_filename(
+ 'mediagoblin.tests.testplugins.modify_context', 'templates'))
+
+
+hooks = {
+ 'setup': setup_plugin,
+ ('modify_context.specific_page',
+ 'contextplugin/specific.html'): append_to_specific_context,
+ 'template_global_context': append_to_global_context,
+ 'template_context_prerender': double_doubleme}
diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html
new file mode 100644
index 00000000..9cf96d3e
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html
@@ -0,0 +1,5 @@
+General page!
+
+global thing: {{ global_append }}
+lol: {{ lol }}
+doubleme: {{ doubleme }}
diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html
new file mode 100644
index 00000000..5b1b4c4a
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html
@@ -0,0 +1,6 @@
+Specific page!
+
+specific thing: {{ specific_page_append }}
+global thing: {{ global_append }}
+something: {{ something }}
+doubleme: {{ doubleme }}
diff --git a/mediagoblin/tests/testplugins/modify_context/views.py b/mediagoblin/tests/testplugins/modify_context/views.py
new file mode 100644
index 00000000..701ec6f9
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/views.py
@@ -0,0 +1,33 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools.response import render_to_response
+
+
+def specific(request):
+ return render_to_response(
+ request,
+ 'contextplugin/specific.html',
+ {"something": "orother",
+ "doubleme": "happy"})
+
+
+def general(request):
+ return render_to_response(
+ request,
+ 'contextplugin/general.html',
+ {"lol": "cats",
+ "doubleme": "joy"})
diff --git a/mediagoblin/tests/testplugins/pluginspec/__init__.py b/mediagoblin/tests/testplugins/pluginspec/__init__.py
new file mode 100644
index 00000000..76ca2b1f
--- /dev/null
+++ b/mediagoblin/tests/testplugins/pluginspec/__init__.py
@@ -0,0 +1,22 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+def setup_plugin():
+ pass
+
+hooks = {
+ 'setup': setup_plugin,
+}
diff --git a/mediagoblin/tests/testplugins/pluginspec/config_spec.ini b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini
new file mode 100644
index 00000000..5c9c3bd7
--- /dev/null
+++ b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini
@@ -0,0 +1,4 @@
+[plugin_spec]
+some_string = string(default="blork")
+some_int = integer(default=50)
+dont_change_me = string(default="still the default") \ No newline at end of file
diff --git a/mediagoblin/tests/testplugins/staticstuff/__init__.py b/mediagoblin/tests/testplugins/staticstuff/__init__.py
new file mode 100644
index 00000000..a2591646
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/__init__.py
@@ -0,0 +1,36 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools.staticdirect import PluginStatic
+from mediagoblin.tools import pluginapi
+from pkg_resources import resource_filename
+
+def setup_plugin():
+ routes = [
+ ('staticstuff.static_demo',
+ '/staticstuff/',
+ 'mediagoblin.tests.testplugins.staticstuff.views:static_demo')]
+
+ pluginapi.register_routes(routes)
+
+
+hooks = {
+ 'setup': setup_plugin,
+ 'static_setup': lambda: PluginStatic(
+ 'staticstuff',
+ resource_filename(
+ 'mediagoblin.tests.testplugins.staticstuff',
+ 'static'))}
diff --git a/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css
new file mode 100644
index 00000000..1294ab8a
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css
@@ -0,0 +1,4 @@
+body {
+ background-color: #5edcf1;
+ color: #eb8add;
+} \ No newline at end of file
diff --git a/mediagoblin/tests/testplugins/staticstuff/views.py b/mediagoblin/tests/testplugins/staticstuff/views.py
new file mode 100644
index 00000000..34a5e8cb
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/views.py
@@ -0,0 +1,28 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+
+from werkzeug import Response
+
+
+def static_demo(request):
+ return Response(json.dumps({
+ # this does not exist, but we'll pretend it does ;)
+ 'mgoblin_bunny_pic': request.staticdirect(
+ 'images/bunny_pic.png'),
+ 'plugin_bunny_css': request.staticdirect(
+ 'css/bunnify.css', 'staticstuff')}))
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
new file mode 100644
index 00000000..2ee39e89
--- /dev/null
+++ b/mediagoblin/tests/tools.py
@@ -0,0 +1,233 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import sys
+import os
+import pkg_resources
+import shutil
+
+from functools import wraps
+
+from paste.deploy import loadapp
+from webtest import TestApp
+
+from mediagoblin import mg_globals
+from mediagoblin.db.models import User, MediaEntry, Collection
+from mediagoblin.tools import testing
+from mediagoblin.init.config import read_mediagoblin_config
+from mediagoblin.db.base import Session
+from mediagoblin.meddleware import BaseMeddleware
+from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.gmg_commands.dbupdate import run_dbupdate
+
+
+MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
+TEST_SERVER_CONFIG = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'test_paste.ini')
+TEST_APP_CONFIG = pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'test_mgoblin_app.ini')
+
+
+USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
+
+
+class TestingMeddleware(BaseMeddleware):
+ """
+ Meddleware for the Unit tests
+
+ It might make sense to perform some tests on all
+ requests/responses. Or prepare them in a special
+ manner. For example all html responses could be tested
+ for being valid html *after* being rendered.
+
+ This module is getting inserted at the front of the
+ meddleware list, which means: requests are handed here
+ first, responses last. So this wraps up the "normal"
+ app.
+
+ If you need to add a test, either add it directly to
+ the appropiate process_request or process_response, or
+ create a new method and call it from process_*.
+ """
+
+ def process_response(self, request, response):
+ # All following tests should be for html only!
+ if getattr(response, 'content_type', None) != "text/html":
+ # Get out early
+ return
+
+ # If the template contains a reference to
+ # /mgoblin_static/ instead of using
+ # /request.staticdirect(), error out here.
+ # This could probably be implemented as a grep on
+ # the shipped templates easier...
+ if response.text.find("/mgoblin_static/") >= 0:
+ raise AssertionError(
+ "Response HTML contains reference to /mgoblin_static/ "
+ "instead of staticdirect. Request was for: "
+ + request.full_path)
+
+ return
+
+
+def get_app(request, paste_config=None, mgoblin_config=None):
+ """Create a MediaGoblin app for testing.
+
+ Args:
+ - request: Not an http request, but a pytest fixture request. We
+ use this to make temporary directories that pytest
+ automatically cleans up as needed.
+ - paste_config: particular paste config used by this application.
+ - mgoblin_config: particular mediagoblin config used by this
+ application.
+ """
+ paste_config = paste_config or TEST_SERVER_CONFIG
+ mgoblin_config = mgoblin_config or TEST_APP_CONFIG
+
+ # This is the directory we're copying the paste/mgoblin config stuff into
+ run_dir = request.config._tmpdirhandler.mktemp(
+ 'mgoblin_app', numbered=True)
+ user_dev_dir = run_dir.mkdir('user_dev').strpath
+
+ new_paste_config = run_dir.join('paste.ini').strpath
+ new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
+ shutil.copyfile(paste_config, new_paste_config)
+ shutil.copyfile(mgoblin_config, new_mgoblin_config)
+
+ Session.rollback()
+ Session.remove()
+
+ # install user_dev directories
+ for directory in USER_DEV_DIRECTORIES_TO_SETUP:
+ full_dir = os.path.join(user_dev_dir, directory)
+ os.makedirs(full_dir)
+
+ # Get app config
+ global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
+ app_config = global_config['mediagoblin']
+
+ # Run database setup/migrations
+ run_dbupdate(app_config, global_config)
+
+ # setup app and return
+ test_app = loadapp(
+ 'config:' + new_paste_config)
+
+ # Insert the TestingMeddleware, which can do some
+ # sanity checks on every request/response.
+ # Doing it this way is probably not the cleanest way.
+ # We'll fix it, when we have plugins!
+ mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
+
+ app = TestApp(test_app)
+
+ return app
+
+
+def install_fixtures_simple(db, fixtures):
+ """
+ Very simply install fixtures in the database
+ """
+ for collection_name, collection_fixtures in fixtures.iteritems():
+ collection = db[collection_name]
+ for fixture in collection_fixtures:
+ collection.insert(fixture)
+
+
+def assert_db_meets_expected(db, expected):
+ """
+ Assert a database contains the things we expect it to.
+
+ Objects are found via 'id', so you should make sure your document
+ has an id.
+
+ Args:
+ - db: pymongo or mongokit database connection
+ - expected: the data we expect. Formatted like:
+ {'collection_name': [
+ {'id': 'foo',
+ 'some_field': 'some_value'},]}
+ """
+ for collection_name, collection_data in expected.iteritems():
+ collection = db[collection_name]
+ for expected_document in collection_data:
+ document = collection.find_one({'id': expected_document['id']})
+ assert document is not None # make sure it exists
+ assert document == expected_document # make sure it matches
+
+
+def fixture_add_user(username=u'chris', password=u'toast',
+ active_user=True):
+ # Reuse existing user or create a new one
+ test_user = User.query.filter_by(username=username).first()
+ if test_user is None:
+ test_user = User()
+ test_user.username = username
+ test_user.email = username + u'@example.com'
+ if password is not None:
+ test_user.pw_hash = bcrypt_gen_password_hash(password)
+ if active_user:
+ test_user.email_verified = True
+ test_user.status = u'active'
+
+ test_user.save()
+
+ # Reload
+ test_user = User.query.filter_by(username=username).first()
+
+ # ... and detach from session:
+ Session.expunge(test_user)
+
+ return test_user
+
+
+def fixture_media_entry(title=u"Some title", slug=None,
+ uploader=None, save=True, gen_slug=True):
+ entry = MediaEntry()
+ entry.title = title
+ entry.slug = slug
+ entry.uploader = uploader or fixture_add_user().id
+ entry.media_type = u'image'
+
+ if gen_slug:
+ entry.generate_slug()
+ if save:
+ entry.save()
+
+ return entry
+
+
+def fixture_add_collection(name=u"My first Collection", user=None):
+ if user is None:
+ user = fixture_add_user()
+ coll = Collection.query.filter_by(creator=user.id, title=name).first()
+ if coll is not None:
+ return coll
+ coll = Collection()
+ coll.creator = user.id
+ coll.title = name
+ coll.generate_slug()
+ coll.save()
+
+ # Reload
+ Session.refresh(coll)
+
+ # ... and detach from session:
+ Session.expunge(coll)
+
+ return coll
+