diff options
Diffstat (limited to 'mediagoblin/tests')
67 files changed, 5406 insertions, 0 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py new file mode 100644 index 00000000..cf200791 --- /dev/null +++ b/mediagoblin/tests/__init__.py @@ -0,0 +1,22 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +def setup_package(): + + import warnings + from sqlalchemy.exc import SAWarning + warnings.simplefilter("error", SAWarning) diff --git a/mediagoblin/tests/appconfig_context_modified.ini b/mediagoblin/tests/appconfig_context_modified.ini new file mode 100644 index 00000000..80ca69b1 --- /dev/null +++ b/mediagoblin/tests/appconfig_context_modified.ini @@ -0,0 +1,26 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +# TODO: Switch to using an in-memory database +sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.tests.testplugins.modify_context]] diff --git a/mediagoblin/tests/appconfig_plugin_specs.ini b/mediagoblin/tests/appconfig_plugin_specs.ini new file mode 100644 index 00000000..5511cd97 --- /dev/null +++ b/mediagoblin/tests/appconfig_plugin_specs.ini @@ -0,0 +1,21 @@ +[mediagoblin] +direct_remote_path = /mgoblin_static/ +email_sender_address = "notice@mediagoblin.example.org" + +## Uncomment and change to your DB's appropiate setting. +## Default is a local sqlite db "mediagoblin.db". +# sql_engine = postgresql:///gmg + +# set to false to enable sending notices +email_debug_mode = true + +# Set to false to disable registrations +allow_registration = true + +[plugins] +[[mediagoblin.tests.testplugins.pluginspec]] +some_string = "not blork" +some_int = "not an int" + +# this one shouldn't have its own config +[[mediagoblin.tests.testplugins.callables1]] diff --git a/mediagoblin/tests/appconfig_static_plugin.ini b/mediagoblin/tests/appconfig_static_plugin.ini new file mode 100644 index 00000000..dc251171 --- /dev/null +++ b/mediagoblin/tests/appconfig_static_plugin.ini @@ -0,0 +1,26 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +# TODO: Switch to using an in-memory database +sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.tests.testplugins.staticstuff]] diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py new file mode 100644 index 00000000..dbb0aa0a --- /dev/null +++ b/mediagoblin/tests/conftest.py @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin.tests import tools +from mediagoblin.tools.testing import _activate_testing + + +@pytest.fixture() +def test_app(request): + """ + py.test fixture to pass sandboxed mediagoblin applications into tests that + want them. + + You could make a local version of this method for your own tests + to override the paste and config files being used by passing them + in differently to get_app. + """ + return tools.get_app(request) + + +@pytest.fixture() +def pt_fixture_enable_testing(): + """ + py.test fixture to enable testing mode in tools. + """ + _activate_testing() diff --git a/mediagoblin/tests/fake_carrot_conf_bad.ini b/mediagoblin/tests/fake_carrot_conf_bad.ini new file mode 100644 index 00000000..9d8cf518 --- /dev/null +++ b/mediagoblin/tests/fake_carrot_conf_bad.ini @@ -0,0 +1,14 @@ +[carrotapp] +# Whether or not our carrots are going to be turned into cake. +## These should throw errors +carrotcake = slobber +num_carrots = GROSS + +# A message encouraging our users to eat their carrots. +encouragement_phrase = 586956856856 # shouldn't throw error + +# Something extra! +blah_blah = "blah!" # shouldn't throw error either + +[celery] +EAT_CELERY_WITH_CARROTS = pants # yeah that's def an error right there. diff --git a/mediagoblin/tests/fake_carrot_conf_empty.ini b/mediagoblin/tests/fake_carrot_conf_empty.ini new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/mediagoblin/tests/fake_carrot_conf_empty.ini diff --git a/mediagoblin/tests/fake_carrot_conf_good.ini b/mediagoblin/tests/fake_carrot_conf_good.ini new file mode 100644 index 00000000..1377907b --- /dev/null +++ b/mediagoblin/tests/fake_carrot_conf_good.ini @@ -0,0 +1,13 @@ +[carrotapp] +# Whether or not our carrots are going to be turned into cake. +carrotcake = true +num_carrots = 88 + +# A message encouraging our users to eat their carrots. +encouragement_phrase = "I'd love it if you eat your carrots!" + +# Something extra! +blah_blah = "blah!" + +[celery] +EAT_CELERY_WITH_CARROTS = False diff --git a/mediagoblin/tests/fake_celery_conf.ini b/mediagoblin/tests/fake_celery_conf.ini new file mode 100644 index 00000000..67b0cba6 --- /dev/null +++ b/mediagoblin/tests/fake_celery_conf.ini @@ -0,0 +1,9 @@ +[mediagoblin] +# I got nothin' in this file! + +[celery] +SOME_VARIABLE = floop +MAIL_PORT = 2000 +CELERYD_ETA_SCHEDULER_PRECISION = 1.3 +CELERY_RESULT_PERSISTENT = true +CELERY_IMPORTS = foo.bar.baz, this.is.an.import diff --git a/mediagoblin/tests/fake_celery_module.py b/mediagoblin/tests/fake_celery_module.py new file mode 100644 index 00000000..621845ba --- /dev/null +++ b/mediagoblin/tests/fake_celery_module.py @@ -0,0 +1,15 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/mediagoblin/tests/fake_config_spec.ini b/mediagoblin/tests/fake_config_spec.ini new file mode 100644 index 00000000..43f2e236 --- /dev/null +++ b/mediagoblin/tests/fake_config_spec.ini @@ -0,0 +1,10 @@ +[carrotapp] +# Whether or not our carrots are going to be turned into cake. +carrotcake = boolean(default=False) +num_carrots = integer(default=1) + +# A message encouraging our users to eat their carrots. +encouragement_phrase = string() + +[celery] +EAT_CELERY_WITH_CARROTS = boolean(default=True)
\ No newline at end of file diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini new file mode 100644 index 00000000..e561c074 --- /dev/null +++ b/mediagoblin/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +usefixtures = tmpdir pt_fixture_enable_testing diff --git a/mediagoblin/tests/resources.py b/mediagoblin/tests/resources.py new file mode 100644 index 00000000..f7b3037d --- /dev/null +++ b/mediagoblin/tests/resources.py @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +from pkg_resources import resource_filename + + +def resource(filename): + return resource_filename('mediagoblin.tests', 'test_submission/' + filename) + + +GOOD_JPG = resource('good.jpg') +GOOD_PNG = resource('good.png') +EVIL_FILE = resource('evil') +EVIL_JPG = resource('evil.jpg') +EVIL_PNG = resource('evil.png') +BIG_BLUE = resource('bigblue.png') +GOOD_PDF = resource('good.pdf') + + +def resource_exif(f): + return resource_filename('mediagoblin.tests', 'test_exif/' + f) + + +GOOD_JPG = resource_exif('good.jpg') +EMPTY_JPG = resource_exif('empty.jpg') +BAD_JPG = resource_exif('bad.jpg') +GPS_JPG = resource_exif('has-gps.jpg') diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py new file mode 100644 index 00000000..89cf1026 --- /dev/null +++ b/mediagoblin/tests/test_api.py @@ -0,0 +1,92 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import base64 + +import pytest + +from mediagoblin import mg_globals +from mediagoblin.tools import template, pluginapi +from mediagoblin.tests.tools import fixture_add_user +from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ + BIG_BLUE + + +_log = logging.getLogger(__name__) + + +class TestAPI(object): + def setup(self): + self.db = mg_globals.database + + self.user_password = u'4cc355_70k3N' + self.user = fixture_add_user(u'joapi', self.user_password) + + def login(self, test_app): + test_app.post( + '/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + def get_context(self, template_name): + return template.TEMPLATE_TEST_CONTEXT[template_name] + + def http_auth_headers(self): + return {'Authorization': 'Basic {0}'.format( + base64.b64encode(':'.join([ + self.user.username, + self.user_password])))} + + def do_post(self, data, test_app, **kwargs): + url = kwargs.pop('url', '/api/submit') + do_follow = kwargs.pop('do_follow', False) + + if not 'headers' in kwargs.keys(): + kwargs['headers'] = self.http_auth_headers() + + response = test_app.post(url, data, **kwargs) + + if do_follow: + response.follow() + + return response + + def upload_data(self, filename): + return {'upload_files': [('file', filename)]} + + def test_1_test_test_view(self, test_app): + self.login(test_app) + + response = test_app.get( + '/api/test', + headers=self.http_auth_headers()) + + assert response.body == \ + '{"username": "joapi", "email": "joapi@example.com"}' + + def test_2_test_submission(self, test_app): + self.login(test_app) + + response = self.do_post( + {'title': 'Great JPG!'}, + test_app, + **self.upload_data(GOOD_JPG)) + + assert response.status_int == 200 + + assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first() diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py new file mode 100644 index 00000000..755727f9 --- /dev/null +++ b/mediagoblin/tests/test_auth.py @@ -0,0 +1,396 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import urlparse +import datetime + +from mediagoblin import mg_globals +from mediagoblin.auth import lib as auth_lib +from mediagoblin.db.models import User +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tools import template, mail + + +######################## +# Test bcrypt auth funcs +######################## + +def test_bcrypt_check_password(): + # Check known 'lollerskates' password against check function + assert auth_lib.bcrypt_check_password( + 'lollerskates', + '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') + + assert not auth_lib.bcrypt_check_password( + 'notthepassword', + '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') + + # Same thing, but with extra fake salt. + assert not auth_lib.bcrypt_check_password( + 'notthepassword', + '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6', + '3><7R45417') + + +def test_bcrypt_gen_password_hash(): + pw = 'youwillneverguessthis' + + # Normal password hash generation, and check on that hash + hashed_pw = auth_lib.bcrypt_gen_password_hash(pw) + assert auth_lib.bcrypt_check_password( + pw, hashed_pw) + assert not auth_lib.bcrypt_check_password( + 'notthepassword', hashed_pw) + + # Same thing, extra salt. + hashed_pw = auth_lib.bcrypt_gen_password_hash(pw, '3><7R45417') + assert auth_lib.bcrypt_check_password( + pw, hashed_pw, '3><7R45417') + assert not auth_lib.bcrypt_check_password( + 'notthepassword', hashed_pw, '3><7R45417') + + +def test_register_views(test_app): + """ + Massive test function that all our registration-related views all work. + """ + # Test doing a simple GET on the page + # ----------------------------------- + + test_app.get('/auth/register/') + # Make sure it rendered with the appropriate template + assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to register without providing anything, should error + # -------------------------------------------------------- + + template.clear_test_template_context() + test_app.post( + '/auth/register/', {}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + form = context['register_form'] + assert form.username.errors == [u'This field is required.'] + assert form.password.errors == [u'This field is required.'] + assert form.email.errors == [u'This field is required.'] + + # Try to register with fields that are known to be invalid + # -------------------------------------------------------- + + ## too short + template.clear_test_template_context() + test_app.post( + '/auth/register/', { + 'username': 'l', + 'password': 'o', + 'email': 'l'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + form = context['register_form'] + + assert form.username.errors == [u'Field must be between 3 and 30 characters long.'] + assert form.password.errors == [u'Field must be between 5 and 1024 characters long.'] + + ## bad form + template.clear_test_template_context() + test_app.post( + '/auth/register/', { + 'username': '@_@', + 'email': 'lollerskates'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + form = context['register_form'] + + assert form.username.errors == [u'This field does not take email addresses.'] + assert form.email.errors == [u'This field requires an email address.'] + + ## At this point there should be no users in the database ;) + assert User.query.count() == 0 + + # Successful register + # ------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/register/', { + 'username': u'happygirl', + 'password': 'iamsohappy', + 'email': 'happygrrl@example.org'}) + response.follow() + + ## Did we redirect to the proper page? Use the right template? + assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/' + assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT + + ## Make sure user is in place + new_user = mg_globals.database.User.find_one( + {'username': u'happygirl'}) + assert new_user + assert new_user.status == u'needs_email_verification' + assert new_user.email_verified == False + + ## Make sure user is logged in + request = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/user_pages/user.html']['request'] + assert request.session['user_id'] == unicode(new_user.id) + + ## Make sure we get email confirmation, and try verifying + assert len(mail.EMAIL_TEST_INBOX) == 1 + message = mail.EMAIL_TEST_INBOX.pop() + assert message['To'] == 'happygrrl@example.org' + email_context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/auth/verification_email.txt'] + assert email_context['verification_url'] in message.get_payload(decode=True) + + path = urlparse.urlsplit(email_context['verification_url'])[2] + get_params = urlparse.urlsplit(email_context['verification_url'])[3] + assert path == u'/auth/verify_email/' + parsed_get_params = urlparse.parse_qs(get_params) + + ### user should have these same parameters + assert parsed_get_params['userid'] == [ + unicode(new_user.id)] + assert parsed_get_params['token'] == [ + new_user.verification_key] + + ## Try verifying with bs verification key, shouldn't work + template.clear_test_template_context() + response = test_app.get( + "/auth/verify_email/?userid=%s&token=total_bs" % unicode( + new_user.id)) + response.follow() + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/user_pages/user.html'] + # assert context['verification_successful'] == True + # TODO: Would be good to test messages here when we can do so... + new_user = mg_globals.database.User.find_one( + {'username': u'happygirl'}) + assert new_user + assert new_user.status == u'needs_email_verification' + assert new_user.email_verified == False + + ## Verify the email activation works + template.clear_test_template_context() + response = test_app.get("%s?%s" % (path, get_params)) + response.follow() + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/user_pages/user.html'] + # assert context['verification_successful'] == True + # TODO: Would be good to test messages here when we can do so... + new_user = mg_globals.database.User.find_one( + {'username': u'happygirl'}) + assert new_user + assert new_user.status == u'active' + assert new_user.email_verified == True + + # Uniqueness checks + # ----------------- + ## We shouldn't be able to register with that user twice + template.clear_test_template_context() + response = test_app.post( + '/auth/register/', { + 'username': u'happygirl', + 'password': 'iamsohappy2', + 'email': 'happygrrl2@example.org'}) + + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/auth/register.html'] + form = context['register_form'] + assert form.username.errors == [ + u'Sorry, a user with that name already exists.'] + + ## TODO: Also check for double instances of an email address? + + ### Oops, forgot the password + # ------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/forgot_password/', + {'username': u'happygirl'}) + response.follow() + + ## Did we redirect to the proper page? Use the right template? + assert urlparse.urlsplit(response.location)[2] == '/auth/login/' + assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT + + ## Make sure link to change password is sent by email + assert len(mail.EMAIL_TEST_INBOX) == 1 + message = mail.EMAIL_TEST_INBOX.pop() + assert message['To'] == 'happygrrl@example.org' + email_context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/auth/fp_verification_email.txt'] + #TODO - change the name of verification_url to something forgot-password-ish + assert email_context['verification_url'] in message.get_payload(decode=True) + + path = urlparse.urlsplit(email_context['verification_url'])[2] + get_params = urlparse.urlsplit(email_context['verification_url'])[3] + assert path == u'/auth/forgot_password/verify/' + parsed_get_params = urlparse.parse_qs(get_params) + + # user should have matching parameters + new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) + assert parsed_get_params['userid'] == [unicode(new_user.id)] + assert parsed_get_params['token'] == [new_user.fp_verification_key] + + ### The forgotten password token should be set to expire in ~ 10 days + # A few ticks have expired so there are only 9 full days left... + assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9 + + ## Try using a bs password-changing verification key, shouldn't work + template.clear_test_template_context() + response = test_app.get( + "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode( + new_user.id), status=404) + assert response.status.split()[0] == u'404' # status="404 NOT FOUND" + + ## Try using an expired token to change password, shouldn't work + template.clear_test_template_context() + new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) + real_token_expiration = new_user.fp_token_expire + new_user.fp_token_expire = datetime.datetime.now() + new_user.save() + response = test_app.get("%s?%s" % (path, get_params), status=404) + assert response.status.split()[0] == u'404' # status="404 NOT FOUND" + new_user.fp_token_expire = real_token_expiration + new_user.save() + + ## Verify step 1 of password-change works -- can see form to change password + template.clear_test_template_context() + response = test_app.get("%s?%s" % (path, get_params)) + assert 'mediagoblin/auth/change_fp.html' in template.TEMPLATE_TEST_CONTEXT + + ## Verify step 2.1 of password-change works -- report success to user + template.clear_test_template_context() + response = test_app.post( + '/auth/forgot_password/verify/', { + 'userid': parsed_get_params['userid'], + 'password': 'iamveryveryhappy', + 'token': parsed_get_params['token']}) + response.follow() + assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT + + ## Verify step 2.2 of password-change works -- login w/ new password success + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'happygirl', + 'password': 'iamveryveryhappy'}) + + # User should be redirected + response.follow() + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + +def test_authentication_views(test_app): + """ + Test logging in and logging out + """ + # Make a new user + test_user = fixture_add_user(active_user=False) + + # Get login + # --------- + test_app.get('/auth/login/') + assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT + + # Failed login - blank form + # ------------------------- + template.clear_test_template_context() + response = test_app.post('/auth/login/') + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + assert form.username.errors == [u'This field is required.'] + assert form.password.errors == [u'This field is required.'] + + # Failed login - blank user + # ------------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'password': u'toast'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + assert form.username.errors == [u'This field is required.'] + + # Failed login - blank password + # ----------------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'chris'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + assert form.password.errors == [u'This field is required.'] + + # Failed login - bad user + # ----------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'steve', + 'password': 'toast'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + assert context['login_failed'] + + # Failed login - bad password + # --------------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': 'jam_and_ham'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + assert context['login_failed'] + + # Successful login + # ---------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': 'toast'}) + + # User should be redirected + response.follow() + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure user is in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert session['user_id'] == unicode(test_user.id) + + # Successful logout + # ----------------- + template.clear_test_template_context() + response = test_app.get('/auth/logout/') + + # Should be redirected to index page + response.follow() + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure the user is not in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert 'user_id' not in session + + # User is redirected to custom URL if POST['next'] is set + # ------------------------------------------------------- + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': 'toast', + 'next' : '/u/chris/'}) + assert urlparse.urlsplit(response.location)[2] == '/u/chris/' diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py new file mode 100644 index 00000000..5530c6f2 --- /dev/null +++ b/mediagoblin/tests/test_celery_setup.py @@ -0,0 +1,60 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pkg_resources + +from mediagoblin.init import celery as celery_setup +from mediagoblin.init.config import read_mediagoblin_config + + +TEST_CELERY_CONF_NOSPECIALDB = pkg_resources.resource_filename( + 'mediagoblin.tests', 'fake_celery_conf.ini') + + +def test_setup_celery_from_config(): + def _wipe_testmodule_clean(module): + vars_to_wipe = [ + var for var in dir(module) + if not var.startswith('__') and not var.endswith('__')] + for var in vars_to_wipe: + delattr(module, var) + + global_config, validation_result = read_mediagoblin_config( + TEST_CELERY_CONF_NOSPECIALDB) + app_config = global_config['mediagoblin'] + + celery_setup.setup_celery_from_config( + app_config, global_config, + 'mediagoblin.tests.fake_celery_module', set_environ=False) + + from mediagoblin.tests import fake_celery_module + assert fake_celery_module.SOME_VARIABLE == 'floop' + assert fake_celery_module.MAIL_PORT == 2000 + assert isinstance(fake_celery_module.MAIL_PORT, int) + assert fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION == 1.3 + assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) + assert fake_celery_module.CELERY_RESULT_PERSISTENT is True + assert fake_celery_module.CELERY_IMPORTS == [ + 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task'] + assert fake_celery_module.CELERY_RESULT_BACKEND == 'database' + assert fake_celery_module.CELERY_RESULT_DBURI == ( + 'sqlite:///' + + pkg_resources.resource_filename('mediagoblin.tests', 'celery.db')) + + assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy' + assert fake_celery_module.BROKER_HOST == ( + 'sqlite:///' + + pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db')) diff --git a/mediagoblin/tests/test_collections.py b/mediagoblin/tests/test_collections.py new file mode 100644 index 00000000..87782f30 --- /dev/null +++ b/mediagoblin/tests/test_collections.py @@ -0,0 +1,32 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user +from mediagoblin.db.models import Collection, User + + +def test_user_deletes_collection(test_app): + # Setup db. + user = fixture_add_user() + coll = fixture_add_collection(user=user) + # Reload into session: + user = User.query.get(user.id) + + cnt1 = Collection.query.count() + user.delete() + cnt2 = Collection.query.count() + + assert cnt1 == cnt2 + 1 diff --git a/mediagoblin/tests/test_config.py b/mediagoblin/tests/test_config.py new file mode 100644 index 00000000..b13adae6 --- /dev/null +++ b/mediagoblin/tests/test_config.py @@ -0,0 +1,97 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pkg_resources + +from mediagoblin.init import config + + +CARROT_CONF_GOOD = pkg_resources.resource_filename( + 'mediagoblin.tests', 'fake_carrot_conf_good.ini') +CARROT_CONF_EMPTY = pkg_resources.resource_filename( + 'mediagoblin.tests', 'fake_carrot_conf_empty.ini') +CARROT_CONF_BAD = pkg_resources.resource_filename( + 'mediagoblin.tests', 'fake_carrot_conf_bad.ini') +FAKE_CONFIG_SPEC = pkg_resources.resource_filename( + 'mediagoblin.tests', 'fake_config_spec.ini') + + +def test_read_mediagoblin_config(): + # An empty file + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_EMPTY, FAKE_CONFIG_SPEC) + + assert this_conf['carrotapp']['carrotcake'] == False + assert this_conf['carrotapp']['num_carrots'] == 1 + assert 'encouragement_phrase' not in this_conf['carrotapp'] + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == True + + # A good file + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_GOOD, FAKE_CONFIG_SPEC) + + assert this_conf['carrotapp']['carrotcake'] == True + assert this_conf['carrotapp']['num_carrots'] == 88 + assert this_conf['carrotapp']['encouragement_phrase'] == \ + "I'd love it if you eat your carrots!" + assert this_conf['carrotapp']['blah_blah'] == "blah!" + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == False + + # A bad file + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_BAD, FAKE_CONFIG_SPEC) + + # These should still open but will have errors that we'll test for + # in test_generate_validation_report() + assert this_conf['carrotapp']['carrotcake'] == 'slobber' + assert this_conf['carrotapp']['num_carrots'] == 'GROSS' + assert this_conf['carrotapp']['encouragement_phrase'] == \ + "586956856856" + assert this_conf['carrotapp']['blah_blah'] == "blah!" + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == "pants" + + +def test_generate_validation_report(): + # Empty + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_EMPTY, FAKE_CONFIG_SPEC) + report = config.generate_validation_report(this_conf, validation_results) + assert report is None + + # Good + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_GOOD, FAKE_CONFIG_SPEC) + report = config.generate_validation_report(this_conf, validation_results) + assert report is None + + # Bad + this_conf, validation_results = config.read_mediagoblin_config( + CARROT_CONF_BAD, FAKE_CONFIG_SPEC) + report = config.generate_validation_report(this_conf, validation_results) + + assert report.startswith("""\ +There were validation problems loading this config file: +--------------------------------------------------------""") + + expected_warnings = [ + 'carrotapp:carrotcake = the value "slobber" is of the wrong type.', + 'carrotapp:num_carrots = the value "GROSS" is of the wrong type.', + 'celery:EAT_CELERY_WITH_CARROTS = the value "pants" is of the wrong type.'] + warnings = report.splitlines()[2:] + + assert len(warnings) == 3 + for warning in expected_warnings: + assert warning in warnings diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py new file mode 100644 index 00000000..a272caf6 --- /dev/null +++ b/mediagoblin/tests/test_csrf_middleware.py @@ -0,0 +1,86 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin import mg_globals + + +def test_csrf_cookie_set(test_app): + cookie_name = mg_globals.app_config['csrf_cookie_name'] + + # get login page + response = test_app.get('/auth/login/') + + # assert that the mediagoblin nonce cookie has been set + assert 'Set-Cookie' in response.headers + assert cookie_name in response.cookies_set + + # assert that we're also sending a vary header + assert response.headers.get('Vary', False) == 'Cookie' + + +# We need a fresh app for this test on webtest < 1.3.6. +# We do not understand why, but it fixes the tests. +# If we require webtest >= 1.3.6, we can switch to a non fresh app here. +# +# ... this comment might be irrelevant post-pytest-fixtures, but I'm not +# removing it yet in case we move to module-level tests :) +# -- cwebber +def test_csrf_token_must_match(test_app): + + # construct a request with no cookie or form token + assert test_app.post('/auth/login/', + extra_environ={'gmg.verify_csrf': True}, + expect_errors=True).status_int == 403 + + # construct a request with a cookie, but no form token + assert test_app.post('/auth/login/', + headers={'Cookie': str('%s=foo' % + mg_globals.app_config['csrf_cookie_name'])}, + extra_environ={'gmg.verify_csrf': True}, + expect_errors=True).status_int == 403 + + # if both the cookie and form token are provided, they must match + assert test_app.post('/auth/login/', + {'csrf_token': 'blarf'}, + headers={'Cookie': str('%s=foo' % + mg_globals.app_config['csrf_cookie_name'])}, + extra_environ={'gmg.verify_csrf': True}, + expect_errors=True).\ + status_int == 403 + + assert test_app.post('/auth/login/', + {'csrf_token': 'foo'}, + headers={'Cookie': str('%s=foo' % + mg_globals.app_config['csrf_cookie_name'])}, + extra_environ={'gmg.verify_csrf': True}).\ + status_int == 200 + +def test_csrf_exempt(test_app): + # monkey with the views to decorate a known endpoint + import mediagoblin.auth.views + from mediagoblin.meddleware.csrf import csrf_exempt + + mediagoblin.auth.views.login = csrf_exempt( + mediagoblin.auth.views.login + ) + + # construct a request with no cookie or form token + assert test_app.post('/auth/login/', + extra_environ={'gmg.verify_csrf': True}, + expect_errors=False).status_int == 200 + + # restore the CSRF protection in case other tests expect it + mediagoblin.auth.views.login.csrf_enabled = True diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py new file mode 100644 index 00000000..08b4f8cf --- /dev/null +++ b/mediagoblin/tests/test_edit.py @@ -0,0 +1,144 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import urlparse +import pytest + +from mediagoblin import mg_globals +from mediagoblin.db.models import User +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tools import template +from mediagoblin.auth.lib import bcrypt_check_password + +class TestUserEdit(object): + def setup(self): + # set up new user + self.user_password = u'toast' + self.user = fixture_add_user(password = self.user_password) + + def login(self, test_app): + test_app.post( + '/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + + def test_user_deletion(self, test_app): + """Delete user via web interface""" + self.login(test_app) + + # Make sure user exists + assert User.query.filter_by(username=u'chris').first() + + res = test_app.post('/edit/account/delete/', {'confirmed': 'y'}) + + # Make sure user has been deleted + assert User.query.filter_by(username=u'chris').first() == None + + #TODO: make sure all corresponding items comments etc have been + # deleted too. Perhaps in submission test? + + #Restore user at end of test + self.user = fixture_add_user(password = self.user_password) + self.login(test_app) + + + def test_change_password(self, test_app): + """Test changing password correctly and incorrectly""" + self.login(test_app) + + # test that the password can be changed + template.clear_test_template_context() + res = test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '123456', + }) + res.follow() + + # Did we redirect to the correct page? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + # test_user has to be fetched again in order to have the current values + test_user = User.query.filter_by(username=u'chris').first() + assert bcrypt_check_password('123456', test_user.pw_hash) + # Update current user passwd + self.user_password = '123456' + + # test that the password cannot be changed if the given + # old_password is wrong + template.clear_test_template_context() + test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '098765', + }) + + test_user = User.query.filter_by(username=u'chris').first() + assert not bcrypt_check_password('098765', test_user.pw_hash) + + + def test_change_bio_url(self, test_app): + """Test changing bio and URL""" + self.login(test_app) + + # Test if legacy profile editing URL redirects correctly + res = test_app.post( + '/edit/profile/', { + 'bio': u'I love toast!', + 'url': u'http://dustycloud.org/'}, expect_errors=True) + + # Should redirect to /u/chris/edit/ + assert res.status_int == 302 + assert res.headers['Location'].endswith("/u/chris/edit/") + + res = test_app.post( + '/u/chris/edit/', { + 'bio': u'I love toast!', + 'url': u'http://dustycloud.org/'}) + + test_user = User.query.filter_by(username=u'chris').first() + assert test_user.bio == u'I love toast!' + assert test_user.url == u'http://dustycloud.org/' + + # change a different user than the logged in (should fail with 403) + fixture_add_user(username=u"foo") + res = test_app.post( + '/u/foo/edit/', { + 'bio': u'I love toast!', + 'url': u'http://dustycloud.org/'}, expect_errors=True) + assert res.status_int == 403 + + # test changing the bio and the URL inproperly + too_long_bio = 150 * 'T' + 150 * 'o' + 150 * 'a' + 150 * 's' + 150* 't' + + test_app.post( + '/u/chris/edit/', { + # more than 500 characters + 'bio': too_long_bio, + 'url': 'this-is-no-url'}) + + # Check form errors + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/edit_profile.html'] + form = context['form'] + + assert form.bio.errors == [ + u'Field must be between 0 and 500 characters long.'] + assert form.url.errors == [ + u'This address contains errors'] + +# test changing the url inproperly diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py new file mode 100644 index 00000000..c07e24ae --- /dev/null +++ b/mediagoblin/tests/test_exif.py @@ -0,0 +1,431 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +try: + from PIL import Image +except ImportError: + import Image + +from mediagoblin.tools.exif import exif_fix_image_orientation, \ + extract_exif, clean_exif, get_gps_data, get_useful +from .resources import GOOD_JPG, EMPTY_JPG, BAD_JPG, GPS_JPG + + +def assert_in(a, b): + assert a in b, "%r not in %r" % (a, b) + + +def test_exif_extraction(): + ''' + Test EXIF extraction from a good image + ''' + result = extract_exif(GOOD_JPG) + clean = clean_exif(result) + useful = get_useful(clean) + gps = get_gps_data(result) + + # Do we have the result? + assert len(result) == 56 + + # Do we have clean data? + assert len(clean) == 53 + + # GPS data? + assert gps == {} + + # Do we have the "useful" tags? + assert useful == {'EXIF CVAPattern': {'field_length': 8, + 'field_offset': 26224, + 'field_type': 7, + 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]', + 'tag': 41730, + 'values': [0, 2, 0, 2, 1, 2, 0, 1]}, + 'EXIF ColorSpace': {'field_length': 2, + 'field_offset': 476, + 'field_type': 3, + 'printable': u'sRGB', + 'tag': 40961, + 'values': [1]}, + 'EXIF ComponentsConfiguration': {'field_length': 4, + 'field_offset': 308, + 'field_type': 7, + 'printable': u'YCbCr', + 'tag': 37121, + 'values': [1, 2, 3, 0]}, + 'EXIF CompressedBitsPerPixel': {'field_length': 8, + 'field_offset': 756, + 'field_type': 5, + 'printable': u'4', + 'tag': 37122, + 'values': [[4, 1]]}, + 'EXIF Contrast': {'field_length': 2, + 'field_offset': 656, + 'field_type': 3, + 'printable': u'Soft', + 'tag': 41992, + 'values': [1]}, + 'EXIF CustomRendered': {'field_length': 2, + 'field_offset': 572, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41985, + 'values': [0]}, + 'EXIF DateTimeDigitized': {'field_length': 20, + 'field_offset': 736, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 36868, + 'values': u'2011:06:22 12:20:33'}, + 'EXIF DateTimeOriginal': {'field_length': 20, + 'field_offset': 716, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 36867, + 'values': u'2011:06:22 12:20:33'}, + 'EXIF DigitalZoomRatio': {'field_length': 8, + 'field_offset': 26232, + 'field_type': 5, + 'printable': u'1', + 'tag': 41988, + 'values': [[1, 1]]}, + 'EXIF ExifImageLength': {'field_length': 2, + 'field_offset': 500, + 'field_type': 3, + 'printable': u'2592', + 'tag': 40963, + 'values': [2592]}, + 'EXIF ExifImageWidth': {'field_length': 2, + 'field_offset': 488, + 'field_type': 3, + 'printable': u'3872', + 'tag': 40962, + 'values': [3872]}, + 'EXIF ExifVersion': {'field_length': 4, + 'field_offset': 272, + 'field_type': 7, + 'printable': u'0221', + 'tag': 36864, + 'values': [48, 50, 50, 49]}, + 'EXIF ExposureBiasValue': {'field_length': 8, + 'field_offset': 764, + 'field_type': 10, + 'printable': u'0', + 'tag': 37380, + 'values': [[0, 1]]}, + 'EXIF ExposureMode': {'field_length': 2, + 'field_offset': 584, + 'field_type': 3, + 'printable': u'Manual Exposure', + 'tag': 41986, + 'values': [1]}, + 'EXIF ExposureProgram': {'field_length': 2, + 'field_offset': 248, + 'field_type': 3, + 'printable': u'Manual', + 'tag': 34850, + 'values': [1]}, + 'EXIF ExposureTime': {'field_length': 8, + 'field_offset': 700, + 'field_type': 5, + 'printable': u'1/125', + 'tag': 33434, + 'values': [[1, 125]]}, + 'EXIF FNumber': {'field_length': 8, + 'field_offset': 708, + 'field_type': 5, + 'printable': u'10', + 'tag': 33437, + 'values': [[10, 1]]}, + 'EXIF FileSource': {'field_length': 1, + 'field_offset': 536, + 'field_type': 7, + 'printable': u'Digital Camera', + 'tag': 41728, + 'values': [3]}, + 'EXIF Flash': {'field_length': 2, + 'field_offset': 380, + 'field_type': 3, + 'printable': u'Flash did not fire', + 'tag': 37385, + 'values': [0]}, + 'EXIF FlashPixVersion': {'field_length': 4, + 'field_offset': 464, + 'field_type': 7, + 'printable': u'0100', + 'tag': 40960, + 'values': [48, 49, 48, 48]}, + 'EXIF FocalLength': {'field_length': 8, + 'field_offset': 780, + 'field_type': 5, + 'printable': u'18', + 'tag': 37386, + 'values': [[18, 1]]}, + 'EXIF FocalLengthIn35mmFilm': {'field_length': 2, + 'field_offset': 620, + 'field_type': 3, + 'printable': u'27', + 'tag': 41989, + 'values': [27]}, + 'EXIF GainControl': {'field_length': 2, + 'field_offset': 644, + 'field_type': 3, + 'printable': u'None', + 'tag': 41991, + 'values': [0]}, + 'EXIF ISOSpeedRatings': {'field_length': 2, + 'field_offset': 260, + 'field_type': 3, + 'printable': u'100', + 'tag': 34855, + 'values': [100]}, + 'EXIF InteroperabilityOffset': {'field_length': 4, + 'field_offset': 512, + 'field_type': 4, + 'printable': u'26240', + 'tag': 40965, + 'values': [26240]}, + 'EXIF LightSource': {'field_length': 2, + 'field_offset': 368, + 'field_type': 3, + 'printable': u'Unknown', + 'tag': 37384, + 'values': [0]}, + 'EXIF MaxApertureValue': {'field_length': 8, + 'field_offset': 772, + 'field_type': 5, + 'printable': u'18/5', + 'tag': 37381, + 'values': [[18, 5]]}, + 'EXIF MeteringMode': {'field_length': 2, + 'field_offset': 356, + 'field_type': 3, + 'printable': u'Pattern', + 'tag': 37383, + 'values': [5]}, + 'EXIF Saturation': {'field_length': 2, + 'field_offset': 668, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41993, + 'values': [0]}, + 'EXIF SceneCaptureType': {'field_length': 2, + 'field_offset': 632, + 'field_type': 3, + 'printable': u'Standard', + 'tag': 41990, + 'values': [0]}, + 'EXIF SceneType': {'field_length': 1, + 'field_offset': 548, + 'field_type': 7, + 'printable': u'Directly Photographed', + 'tag': 41729, + 'values': [1]}, + 'EXIF SensingMethod': {'field_length': 2, + 'field_offset': 524, + 'field_type': 3, + 'printable': u'One-chip color area', + 'tag': 41495, + 'values': [2]}, + 'EXIF Sharpness': {'field_length': 2, + 'field_offset': 680, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41994, + 'values': [0]}, + 'EXIF SubSecTime': {'field_length': 3, + 'field_offset': 428, + 'field_type': 2, + 'printable': u'10', + 'tag': 37520, + 'values': u'10'}, + 'EXIF SubSecTimeDigitized': {'field_length': 3, + 'field_offset': 452, + 'field_type': 2, + 'printable': u'10', + 'tag': 37522, + 'values': u'10'}, + 'EXIF SubSecTimeOriginal': {'field_length': 3, + 'field_offset': 440, + 'field_type': 2, + 'printable': u'10', + 'tag': 37521, + 'values': u'10'}, + 'EXIF SubjectDistanceRange': {'field_length': 2, + 'field_offset': 692, + 'field_type': 3, + 'printable': u'0', + 'tag': 41996, + 'values': [0]}, + 'EXIF WhiteBalance': {'field_length': 2, + 'field_offset': 596, + 'field_type': 3, + 'printable': u'Auto', + 'tag': 41987, + 'values': [0]}, + 'Image DateTime': {'field_length': 20, + 'field_offset': 194, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 306, + 'values': u'2011:06:22 12:20:33'}, + 'Image ExifOffset': {'field_length': 4, + 'field_offset': 126, + 'field_type': 4, + 'printable': u'214', + 'tag': 34665, + 'values': [214]}, + 'Image Make': {'field_length': 18, + 'field_offset': 134, + 'field_type': 2, + 'printable': u'NIKON CORPORATION', + 'tag': 271, + 'values': u'NIKON CORPORATION'}, + 'Image Model': {'field_length': 10, + 'field_offset': 152, + 'field_type': 2, + 'printable': u'NIKON D80', + 'tag': 272, + 'values': u'NIKON D80'}, + 'Image Orientation': {'field_length': 2, + 'field_offset': 42, + 'field_type': 3, + 'printable': u'Rotated 90 CCW', + 'tag': 274, + 'values': [6]}, + 'Image ResolutionUnit': {'field_length': 2, + 'field_offset': 78, + 'field_type': 3, + 'printable': u'Pixels/Inch', + 'tag': 296, + 'values': [2]}, + 'Image Software': {'field_length': 15, + 'field_offset': 178, + 'field_type': 2, + 'printable': u'Shotwell 0.9.3', + 'tag': 305, + 'values': u'Shotwell 0.9.3'}, + 'Image XResolution': {'field_length': 8, + 'field_offset': 162, + 'field_type': 5, + 'printable': u'300', + 'tag': 282, + 'values': [[300, 1]]}, + 'Image YCbCrPositioning': {'field_length': 2, + 'field_offset': 114, + 'field_type': 3, + 'printable': u'Co-sited', + 'tag': 531, + 'values': [2]}, + 'Image YResolution': {'field_length': 8, + 'field_offset': 170, + 'field_type': 5, + 'printable': u'300', + 'tag': 283, + 'values': [[300, 1]]}, + 'Thumbnail Compression': {'field_length': 2, + 'field_offset': 26280, + 'field_type': 3, + 'printable': u'JPEG (old-style)', + 'tag': 259, + 'values': [6]}, + 'Thumbnail ResolutionUnit': {'field_length': 2, + 'field_offset': 26316, + 'field_type': 3, + 'printable': u'Pixels/Inch', + 'tag': 296, + 'values': [2]}, + 'Thumbnail XResolution': {'field_length': 8, + 'field_offset': 26360, + 'field_type': 5, + 'printable': u'300', + 'tag': 282, + 'values': [[300, 1]]}, + 'Thumbnail YCbCrPositioning': {'field_length': 2, + 'field_offset': 26352, + 'field_type': 3, + 'printable': u'Co-sited', + 'tag': 531, + 'values': [2]}, + 'Thumbnail YResolution': {'field_length': 8, + 'field_offset': 26368, + 'field_type': 5, + 'printable': u'300', + 'tag': 283, + 'values': [[300, 1]]}} + + +def test_exif_image_orientation(): + ''' + Test image reorientation based on EXIF data + ''' + result = extract_exif(GOOD_JPG) + + image = exif_fix_image_orientation( + Image.open(GOOD_JPG), + result) + + # Are the dimensions correct? + assert image.size == (428, 640) + + # If this pixel looks right, the rest of the image probably will too. + assert_in(image.getdata()[10000], + ((41, 28, 11), (43, 27, 11)) + ) + + +def test_exif_no_exif(): + ''' + Test an image without exif + ''' + result = extract_exif(EMPTY_JPG) + clean = clean_exif(result) + useful = get_useful(clean) + gps = get_gps_data(result) + + assert result == {} + assert clean == {} + assert gps == {} + assert useful == {} + + +def test_exif_bad_image(): + ''' + Test EXIF extraction from a faithful, but bad image + ''' + result = extract_exif(BAD_JPG) + clean = clean_exif(result) + useful = get_useful(clean) + gps = get_gps_data(result) + + assert result == {} + assert clean == {} + assert gps == {} + assert useful == {} + + +def test_exif_gps_data(): + ''' + Test extractiion of GPS data + ''' + result = extract_exif(GPS_JPG) + gps = get_gps_data(result) + + assert gps == { + 'latitude': 59.336666666666666, + 'direction': 25.674046740467404, + 'altitude': 37.64365671641791, + 'longitude': 18.016166666666667} diff --git a/mediagoblin/tests/test_exif/bad.jpg b/mediagoblin/tests/test_exif/bad.jpg new file mode 100644 index 00000000..4cde23cd --- /dev/null +++ b/mediagoblin/tests/test_exif/bad.jpg @@ -0,0 +1,18 @@ +V2UncmUgbm8gc3RyYW5nZXJzIHRvIGxvdmUKWW91IGtub3cgdGhlIHJ1bGVzIGFuZCBzbyBkbyBJ +CkEgZnVsbCBjb21taXRtZW50J3Mgd2hhdCBJJ20gdGhpbmtpbicgb2YKWW91IHdvdWxkbid0IGdl +dCB0aGlzIGZyb20gYW55IG90aGVyIGd1eQpJIGp1c3Qgd2FubmEgdGVsbCB5b3UgaG93IEknbSBm +ZWVsaW4nCkdvdHRhIG1ha2UgeW91IHVuZGVyc3RhbmQKCihDaG9ydXMpCk5ldmVyIGdvbm5hIGdp +dmUgeW91IHVwCk5ldmVyIGdvbm5hIGxldCB5b3UgZG93bgpOZXZlciBnb25uYSBydW4gYXJvdW5k +IGFuZCBkZXNlcnQgeW91Ck5ldmVyIGdvbm5hIG1ha2UgeW91IGNyeQpOZXZlciBnb25uYSBzYXkg +Z29vZGJ5ZQpOZXZlciBnb25uYSB0ZWxsIGEgbGllIGFuZCBodXJ0IHlvdQoKV2UndmUga25vdyBl +YWNoIG90aGVyIGZvciBzbyBsb25nCllvdXIgaGVhcnQncyBiZWVuIGFjaGluJyBidXQgeW91J3Jl +IHRvbyBzaHkgdG8gc2F5IGl0Ckluc2lkZSB3ZSBib3RoIGtub3cgd2hhdCdzIGJlZW4gZ29pbmcg +b24KV2Uga25vdyB0aGUgZ2FtZSBhbmQgd2UncmUgZ29ubmEgcGxheSBpdApBbmQgaWYgeW91IGFz +ayBtZSBob3cgSSdtIGZlZWxpbicKRG9uJ3QgdGVsbCBtZSB5b3UncmUgdG9vIGJsaW5kIHRvIHNl +ZQoKKENob3J1cyB4MikKCihHaXZlIHlvdSB1cCwgZ2l2ZSB5b3UgdXApCk5ldmVyIGdvbm5hIGdp +dmUsIG5ldmVyIGdvbm5hIGdpdmUKKEdpdmUgeW91IHVwKQpOZXZlciBnb25uYSBnaXZlLCBuZXZl +ciBnb25uYSBnaXZlCihHaXZlIHlvdSB1cCkKCldlJ3ZlIGtub3cgZWFjaCBvdGhlciBmb3Igc28g +bG9uZwpZb3VyIGhlYXJ0J3MgYmVlbiBhY2hpbicgYnV0IHlvdSdyZSB0b28gc2h5IHRvIHNheSBp +dApJbnNpZGUgd2UgYm90aCBrbm93IHdoYXQncyBiZWVuIGdvaW5nIG9uCldlIGtub3cgdGhlIGdh +bWUgYW5kIHdlJ3JlIGdvbm5hIHBsYXkgaXQKSSBqdXN0IHdhbm5hIHRlbGwgeW91IGhvdyBJJ20g +ZmVlbGluJwpHb3R0YSBtYWtlIHlvdSB1bmRlcnN0YW5kCgooQ2hvcnVzIHgzKQo= diff --git a/mediagoblin/tests/test_exif/empty.jpg b/mediagoblin/tests/test_exif/empty.jpg Binary files differnew file mode 100644 index 00000000..37533af5 --- /dev/null +++ b/mediagoblin/tests/test_exif/empty.jpg diff --git a/mediagoblin/tests/test_exif/good.jpg b/mediagoblin/tests/test_exif/good.jpg Binary files differnew file mode 100644 index 00000000..0ee956fe --- /dev/null +++ b/mediagoblin/tests/test_exif/good.jpg diff --git a/mediagoblin/tests/test_exif/has-gps.jpg b/mediagoblin/tests/test_exif/has-gps.jpg Binary files differnew file mode 100644 index 00000000..f6f39d86 --- /dev/null +++ b/mediagoblin/tests/test_exif/has-gps.jpg diff --git a/mediagoblin/tests/test_globals.py b/mediagoblin/tests/test_globals.py new file mode 100644 index 00000000..fe3088f8 --- /dev/null +++ b/mediagoblin/tests/test_globals.py @@ -0,0 +1,42 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin import mg_globals + + +class TestGlobals(object): + def setup(self): + self.old_database = mg_globals.database + + def teardown(self): + mg_globals.database = self.old_database + + def test_setup_globals(self): + mg_globals.setup_globals( + database='my favorite database!', + public_store='my favorite public_store!', + queue_store='my favorite queue_store!') + + assert mg_globals.database == 'my favorite database!' + assert mg_globals.public_store == 'my favorite public_store!' + assert mg_globals.queue_store == 'my favorite queue_store!' + + pytest.raises( + AssertionError, + mg_globals.setup_globals, + no_such_global_foo="Dummy") diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py new file mode 100644 index 00000000..a0511af7 --- /dev/null +++ b/mediagoblin/tests/test_http_callback.py @@ -0,0 +1,83 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json + +import pytest +from urlparse import urlparse, parse_qs + +from mediagoblin import mg_globals +from mediagoblin.tools import processing +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tests.test_submission import GOOD_PNG +from mediagoblin.tests import test_oauth as oauth + + +class TestHTTPCallback(object): + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + self.db = mg_globals.database + + self.user_password = u'secret' + self.user = fixture_add_user(u'call_back', self.user_password) + + self.login() + + def login(self): + self.test_app.post('/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + def get_access_token(self, client_id, client_secret, code): + response = self.test_app.get('/oauth/access_token', { + 'code': code, + 'client_id': client_id, + 'client_secret': client_secret}) + + response_data = json.loads(response.body) + + return response_data['access_token'] + + def test_callback(self): + ''' Test processing HTTP callback ''' + self.oauth = oauth.TestOAuth() + self.oauth.setup(self.test_app) + + redirect, client_id = self.oauth.test_4_authorize_confidential_client() + + code = parse_qs(urlparse(redirect.location).query)['code'][0] + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + client_secret = client.secret + + access_token = self.get_access_token(client_id, client_secret, code) + + callback_url = 'https://foo.example?secrettestmediagoblinparam' + + self.test_app.post('/api/submit?client_id={0}&access_token={1}\ +&client_secret={2}'.format( + client_id, + access_token, + client_secret), { + 'title': 'Test', + 'callback_url': callback_url}, + upload_files=[('file', GOOD_PNG)]) + + assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed' diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py new file mode 100644 index 00000000..22f9e800 --- /dev/null +++ b/mediagoblin/tests/test_messages.py @@ -0,0 +1,50 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin import messages +from mediagoblin.tools import template + + +def test_messages(test_app): + """ + Added messages should show up in the request.session, + fetched messages should be the same as the added ones, + and fetching should clear the message list. + """ + # Aquire a request object + test_app.get('/') + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + request = context['request'] + + # The message queue should be empty + assert request.session.get('messages', []) == [] + + # First of all, we should clear the messages queue + messages.clear_add_message() + # Adding a message should modify the session accordingly + messages.add_message(request, 'herp_derp', 'First!') + test_msg_queue = [{'text': 'First!', 'level': 'herp_derp'}] + + # Alternative tests to the following, test divided in two steps: + # assert request.session['messages'] == test_msg_queue + # 1. Tests if add_message worked + assert messages.ADD_MESSAGE_TEST[-1] == test_msg_queue + # 2. Tests if add_message updated session information + assert messages.ADD_MESSAGE_TEST[-1] == request.session['messages'] + + # fetch_messages should return and empty the queue + assert messages.fetch_messages(request) == test_msg_queue + assert request.session.get('messages') == [] diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini new file mode 100644 index 00000000..0466b53b --- /dev/null +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -0,0 +1,33 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +# TODO: Switch to using an in-memory database +sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" + +# tag parsing +tags_max_length = 50 + +# So we can start to test attachments: +allow_attachments = True + +media_types = mediagoblin.media_types.image, mediagoblin.media_types.pdf + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.api]] +[[mediagoblin.plugins.oauth]] +[[mediagoblin.plugins.httpapiauth]] +[[mediagoblin.plugins.piwigo]] diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py new file mode 100644 index 00000000..755d863f --- /dev/null +++ b/mediagoblin/tests/test_misc.py @@ -0,0 +1,91 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.db.base import Session +from mediagoblin.db.models import User, MediaEntry, MediaComment +from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry + + +def test_404_for_non_existent(test_app): + res = test_app.get('/does-not-exist/', expect_errors=True) + assert res.status_int == 404 + + +def test_user_deletes_other_comments(test_app): + user_a = fixture_add_user(u"chris_a") + user_b = fixture_add_user(u"chris_b") + + media_a = fixture_media_entry(uploader=user_a.id, save=False) + media_b = fixture_media_entry(uploader=user_b.id, save=False) + Session.add(media_a) + Session.add(media_b) + Session.flush() + + # Create all 4 possible comments: + for u_id in (user_a.id, user_b.id): + for m_id in (media_a.id, media_b.id): + cmt = MediaComment() + cmt.media_entry = m_id + cmt.author = u_id + cmt.content = u"Some Comment" + Session.add(cmt) + + Session.flush() + + usr_cnt1 = User.query.count() + med_cnt1 = MediaEntry.query.count() + cmt_cnt1 = MediaComment.query.count() + + User.query.get(user_a.id).delete(commit=False) + + usr_cnt2 = User.query.count() + med_cnt2 = MediaEntry.query.count() + cmt_cnt2 = MediaComment.query.count() + + # One user deleted + assert usr_cnt2 == usr_cnt1 - 1 + # One media gone + assert med_cnt2 == med_cnt1 - 1 + # Three of four comments gone. + assert cmt_cnt2 == cmt_cnt1 - 3 + + User.query.get(user_b.id).delete() + + usr_cnt2 = User.query.count() + med_cnt2 = MediaEntry.query.count() + cmt_cnt2 = MediaComment.query.count() + + # All users gone + assert usr_cnt2 == usr_cnt1 - 2 + # All media gone + assert med_cnt2 == med_cnt1 - 2 + # All comments gone + assert cmt_cnt2 == cmt_cnt1 - 4 + + +def test_media_deletes_broken_attachment(test_app): + user_a = fixture_add_user(u"chris_a") + + media = fixture_media_entry(uploader=user_a.id, save=False) + media.attachment_files.append(dict( + name=u"some name", + filepath=[u"does", u"not", u"exist"], + )) + Session.add(media) + Session.flush() + + MediaEntry.query.get(media.id).delete() + User.query.get(user_a.id).delete() diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py new file mode 100644 index 00000000..427aa47c --- /dev/null +++ b/mediagoblin/tests/test_modelmethods.py @@ -0,0 +1,167 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# Maybe not every model needs a test, but some models have special +# methods, and so it makes sense to test them here. + +from mediagoblin.db.base import Session +from mediagoblin.db.models import MediaEntry + +from mediagoblin.tests.tools import fixture_add_user + +import mock + + +class FakeUUID(object): + hex = 'testtest-test-test-test-testtesttest' + +UUID_MOCK = mock.Mock(return_value=FakeUUID()) + + +class TestMediaEntrySlugs(object): + def _setup(self): + self.chris_user = fixture_add_user(u'chris') + self.emily_user = fixture_add_user(u'emily') + self.existing_entry = self._insert_media_entry_fixture( + title=u"Beware, I exist!", + slug=u"beware-i-exist") + + def _insert_media_entry_fixture(self, title=None, slug=None, this_id=None, + uploader=None, save=True): + entry = MediaEntry() + entry.title = title or u"Some title" + entry.slug = slug + entry.id = this_id + entry.uploader = uploader or self.chris_user.id + entry.media_type = u'image' + + if save: + entry.save() + + return entry + + def test_unique_slug_from_title(self, test_app): + self._setup() + + entry = self._insert_media_entry_fixture(u"Totally unique slug!", save=False) + entry.generate_slug() + assert entry.slug == u'totally-unique-slug' + + def test_old_good_unique_slug(self, test_app): + self._setup() + + entry = self._insert_media_entry_fixture( + u"A title here", u"a-different-slug-there", save=False) + entry.generate_slug() + assert entry.slug == u"a-different-slug-there" + + def test_old_weird_slug(self, test_app): + self._setup() + + entry = self._insert_media_entry_fixture( + slug=u"wowee!!!!!", save=False) + entry.generate_slug() + assert entry.slug == u"wowee" + + def test_existing_slug_use_id(self, test_app): + self._setup() + + entry = self._insert_media_entry_fixture( + u"Beware, I exist!!", this_id=9000, save=False) + entry.generate_slug() + assert entry.slug == u"beware-i-exist-9000" + + def test_existing_slug_cant_use_id(self, test_app): + self._setup() + + # Getting tired of dealing with test_app and this mock.patch + # thing conflicting, getting lazy. + @mock.patch('uuid.uuid4', UUID_MOCK) + def _real_test(): + # This one grabs the nine thousand slug + self._insert_media_entry_fixture( + slug=u"beware-i-exist-9000") + + entry = self._insert_media_entry_fixture( + u"Beware, I exist!!", this_id=9000, save=False) + entry.generate_slug() + assert entry.slug == u"beware-i-exist-test" + + _real_test() + + def test_existing_slug_cant_use_id_extra_junk(self, test_app): + self._setup() + + # Getting tired of dealing with test_app and this mock.patch + # thing conflicting, getting lazy. + @mock.patch('uuid.uuid4', UUID_MOCK) + def _real_test(): + # This one grabs the nine thousand slug + self._insert_media_entry_fixture( + slug=u"beware-i-exist-9000") + + # This one grabs makes sure the annoyance doesn't stop + self._insert_media_entry_fixture( + slug=u"beware-i-exist-test") + + entry = self._insert_media_entry_fixture( + u"Beware, I exist!!", this_id=9000, save=False) + entry.generate_slug() + assert entry.slug == u"beware-i-exist-testtest" + + _real_test() + + def test_garbage_slug(self, test_app): + """ + Titles that sound totally like Q*Bert shouldn't have slugs at + all. We'll just reference them by id. + + , + / \ (@!#?@!) + |\,/| ,-, / + | |#| ( ")~ + / \|/ \ L L + |\,/|\,/| + | |#, |#| + / \|/ \|/ \ + |\,/|\,/|\,/| + | |#| |#| |#| + / \|/ \|/ \|/ \ + |\,/|\,/|\,/|\,/| + | |#| |#| |#| |#| + \|/ \|/ \|/ \|/ + """ + self._setup() + + qbert_entry = self._insert_media_entry_fixture( + u"@!#?@!", save=False) + qbert_entry.generate_slug() + assert qbert_entry.slug is None + + +def test_media_data_init(test_app): + Session.rollback() + Session.remove() + media = MediaEntry() + media.media_type = u"mediagoblin.media_types.image" + assert media.media_data is None + media.media_data_init() + assert media.media_data is not None + obj_in_session = 0 + for obj in Session(): + obj_in_session += 1 + print repr(obj) + assert obj_in_session == 0 diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py new file mode 100644 index 00000000..ea3bd798 --- /dev/null +++ b/mediagoblin/tests/test_oauth.py @@ -0,0 +1,222 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import logging + +import pytest +from urlparse import parse_qs, urlparse + +from mediagoblin import mg_globals +from mediagoblin.tools import template, pluginapi +from mediagoblin.tests.tools import fixture_add_user + + +_log = logging.getLogger(__name__) + + +class TestOAuth(object): + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + self.db = mg_globals.database + + self.pman = pluginapi.PluginManager() + + self.user_password = u'4cc355_70k3N' + self.user = fixture_add_user(u'joauth', self.user_password) + + self.login() + + def login(self): + self.test_app.post( + '/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + def register_client(self, name, client_type, description=None, + redirect_uri=''): + return self.test_app.post( + '/oauth/client/register', { + 'name': name, + 'description': description, + 'type': client_type, + 'redirect_uri': redirect_uri}) + + def get_context(self, template_name): + return template.TEMPLATE_TEST_CONTEXT[template_name] + + def test_1_public_client_registration_without_redirect_uri(self): + ''' Test 'public' OAuth client registration without any redirect uri ''' + response = self.register_client( + u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') + + ctx = self.get_context('oauth/client/register.html') + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == u'OMGOMGOMG').first() + + assert response.status_int == 200 + + # Should display an error + assert len(ctx['form'].redirect_uri.errors) + + # Should not pass through + assert not client + + def test_2_successful_public_client_registration(self): + ''' Successfully register a public client ''' + uri = 'http://foo.example' + self.register_client( + u'OMGOMG', 'public', 'OMG!', uri) + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == u'OMGOMG').first() + + # redirect_uri should be set + assert client.redirect_uri == uri + + # Client should have been registered + assert client + + def test_3_successful_confidential_client_reg(self): + ''' Register a confidential OAuth client ''' + response = self.register_client( + u'GMOGMO', 'confidential', 'NO GMO!') + + assert response.status_int == 302 + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == u'GMOGMO').first() + + # Client should have been registered + assert client + + return client + + def test_4_authorize_confidential_client(self): + ''' Authorize a confidential client as a logged in user ''' + client = self.test_3_successful_confidential_client_reg() + + client_identifier = client.identifier + + redirect_uri = 'https://foo.example' + response = self.test_app.get('/oauth/authorize', { + 'client_id': client.identifier, + 'scope': 'all', + 'redirect_uri': redirect_uri}) + + # User-agent should NOT be redirected + assert response.status_int == 200 + + ctx = self.get_context('oauth/authorize.html') + + form = ctx['form'] + + # Short for client authorization post reponse + capr = self.test_app.post( + '/oauth/client/authorize', { + 'client_id': form.client_id.data, + 'allow': 'Allow', + 'next': form.next.data}) + + assert capr.status_int == 302 + + authorization_response = capr.follow() + + assert authorization_response.location.startswith(redirect_uri) + + return authorization_response, client_identifier + + def get_code_from_redirect_uri(self, uri): + ''' Get the value of ?code= from an URI ''' + return parse_qs(urlparse(uri).query)['code'][0] + + def test_token_endpoint_successful_confidential_request(self): + ''' Successful request against token endpoint ''' + code_redirect, client_id = self.test_4_authorize_confidential_client() + + code = self.get_code_from_redirect_uri(code_redirect.location) + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + token_res = self.test_app.get('/oauth/access_token?client_id={0}&\ +code={1}&client_secret={2}'.format(client_id, code, client.secret)) + + assert token_res.status_int == 200 + + token_data = json.loads(token_res.body) + + assert not 'error' in token_data + assert 'access_token' in token_data + assert 'token_type' in token_data + assert 'expires_in' in token_data + assert type(token_data['expires_in']) == int + assert token_data['expires_in'] > 0 + + # There should be a refresh token provided in the token data + assert len(token_data['refresh_token']) + + return client_id, token_data + + def test_token_endpont_missing_id_confidential_request(self): + ''' Unsuccessful request against token endpoint, missing client_id ''' + code_redirect, client_id = self.test_4_authorize_confidential_client() + + code = self.get_code_from_redirect_uri(code_redirect.location) + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + token_res = self.test_app.get('/oauth/access_token?\ +code={0}&client_secret={1}'.format(code, client.secret)) + + assert token_res.status_int == 200 + + token_data = json.loads(token_res.body) + + assert 'error' in token_data + assert not 'access_token' in token_data + assert token_data['error'] == 'invalid_request' + assert len(token_data['error_description']) + + def test_refresh_token(self): + ''' Try to get a new access token using the refresh token ''' + # Get an access token and a refresh token + client_id, token_data =\ + self.test_token_endpoint_successful_confidential_request() + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == client_id).first() + + token_res = self.test_app.get('/oauth/access_token', + {'refresh_token': token_data['refresh_token'], + 'client_id': client_id, + 'client_secret': client.secret + }) + + assert token_res.status_int == 200 + + new_token_data = json.loads(token_res.body) + + assert not 'error' in new_token_data + assert 'access_token' in new_token_data + assert 'token_type' in new_token_data + assert 'expires_in' in new_token_data + assert type(new_token_data['expires_in']) == int + assert new_token_data['expires_in'] > 0 diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini new file mode 100644 index 00000000..a9595432 --- /dev/null +++ b/mediagoblin/tests/test_paste.ini @@ -0,0 +1,40 @@ +[DEFAULT] +debug = true + +[composite:main] +use = egg:Paste#urlmap +/ = mediagoblin +/mgoblin_media/ = publicstore_serve +/test_static/ = mediagoblin_static +/theme_static/ = theme_static +/plugin_static/ = plugin_static + +[app:mediagoblin] +use = egg:mediagoblin#app +config = %(here)s/mediagoblin.ini + +[app:publicstore_serve] +use = egg:Paste#static +document_root = %(here)s/user_dev/media/public + +[app:mediagoblin_static] +use = egg:Paste#static +document_root = %(here)s/mediagoblin/static/ + +[app:theme_static] +use = egg:Paste#static +document_root = %(here)s/user_dev/theme_static/ +cache_max_age = 86400 + +[app:plugin_static] +use = egg:Paste#static +document_root = %(here)s/user_dev/plugin_static/ +cache_max_age = 86400 + +[celery] +CELERY_ALWAYS_EAGER = true + +[server:main] +use = egg:Paste#http +host = 127.0.0.1 +port = 6543 diff --git a/mediagoblin/tests/test_pdf.py b/mediagoblin/tests/test_pdf.py new file mode 100644 index 00000000..b4d1940a --- /dev/null +++ b/mediagoblin/tests/test_pdf.py @@ -0,0 +1,39 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import tempfile +import shutil +import os +import pytest + +from mediagoblin.media_types.pdf.processing import ( + pdf_info, check_prerequisites, create_pdf_thumb) +from .resources import GOOD_PDF as GOOD + + +@pytest.mark.skipif("not check_prerequisites()") +def test_pdf(): + good_dict = {'pdf_version_major': 1, 'pdf_title': '', + 'pdf_page_size_width': 612, 'pdf_author': '', + 'pdf_keywords': '', 'pdf_pages': 10, + 'pdf_producer': 'dvips + GNU Ghostscript 7.05', + 'pdf_version_minor': 3, + 'pdf_creator': 'LaTeX with hyperref package', + 'pdf_page_size_height': 792} + assert pdf_info(GOOD) == good_dict + temp_dir = tempfile.mkdtemp() + create_pdf_thumb(GOOD, os.path.join(temp_dir, 'good_256_256.png'), 256, 256) + shutil.rmtree(temp_dir) diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py new file mode 100644 index 00000000..16ad0111 --- /dev/null +++ b/mediagoblin/tests/test_piwigo.py @@ -0,0 +1,71 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest +from .tools import fixture_add_user + + +XML_PREFIX = "<?xml version='1.0' encoding='utf-8'?>\n" + + +class Test_PWG(object): + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + fixture_add_user() + + self.username = u"chris" + self.password = "toast" + + def do_post(self, method, params): + params["method"] = method + return self.test_app.post("/api/piwigo/ws.php", params) + + def do_get(self, method, params=None): + if params is None: + params = {} + params["method"] = method + return self.test_app.get("/api/piwigo/ws.php", params) + + def test_session(self): + resp = self.do_post("pwg.session.login", + {"username": u"nouser", "password": "wrong"}) + assert resp.body == XML_PREFIX \ + + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + + resp = self.do_post("pwg.session.login", + {"username": self.username, "password": "wrong"}) + assert resp.body == XML_PREFIX \ + + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>guest</username></rsp>' + + resp = self.do_post("pwg.session.login", + {"username": self.username, "password": self.password}) + assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>' + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>chris</username></rsp>' + + self.do_get("pwg.session.logout") + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>guest</username></rsp>' diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py new file mode 100644 index 00000000..eae0ce15 --- /dev/null +++ b/mediagoblin/tests/test_pluginapi.py @@ -0,0 +1,466 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import json +import sys + +from configobj import ConfigObj +import pytest +import pkg_resources +from validate import VdtTypeError + +from mediagoblin import mg_globals +from mediagoblin.init.plugins import setup_plugins +from mediagoblin.init.config import read_mediagoblin_config +from mediagoblin.gmg_commands.assetlink import link_plugin_assets +from mediagoblin.tools import pluginapi +from mediagoblin.tests.tools import get_app +from mediagoblin.tools.common import CollectingPrinter + + +def with_cleanup(*modules_to_delete): + def _with_cleanup(fun): + """Wrapper that saves and restores mg_globals""" + def _with_cleanup_inner(*args, **kwargs): + old_app_config = mg_globals.app_config + old_global_config = mg_globals.global_config + # Need to delete icky modules before and after so as to make + # sure things work correctly. + for module in modules_to_delete: + try: + del sys.modules[module] + except KeyError: + pass + # The plugin cache gets populated as a side-effect of + # importing, so it's best to clear it before and after a test. + pman = pluginapi.PluginManager() + pman.clear() + try: + return fun(*args, **kwargs) + finally: + mg_globals.app_config = old_app_config + mg_globals.global_config = old_global_config + # Need to delete icky modules before and after so as to make + # sure things work correctly. + for module in modules_to_delete: + try: + del sys.modules[module] + except KeyError: + pass + pman.clear() + + _with_cleanup_inner.__name__ = fun.__name__ + return _with_cleanup_inner + return _with_cleanup + + +def build_config(sections): + """Builds a ConfigObj object with specified data + + :arg sections: list of ``(section_name, section_data, + subsection_list)`` tuples where section_data is a dict and + subsection_list is a list of ``(section_name, section_data, + subsection_list)``, ... + + For example: + + >>> build_config([ + ... ('mediagoblin', {'key1': 'val1'}, []), + ... ('section2', {}, [ + ... ('subsection1', {}, []) + ... ]) + ... ]) + """ + cfg = ConfigObj() + cfg.filename = 'foo' + def _iter_section(cfg, section_list): + for section_name, data, subsection_list in section_list: + cfg[section_name] = data + _iter_section(cfg[section_name], subsection_list) + + _iter_section(cfg, sections) + return cfg + + +@with_cleanup() +def test_no_plugins(): + """Run setup_plugins with no plugins in config""" + cfg = build_config([('mediagoblin', {}, [])]) + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pman = pluginapi.PluginManager() + setup_plugins() + + # Make sure we didn't load anything. + assert len(pman.plugins) == 0 + + +@with_cleanup('mediagoblin.plugins.sampleplugin') +def test_one_plugin(): + """Run setup_plugins with a single working plugin""" + cfg = build_config([ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('mediagoblin.plugins.sampleplugin', {}, []) + ]) + ]) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pman = pluginapi.PluginManager() + setup_plugins() + + # Make sure we only found one plugin + assert len(pman.plugins) == 1 + # Make sure the plugin is the one we think it is. + assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin' + # Make sure there was one hook registered + assert len(pman.hooks) == 1 + # Make sure _setup_plugin_called was called once + import mediagoblin.plugins.sampleplugin + assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1 + + +@with_cleanup('mediagoblin.plugins.sampleplugin') +def test_same_plugin_twice(): + """Run setup_plugins with a single working plugin twice""" + cfg = build_config([ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('mediagoblin.plugins.sampleplugin', {}, []), + ('mediagoblin.plugins.sampleplugin', {}, []), + ]) + ]) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pman = pluginapi.PluginManager() + setup_plugins() + + # Make sure we only found one plugin + assert len(pman.plugins) == 1 + # Make sure the plugin is the one we think it is. + assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin' + # Make sure there was one hook registered + assert len(pman.hooks) == 1 + # Make sure _setup_plugin_called was called once + import mediagoblin.plugins.sampleplugin + assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1 + + +@with_cleanup() +def test_disabled_plugin(): + """Run setup_plugins with a single working plugin twice""" + cfg = build_config([ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('-mediagoblin.plugins.sampleplugin', {}, []), + ]) + ]) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + pman = pluginapi.PluginManager() + setup_plugins() + + # Make sure we didn't load the plugin + assert len(pman.plugins) == 0 + + +CONFIG_ALL_CALLABLES = [ + ('mediagoblin', {}, []), + ('plugins', {}, [ + ('mediagoblin.tests.testplugins.callables1', {}, []), + ('mediagoblin.tests.testplugins.callables2', {}, []), + ('mediagoblin.tests.testplugins.callables3', {}, []), + ]) + ] + + +@with_cleanup() +def test_hook_handle(): + """ + Test the hook_handle method + """ + cfg = build_config(CONFIG_ALL_CALLABLES) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + setup_plugins() + + # Just one hook provided + call_log = [] + assert pluginapi.hook_handle( + "just_one", call_log) == "Called just once" + assert call_log == ["expect this one call"] + + # Nothing provided and unhandled not okay + call_log = [] + pluginapi.hook_handle( + "nothing_handling", call_log) == None + assert call_log == [] + + # Nothing provided and unhandled okay + call_log = [] + assert pluginapi.hook_handle( + "nothing_handling", call_log, unhandled_okay=True) is None + assert call_log == [] + + # Multiple provided, go with the first! + call_log = [] + assert pluginapi.hook_handle( + "multi_handle", call_log) == "the first returns" + assert call_log == ["Hi, I'm the first"] + + # Multiple provided, one has CantHandleIt + call_log = [] + assert pluginapi.hook_handle( + "multi_handle_with_canthandle", + call_log) == "the second returns" + assert call_log == ["Hi, I'm the second"] + + +@with_cleanup() +def test_hook_runall(): + """ + Test the hook_runall method + """ + cfg = build_config(CONFIG_ALL_CALLABLES) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + setup_plugins() + + # Just one hook, check results + call_log = [] + assert pluginapi.hook_runall( + "just_one", call_log) == ["Called just once"] + assert call_log == ["expect this one call"] + + # None provided, check results + call_log = [] + assert pluginapi.hook_runall( + "nothing_handling", call_log) == [] + assert call_log == [] + + # Multiple provided, check results + call_log = [] + assert pluginapi.hook_runall( + "multi_handle", call_log) == [ + "the first returns", + "the second returns", + "the third returns", + ] + assert call_log == [ + "Hi, I'm the first", + "Hi, I'm the second", + "Hi, I'm the third"] + + # Multiple provided, one has CantHandleIt, check results + call_log = [] + assert pluginapi.hook_runall( + "multi_handle_with_canthandle", call_log) == [ + "the second returns", + "the third returns", + ] + assert call_log == [ + "Hi, I'm the second", + "Hi, I'm the third"] + + +@with_cleanup() +def test_hook_transform(): + """ + Test the hook_transform method + """ + cfg = build_config(CONFIG_ALL_CALLABLES) + + mg_globals.app_config = cfg['mediagoblin'] + mg_globals.global_config = cfg + + setup_plugins() + + assert pluginapi.hook_transform( + "expand_tuple", (-1, 0)) == (-1, 0, 1, 2, 3) + + +def test_plugin_config(): + """ + Make sure plugins can set up their own config + """ + config, validation_result = read_mediagoblin_config( + pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_plugin_specs.ini')) + + pluginspec_section = config['plugins'][ + 'mediagoblin.tests.testplugins.pluginspec'] + assert pluginspec_section['some_string'] == 'not blork' + assert pluginspec_section['dont_change_me'] == 'still the default' + + # Make sure validation works... this should be an error + assert isinstance( + validation_result[ + 'plugins'][ + 'mediagoblin.tests.testplugins.pluginspec'][ + 'some_int'], + VdtTypeError) + + # the callables thing shouldn't really have anything though. + assert len(config['plugins'][ + 'mediagoblin.tests.testplugins.callables1']) == 0 + + +@pytest.fixture() +def context_modified_app(request): + """ + Get a MediaGoblin app fixture using appconfig_context_modified.ini + """ + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_context_modified.ini')) + + +def test_modify_context(context_modified_app): + """ + Test that we can modify both the view/template specific and + global contexts for templates. + """ + # Specific thing passed into a page + result = context_modified_app.get("/modify_context/specific/") + assert result.body.strip() == """Specific page! + +specific thing: in yer specificpage +global thing: globally appended! +something: orother +doubleme: happyhappy""" + + # General test, should have global context variable only + result = context_modified_app.get("/modify_context/") + assert result.body.strip() == """General page! + +global thing: globally appended! +lol: cats +doubleme: joyjoy""" + + +@pytest.fixture() +def static_plugin_app(request): + """ + Get a MediaGoblin app fixture using appconfig_static_plugin.ini + """ + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_static_plugin.ini')) + + +def test_plugin_assetlink(static_plugin_app): + """ + Test that the assetlink command works correctly + """ + linked_assets_dir = mg_globals.app_config['plugin_linked_assets_dir'] + plugin_link_dir = os.path.join( + linked_assets_dir.rstrip(os.path.sep), + 'staticstuff') + + plugin_statics = pluginapi.hook_runall("static_setup") + assert len(plugin_statics) == 1 + plugin_static = plugin_statics[0] + + def run_assetlink(): + printer = CollectingPrinter() + + link_plugin_assets( + plugin_static, linked_assets_dir, printer) + + return printer + + # it shouldn't exist yet + assert not os.path.lexists(plugin_link_dir) + + # link dir doesn't exist, link it + result = run_assetlink().collection[0] + assert result == \ + 'Linked asset directory for plugin "staticstuff":\n %s\nto:\n %s\n' % ( + plugin_static.file_path.rstrip(os.path.sep), + plugin_link_dir) + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, leave it alone + # (and it should exist still since we just ran it..) + result = run_assetlink().collection[0] + assert result == 'Skipping "staticstuff"; already set up.\n' + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, is a symlink to somewhere else (re-link) + junk_file_path = os.path.join( + linked_assets_dir.rstrip(os.path.sep), + 'junk.txt') + with file(junk_file_path, 'w') as junk_file: + junk_file.write('barf') + + os.unlink(plugin_link_dir) + os.symlink(junk_file_path, plugin_link_dir) + + result = run_assetlink().combined_string + assert result == """Old link found for "staticstuff"; removing. +Linked asset directory for plugin "staticstuff": + %s +to: + %s +""" % (plugin_static.file_path.rstrip(os.path.sep), plugin_link_dir) + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, but is a non-symlink + os.unlink(plugin_link_dir) + with file(plugin_link_dir, 'w') as clobber_file: + clobber_file.write('clobbered!') + + result = run_assetlink().collection[0] + assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % ( + plugin_link_dir) + + with file(plugin_link_dir, 'r') as clobber_file: + assert clobber_file.read() == 'clobbered!' + + +def test_plugin_staticdirect(static_plugin_app): + """ + Test that the staticdirect utilities pull up the right things + """ + result = json.loads( + static_plugin_app.get('/staticstuff/').body) + + assert len(result) == 2 + + assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png' + assert result['plugin_bunny_css'] == \ + '/plugin_static/staticstuff/css/bunnify.css' + diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py new file mode 100644 index 00000000..591add96 --- /dev/null +++ b/mediagoblin/tests/test_processing.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +from mediagoblin import processing + +class TestProcessing(object): + def run_fill(self, input, format, output=None): + builder = processing.FilenameBuilder(input) + result = builder.fill(format) + if output is None: + return result + assert output == result + + def test_easy_filename_fill(self): + self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt') + + def test_long_filename_fill(self): + self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}', + 'image-{0}.png'.format('A' * 245)) diff --git a/mediagoblin/tests/test_session.py b/mediagoblin/tests/test_session.py new file mode 100644 index 00000000..78d790eb --- /dev/null +++ b/mediagoblin/tests/test_session.py @@ -0,0 +1,30 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools import session + +def test_session(): + sess = session.Session() + assert not sess + assert not sess.is_updated() + sess['user_id'] = 27 + assert sess + assert not sess.is_updated() + sess.save() + assert sess.is_updated() + sess.delete() + assert not sess + assert sess.is_updated() diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py new file mode 100644 index 00000000..2fc4c043 --- /dev/null +++ b/mediagoblin/tests/test_sql_migrations.py @@ -0,0 +1,896 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2012, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import copy + +from sqlalchemy import ( + Table, Column, MetaData, Index, + Integer, Float, Unicode, UnicodeText, DateTime, Boolean, + ForeignKey, UniqueConstraint, PickleType, VARCHAR) +from sqlalchemy.orm import sessionmaker, relationship +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.sql import select, insert +from migrate import changeset + +from mediagoblin.db.base import GMGTableBase +from mediagoblin.db.migration_tools import MigrationManager, RegisterMigration +from mediagoblin.tools.common import CollectingPrinter + + +# This one will get filled with local migrations +FULL_MIGRATIONS = {} + + +####################################################### +# Migration set 1: Define initial models, no migrations +####################################################### + +Base1 = declarative_base(cls=GMGTableBase) + +class Creature1(Base1): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_legs = Column(Integer, nullable=False) + is_demon = Column(Boolean) + +class Level1(Base1): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + exits = Column(PickleType) + +SET1_MODELS = [Creature1, Level1] + +SET1_MIGRATIONS = {} + +####################################################### +# Migration set 2: A few migrations and new model +####################################################### + +Base2 = declarative_base(cls=GMGTableBase) + +class Creature2(Base2): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_legs = Column(Integer, nullable=False) + magical_powers = relationship("CreaturePower2") + +class CreaturePower2(Base2): + __tablename__ = "creature_power" + + id = Column(Integer, primary_key=True) + creature = Column( + Integer, ForeignKey('creature.id'), nullable=False) + name = Column(Unicode) + description = Column(Unicode) + hitpower = Column(Integer, nullable=False) + +class Level2(Base2): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + +class LevelExit2(Base2): + __tablename__ = "level_exit" + + id = Column(Integer, primary_key=True) + name = Column(Unicode) + from_level = Column( + Unicode, ForeignKey('level.id'), nullable=False) + to_level = Column( + Unicode, ForeignKey('level.id'), nullable=False) + +SET2_MODELS = [Creature2, CreaturePower2, Level2, LevelExit2] + + +@RegisterMigration(1, FULL_MIGRATIONS) +def creature_remove_is_demon(db_conn): + """ + Remove the is_demon field from the creature model. We don't need + it! + """ + # :( Commented out 'cuz of: + # http://code.google.com/p/sqlalchemy-migrate/issues/detail?id=143&thanks=143&ts=1327882242 + + # metadata = MetaData(bind=db_conn.bind) + # creature_table = Table( + # 'creature', metadata, + # autoload=True, autoload_with=db_conn.bind) + # creature_table.drop_column('is_demon') + pass + + +@RegisterMigration(2, FULL_MIGRATIONS) +def creature_powers_new_table(db_conn): + """ + Add a new table for creature powers. Nothing needs to go in it + yet though as there wasn't anything that previously held this + information + """ + metadata = MetaData(bind=db_conn.bind) + + # We have to access the creature table so sqlalchemy can make the + # foreign key relationship + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + + creature_powers = Table( + 'creature_power', metadata, + Column('id', Integer, primary_key=True), + Column('creature', + Integer, ForeignKey('creature.id'), nullable=False), + Column('name', Unicode), + Column('description', Unicode), + Column('hitpower', Integer, nullable=False)) + metadata.create_all(db_conn.bind) + + +@RegisterMigration(3, FULL_MIGRATIONS) +def level_exits_new_table(db_conn): + """ + Make a new table for level exits and move the previously pickled + stuff over to here (then drop the old unneeded table) + """ + # First, create the table + # ----------------------- + metadata = MetaData(bind=db_conn.bind) + + # Minimal representation of level table. + # Not auto-introspecting here because of pickle table. I'm not + # sure sqlalchemy can auto-introspect pickle columns. + levels = Table( + 'level', metadata, + Column('id', Unicode, primary_key=True), + Column('name', Unicode), + Column('description', Unicode), + Column('exits', PickleType)) + + level_exits = Table( + 'level_exit', metadata, + Column('id', Integer, primary_key=True), + Column('name', Unicode), + Column('from_level', + Unicode, ForeignKey('level.id'), nullable=False), + Column('to_level', + Unicode, ForeignKey('level.id'), nullable=False)) + metadata.create_all(db_conn.bind) + + # And now, convert all the old exit pickles to new level exits + # ------------------------------------------------------------ + + # query over and insert + result = db_conn.execute( + select([levels], levels.c.exits!=None)) + + for level in result: + + for exit_name, to_level in level['exits'].iteritems(): + # Insert the level exit + db_conn.execute( + level_exits.insert().values( + name=exit_name, + from_level=level.id, + to_level=to_level)) + + # Finally, drop the old level exits pickle table + # ---------------------------------------------- + levels.drop_column('exits') + + +# A hack! At this point we freeze-fame and get just a partial list of +# migrations + +SET2_MIGRATIONS = copy.copy(FULL_MIGRATIONS) + +####################################################### +# Migration set 3: Final migrations +####################################################### + +Base3 = declarative_base(cls=GMGTableBase) + +class Creature3(Base3): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_limbs= Column(Integer, nullable=False) + magical_powers = relationship("CreaturePower3") + +class CreaturePower3(Base3): + __tablename__ = "creature_power" + + id = Column(Integer, primary_key=True) + creature = Column( + Integer, ForeignKey('creature.id'), nullable=False, index=True) + name = Column(Unicode) + description = Column(Unicode) + hitpower = Column(Float, nullable=False) + +class Level3(Base3): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + +class LevelExit3(Base3): + __tablename__ = "level_exit" + + id = Column(Integer, primary_key=True) + name = Column(Unicode) + from_level = Column( + Unicode, ForeignKey('level.id'), nullable=False, index=True) + to_level = Column( + Unicode, ForeignKey('level.id'), nullable=False, index=True) + + +SET3_MODELS = [Creature3, CreaturePower3, Level3, LevelExit3] +SET3_MIGRATIONS = FULL_MIGRATIONS + + +@RegisterMigration(4, FULL_MIGRATIONS) +def creature_num_legs_to_num_limbs(db_conn): + """ + Turns out we're tracking all sorts of limbs, not "legs" + specifically. Humans would be 4 here, for instance. So we + renamed the column. + """ + metadata = MetaData(bind=db_conn.bind) + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + creature_table.c.num_legs.alter(name=u"num_limbs") + + +@RegisterMigration(5, FULL_MIGRATIONS) +def level_exit_index_from_and_to_level(db_conn): + """ + Index the from and to levels of the level exit table. + """ + metadata = MetaData(bind=db_conn.bind) + level_exit = Table( + 'level_exit', metadata, + autoload=True, autoload_with=db_conn.bind) + Index('ix_level_exit_from_level', + level_exit.c.from_level).create(db_conn.bind) + Index('ix_level_exit_to_level', + level_exit.c.to_level).create(db_conn.bind) + + +@RegisterMigration(6, FULL_MIGRATIONS) +def creature_power_index_creature(db_conn): + """ + Index our foreign key relationship to the creatures + """ + metadata = MetaData(bind=db_conn.bind) + creature_power = Table( + 'creature_power', metadata, + autoload=True, autoload_with=db_conn.bind) + Index('ix_creature_power_creature', + creature_power.c.creature).create(db_conn.bind) + + +@RegisterMigration(7, FULL_MIGRATIONS) +def creature_power_hitpower_to_float(db_conn): + """ + Convert hitpower column on creature power table from integer to + float. + + Turns out we want super precise values of how much hitpower there + really is. + """ + metadata = MetaData(bind=db_conn.bind) + + # We have to access the creature table so sqlalchemy can make the + # foreign key relationship + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + + creature_power = Table( + 'creature_power', metadata, + Column('id', Integer, primary_key=True), + Column('creature', Integer, + ForeignKey('creature.id'), nullable=False, + index=True), + Column('name', Unicode), + Column('description', Unicode), + Column('hitpower', Integer, nullable=False)) + + creature_power.c.hitpower.alter(type=Float) + + +@RegisterMigration(8, FULL_MIGRATIONS) +def creature_power_name_creature_unique(db_conn): + """ + Add a unique constraint to name and creature on creature_power. + + We don't want multiple creature powers with the same name per creature! + """ + # Note: We don't actually check to see if this constraint is set + # up because at present there's no way to do so in sqlalchemy :\ + + metadata = MetaData(bind=db_conn.bind) + + creature_power = Table( + 'creature_power', metadata, + autoload=True, autoload_with=db_conn.bind) + + cons = changeset.constraint.UniqueConstraint( + 'name', 'creature', table=creature_power) + + cons.create() + + +def _insert_migration1_objects(session): + """ + Test objects to insert for the first set of things + """ + # Insert creatures + session.add_all( + [Creature1(name=u'centipede', + num_legs=100, + is_demon=False), + Creature1(name=u'wolf', + num_legs=4, + is_demon=False), + # don't ask me what a wizardsnake is. + Creature1(name=u'wizardsnake', + num_legs=0, + is_demon=True)]) + + # Insert levels + session.add_all( + [Level1(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.', + exits={ + u'deathwell': u'evilstorm', + u'portal': u'central_park'}), + Level1(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits={}), # you can't escape the evilstorm + Level1(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.", + exits={ + u'portal': u'necroplex'})]) + + session.commit() + + +def _insert_migration2_objects(session): + """ + Test objects to insert for the second set of things + """ + # Insert creatures + session.add_all( + [Creature2( + name=u'centipede', + num_legs=100), + Creature2( + name=u'wolf', + num_legs=4, + magical_powers = [ + CreaturePower2( + name=u"ice breath", + description=u"A blast of icy breath!", + hitpower=20), + CreaturePower2( + name=u"death stare", + description=u"A frightening stare, for sure!", + hitpower=45)]), + Creature2( + name=u'wizardsnake', + num_legs=0, + magical_powers=[ + CreaturePower2( + name=u'death_rattle', + description=u'A rattle... of DEATH!', + hitpower=1000), + CreaturePower2( + name=u'sneaky_stare', + description=u"The sneakiest stare you've ever seen!", + hitpower=300), + CreaturePower2( + name=u'slithery_smoke', + description=u"A blast of slithery, slithery smoke.", + hitpower=10), + CreaturePower2( + name=u'treacherous_tremors', + description=u"The ground shakes beneath footed animals!", + hitpower=0)])]) + + # Insert levels + session.add_all( + [Level2(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.'), + Level2(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits=[]), # you can't escape the evilstorm + Level2(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.")]) + + # necroplex exits + session.add_all( + [LevelExit2(name=u'deathwell', + from_level=u'necroplex', + to_level=u'evilstorm'), + LevelExit2(name=u'portal', + from_level=u'necroplex', + to_level=u'central_park')]) + + # there are no evilstorm exits because there is no exit from the + # evilstorm + + # central park exits + session.add_all( + [LevelExit2(name=u'portal', + from_level=u'central_park', + to_level=u'necroplex')]) + + session.commit() + + +def _insert_migration3_objects(session): + """ + Test objects to insert for the third set of things + """ + # Insert creatures + session.add_all( + [Creature3( + name=u'centipede', + num_limbs=100), + Creature3( + name=u'wolf', + num_limbs=4, + magical_powers = [ + CreaturePower3( + name=u"ice breath", + description=u"A blast of icy breath!", + hitpower=20.0), + CreaturePower3( + name=u"death stare", + description=u"A frightening stare, for sure!", + hitpower=45.0)]), + Creature3( + name=u'wizardsnake', + num_limbs=0, + magical_powers=[ + CreaturePower3( + name=u'death_rattle', + description=u'A rattle... of DEATH!', + hitpower=1000.0), + CreaturePower3( + name=u'sneaky_stare', + description=u"The sneakiest stare you've ever seen!", + hitpower=300.0), + CreaturePower3( + name=u'slithery_smoke', + description=u"A blast of slithery, slithery smoke.", + hitpower=10.0), + CreaturePower3( + name=u'treacherous_tremors', + description=u"The ground shakes beneath footed animals!", + hitpower=0.0)])], + # annnnnd one more to test a floating point hitpower + Creature3( + name=u'deity', + numb_limbs=30, + magical_powers=[ + CreaturePower3( + name=u'smite', + description=u'Smitten by holy wrath!', + hitpower=9999.9)])) + + # Insert levels + session.add_all( + [Level3(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.'), + Level3(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits=[]), # you can't escape the evilstorm + Level3(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.")]) + + # necroplex exits + session.add_all( + [LevelExit3(name=u'deathwell', + from_level=u'necroplex', + to_level=u'evilstorm'), + LevelExit3(name=u'portal', + from_level=u'necroplex', + to_level=u'central_park')]) + + # there are no evilstorm exits because there is no exit from the + # evilstorm + + # central park exits + session.add_all( + [LevelExit3(name=u'portal', + from_level=u'central_park', + to_level=u'necroplex')]) + + session.commit() + + +def create_test_engine(): + from sqlalchemy import create_engine + engine = create_engine('sqlite:///:memory:', echo=False) + Session = sessionmaker(bind=engine) + return engine, Session + + +def assert_col_type(column, this_class): + assert isinstance(column.type, this_class) + + +def _get_level3_exits(session, level): + return dict( + [(level_exit.name, level_exit.to_level) + for level_exit in + session.query(LevelExit3).filter_by(from_level=level.id)]) + + +def test_set1_to_set3(): + # Create / connect to database + # ---------------------------- + + engine, Session = create_test_engine() + + # Create tables by migrating on empty initial set + # ----------------------------------------------- + + printer = CollectingPrinter() + migration_manager = MigrationManager( + u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), + printer) + + # Check latest migration and database current migration + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == None + + result = migration_manager.init_or_migrate() + + # Make sure output was "inited" + assert result == u'inited' + # Check output + assert printer.combined_string == ( + "-> Initializing main mediagoblin tables... done.\n") + # Check version in database + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == 0 + + # Install the initial set + # ----------------------- + + _insert_migration1_objects(Session()) + + # Try to "re-migrate" with same manager settings... nothing should happen + migration_manager = MigrationManager( + u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), + printer) + assert migration_manager.init_or_migrate() == None + + # Check version in database + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == 0 + + # Sanity check a few things in the database... + metadata = MetaData(bind=engine) + + # Check the structure of the creature table + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=engine) + assert set(creature_table.c.keys()) == set( + ['id', 'name', 'num_legs', 'is_demon']) + assert_col_type(creature_table.c.id, Integer) + assert_col_type(creature_table.c.name, VARCHAR) + assert creature_table.c.name.nullable is False + #assert creature_table.c.name.index is True + #assert creature_table.c.name.unique is True + assert_col_type(creature_table.c.num_legs, Integer) + assert creature_table.c.num_legs.nullable is False + assert_col_type(creature_table.c.is_demon, Boolean) + + # Check the structure of the level table + level_table = Table( + 'level', metadata, + autoload=True, autoload_with=engine) + assert set(level_table.c.keys()) == set( + ['id', 'name', 'description', 'exits']) + assert_col_type(level_table.c.id, VARCHAR) + assert level_table.c.id.primary_key is True + assert_col_type(level_table.c.name, VARCHAR) + assert_col_type(level_table.c.description, VARCHAR) + # Skipping exits... Not sure if we can detect pickletype, not a + # big deal regardless. + + # Now check to see if stuff seems to be in there. + session = Session() + + creature = session.query(Creature1).filter_by( + name=u'centipede').one() + assert creature.num_legs == 100 + assert creature.is_demon == False + + creature = session.query(Creature1).filter_by( + name=u'wolf').one() + assert creature.num_legs == 4 + assert creature.is_demon == False + + creature = session.query(Creature1).filter_by( + name=u'wizardsnake').one() + assert creature.num_legs == 0 + assert creature.is_demon == True + + level = session.query(Level1).filter_by( + id=u'necroplex').one() + assert level.name == u'The Necroplex' + assert level.description == u'A complex full of pure deathzone.' + assert level.exits == { + 'deathwell': 'evilstorm', + 'portal': 'central_park'} + + level = session.query(Level1).filter_by( + id=u'evilstorm').one() + assert level.name == u'Evil Storm' + assert level.description == u'A storm full of pure evil.' + assert level.exits == {} # You still can't escape the evilstorm! + + level = session.query(Level1).filter_by( + id=u'central_park').one() + assert level.name == u'Central Park, NY, NY' + assert level.description == u"New York's friendly Central Park." + assert level.exits == { + 'portal': 'necroplex'} + + # Create new migration manager, but make sure the db migration + # isn't said to be updated yet + printer = CollectingPrinter() + migration_manager = MigrationManager( + u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + printer) + + assert migration_manager.latest_migration == 8 + assert migration_manager.database_current_migration == 0 + + # Migrate + result = migration_manager.init_or_migrate() + + # Make sure result was "migrated" + assert result == u'migrated' + + # TODO: Check output to user + assert printer.combined_string == """\ +-> Updating main mediagoblin tables: + + Running migration 1, "creature_remove_is_demon"... done. + + Running migration 2, "creature_powers_new_table"... done. + + Running migration 3, "level_exits_new_table"... done. + + Running migration 4, "creature_num_legs_to_num_limbs"... done. + + Running migration 5, "level_exit_index_from_and_to_level"... done. + + Running migration 6, "creature_power_index_creature"... done. + + Running migration 7, "creature_power_hitpower_to_float"... done. + + Running migration 8, "creature_power_name_creature_unique"... done. +""" + + # Make sure version matches expected + migration_manager = MigrationManager( + u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + printer) + assert migration_manager.latest_migration == 8 + assert migration_manager.database_current_migration == 8 + + # Check all things in database match expected + + # Check the creature table + metadata = MetaData(bind=engine) + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=engine) + # assert set(creature_table.c.keys()) == set( + # ['id', 'name', 'num_limbs']) + assert set(creature_table.c.keys()) == set( + [u'id', 'name', u'num_limbs', u'is_demon']) + assert_col_type(creature_table.c.id, Integer) + assert_col_type(creature_table.c.name, VARCHAR) + assert creature_table.c.name.nullable is False + #assert creature_table.c.name.index is True + #assert creature_table.c.name.unique is True + assert_col_type(creature_table.c.num_limbs, Integer) + assert creature_table.c.num_limbs.nullable is False + + # Check the CreaturePower table + creature_power_table = Table( + 'creature_power', metadata, + autoload=True, autoload_with=engine) + assert set(creature_power_table.c.keys()) == set( + ['id', 'creature', 'name', 'description', 'hitpower']) + assert_col_type(creature_power_table.c.id, Integer) + assert_col_type(creature_power_table.c.creature, Integer) + assert creature_power_table.c.creature.nullable is False + assert_col_type(creature_power_table.c.name, VARCHAR) + assert_col_type(creature_power_table.c.description, VARCHAR) + assert_col_type(creature_power_table.c.hitpower, Float) + assert creature_power_table.c.hitpower.nullable is False + + # Check the structure of the level table + level_table = Table( + 'level', metadata, + autoload=True, autoload_with=engine) + assert set(level_table.c.keys()) == set( + ['id', 'name', 'description']) + assert_col_type(level_table.c.id, VARCHAR) + assert level_table.c.id.primary_key is True + assert_col_type(level_table.c.name, VARCHAR) + assert_col_type(level_table.c.description, VARCHAR) + + # Check the structure of the level_exits table + level_exit_table = Table( + 'level_exit', metadata, + autoload=True, autoload_with=engine) + assert set(level_exit_table.c.keys()) == set( + ['id', 'name', 'from_level', 'to_level']) + assert_col_type(level_exit_table.c.id, Integer) + assert_col_type(level_exit_table.c.name, VARCHAR) + assert_col_type(level_exit_table.c.from_level, VARCHAR) + assert level_exit_table.c.from_level.nullable is False + #assert level_exit_table.c.from_level.index is True + assert_col_type(level_exit_table.c.to_level, VARCHAR) + assert level_exit_table.c.to_level.nullable is False + #assert level_exit_table.c.to_level.index is True + + # Now check to see if stuff seems to be in there. + session = Session() + creature = session.query(Creature3).filter_by( + name=u'centipede').one() + assert creature.num_limbs == 100.0 + assert creature.magical_powers == [] + + creature = session.query(Creature3).filter_by( + name=u'wolf').one() + assert creature.num_limbs == 4.0 + assert creature.magical_powers == [] + + creature = session.query(Creature3).filter_by( + name=u'wizardsnake').one() + assert creature.num_limbs == 0.0 + assert creature.magical_powers == [] + + level = session.query(Level3).filter_by( + id=u'necroplex').one() + assert level.name == u'The Necroplex' + assert level.description == u'A complex full of pure deathzone.' + level_exits = _get_level3_exits(session, level) + assert level_exits == { + u'deathwell': u'evilstorm', + u'portal': u'central_park'} + + level = session.query(Level3).filter_by( + id=u'evilstorm').one() + assert level.name == u'Evil Storm' + assert level.description == u'A storm full of pure evil.' + level_exits = _get_level3_exits(session, level) + assert level_exits == {} # You still can't escape the evilstorm! + + level = session.query(Level3).filter_by( + id=u'central_park').one() + assert level.name == u'Central Park, NY, NY' + assert level.description == u"New York's friendly Central Park." + level_exits = _get_level3_exits(session, level) + assert level_exits == { + 'portal': 'necroplex'} + + +#def test_set2_to_set3(): + # Create / connect to database + # Create tables by migrating on empty initial set + + # Install the initial set + # Check version in database + # Sanity check a few things in the database + + # Migrate + # Make sure version matches expected + # Check all things in database match expected + # pass + + +#def test_set1_to_set2_to_set3(): + # Create / connect to database + # Create tables by migrating on empty initial set + + # Install the initial set + # Check version in database + # Sanity check a few things in the database + + # Migrate + # Make sure version matches expected + # Check all things in database match expected + + # Migrate again + # Make sure version matches expected again + # Check all things in database match expected again + + ##### Set2 + # creature_table = Table( + # 'creature', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(creature_table.c.keys()) == set( + # ['id', 'name', 'num_legs']) + # assert_col_type(creature_table.c.id, Integer) + # assert_col_type(creature_table.c.name, VARCHAR) + # assert creature_table.c.name.nullable is False + # assert creature_table.c.name.index is True + # assert creature_table.c.name.unique is True + # assert_col_type(creature_table.c.num_legs, Integer) + # assert creature_table.c.num_legs.nullable is False + + # # Check the CreaturePower table + # creature_power_table = Table( + # 'creature_power', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(creature_power_table.c.keys()) == set( + # ['id', 'creature', 'name', 'description', 'hitpower']) + # assert_col_type(creature_power_table.c.id, Integer) + # assert_col_type(creature_power_table.c.creature, Integer) + # assert creature_power_table.c.creature.nullable is False + # assert_col_type(creature_power_table.c.name, VARCHAR) + # assert_col_type(creature_power_table.c.description, VARCHAR) + # assert_col_type(creature_power_table.c.hitpower, Integer) + # assert creature_power_table.c.hitpower.nullable is False + + # # Check the structure of the level table + # level_table = Table( + # 'level', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(level_table.c.keys()) == set( + # ['id', 'name', 'description']) + # assert_col_type(level_table.c.id, VARCHAR) + # assert level_table.c.id.primary_key is True + # assert_col_type(level_table.c.name, VARCHAR) + # assert_col_type(level_table.c.description, VARCHAR) + + # # Check the structure of the level_exits table + # level_exit_table = Table( + # 'level_exit', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(level_exit_table.c.keys()) == set( + # ['id', 'name', 'from_level', 'to_level']) + # assert_col_type(level_exit_table.c.id, Integer) + # assert_col_type(level_exit_table.c.name, VARCHAR) + # assert_col_type(level_exit_table.c.from_level, VARCHAR) + # assert level_exit_table.c.from_level.nullable is False + # assert_col_type(level_exit_table.c.to_level, VARCHAR) + + # pass diff --git a/mediagoblin/tests/test_staticdirect.py b/mediagoblin/tests/test_staticdirect.py new file mode 100644 index 00000000..3a9e2fd9 --- /dev/null +++ b/mediagoblin/tests/test_staticdirect.py @@ -0,0 +1,9 @@ +from mediagoblin.tools import staticdirect + +def test_staticdirect(): + sdirect = staticdirect.StaticDirect( + {None: "/static/", + "theme": "http://example.org/themestatic"}) + assert sdirect("css/monkeys.css") == "/static/css/monkeys.css" + assert sdirect("images/lollerskate.png", "theme") == \ + "http://example.org/themestatic/images/lollerskate.png" diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py new file mode 100644 index 00000000..f6f1d18f --- /dev/null +++ b/mediagoblin/tests/test_storage.py @@ -0,0 +1,321 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import os +import tempfile + +import pytest +from werkzeug.utils import secure_filename + +from mediagoblin import storage + + +################ +# Test utilities +################ + +def test_clean_listy_filepath(): + expected = [u'dir1', u'dir2', u'linooks.jpg'] + assert storage.clean_listy_filepath( + ['dir1', 'dir2', 'linooks.jpg']) == expected + + expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg'] + assert storage.clean_listy_filepath( + ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected + + expected = [u'etc', u'passwd'] + assert storage.clean_listy_filepath( + ['../../../etc/', 'passwd']) == expected + + with pytest.raises(storage.InvalidFilepath): + storage.clean_listy_filepath(['../../', 'linooks.jpg']) + + +class FakeStorageSystem(): + def __init__(self, foobie, blech, **kwargs): + self.foobie = foobie + self.blech = blech + +class FakeRemoteStorage(storage.filestorage.BasicFileStorage): + # Theoretically despite this, all the methods should work but it + # should force copying to the workbench + local_storage = False + + def copy_local_to_storage(self, *args, **kwargs): + return storage.StorageInterface.copy_local_to_storage( + self, *args, **kwargs) + + +def test_storage_system_from_config(): + this_storage = storage.storage_system_from_config( + {'base_url': 'http://example.org/moodia/', + 'base_dir': '/tmp/', + 'garbage_arg': 'garbage_arg', + 'garbage_arg': 'trash'}) + assert this_storage.base_url == 'http://example.org/moodia/' + assert this_storage.base_dir == '/tmp/' + assert this_storage.__class__ is storage.filestorage.BasicFileStorage + + this_storage = storage.storage_system_from_config( + {'foobie': 'eiboof', + 'blech': 'hcelb', + 'garbage_arg': 'garbage_arg', + 'storage_class': + 'mediagoblin.tests.test_storage:FakeStorageSystem'}) + assert this_storage.foobie == 'eiboof' + assert this_storage.blech == 'hcelb' + assert unicode(this_storage.__class__) == \ + u'mediagoblin.tests.test_storage.FakeStorageSystem' + + +########################## +# Basic file storage tests +########################## + +def get_tmp_filestorage(mount_url=None, fake_remote=False): + tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage") + if fake_remote: + this_storage = FakeRemoteStorage(tmpdir, mount_url) + else: + this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url) + return tmpdir, this_storage + + +def cleanup_storage(this_storage, tmpdir, *paths): + for p in paths: + while p: + assert this_storage.delete_dir(p) == True + p.pop(-1) + os.rmdir(tmpdir) + + +def test_basic_storage__resolve_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) + assert result == os.path.join( + tmpdir, 'dir1/dir2/filename.jpg') + + result = this_storage._resolve_filepath(['../../etc/', 'passwd']) + assert result == os.path.join( + tmpdir, 'etc/passwd') + + pytest.raises( + storage.InvalidFilepath, + this_storage._resolve_filepath, + ['../../', 'etc', 'passwd']) + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_file_exists(): + tmpdir, this_storage = get_tmp_filestorage() + + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) + assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) + assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) + + this_storage.delete_file(['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_unique_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + # write something that exists + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + # now we want something new, with the same name! + new_filepath = this_storage.get_unique_filepath( + ['dir1', 'dir2', 'filename.txt']) + assert new_filepath[:-1] == [u'dir1', u'dir2'] + + new_filename = new_filepath[-1] + assert new_filename.endswith('filename.txt') + assert len(new_filename) > len('filename.txt') + assert new_filename == secure_filename(new_filename) + + os.remove(filename) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_file(): + tmpdir, this_storage = get_tmp_filestorage() + + # Write a brand new file + filepath = ['dir1', 'dir2', 'ourfile.txt'] + + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('First file') + with this_storage.get_file(filepath, 'r') as our_file: + assert our_file.read() == 'First file' + assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: + assert our_file.read() == 'First file' + + # Write to the same path but try to get a unique file. + new_filepath = this_storage.get_unique_filepath(filepath) + assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) + + with this_storage.get_file(new_filepath, 'w') as our_file: + our_file.write('Second file') + with this_storage.get_file(new_filepath, 'r') as our_file: + assert our_file.read() == 'Second file' + assert os.path.exists(os.path.join(tmpdir, *new_filepath)) + with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: + assert our_file.read() == 'Second file' + + # Read from an existing file + manually_written_file = os.makedirs( + os.path.join(tmpdir, 'testydir')) + with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: + testyfile.write('testy file! so testy.') + + with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: + assert testyfile.read() == 'testy file! so testy.' + + this_storage.delete_file(filepath) + this_storage.delete_file(new_filepath) + this_storage.delete_file(['testydir', 'testyfile.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir']) + + +def test_basic_storage_delete_file(): + tmpdir, this_storage = get_tmp_filestorage() + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + assert os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + assert this_storage.delete_dir(['dir1', 'dir2']) == False + this_storage.delete_file(filepath) + assert this_storage.delete_dir(['dir1', 'dir2']) == True + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + cleanup_storage(this_storage, tmpdir, ['dir1']) + + +def test_basic_storage_url_for_file(): + # Not supplying a base_url should actually just bork. + tmpdir, this_storage = get_tmp_filestorage() + pytest.raises( + storage.NoWebServing, + this_storage.file_url, + ['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir) + + # base_url without domain + tmpdir, this_storage = get_tmp_filestorage('/media/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = '/media/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + # base_url with domain + tmpdir, this_storage = get_tmp_filestorage( + 'http://media.example.org/ourmedia/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_get_local_path(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage.get_local_path( + ['dir1', 'dir2', 'filename.txt']) + + expected = os.path.join( + tmpdir, 'dir1/dir2/filename.txt') + + assert result == expected + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_is_local(): + tmpdir, this_storage = get_tmp_filestorage() + assert this_storage.local_storage is True + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_copy_locally(): + tmpdir, this_storage = get_tmp_filestorage() + + dest_tmpdir = tempfile.mkdtemp() + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') + + this_storage.copy_locally(filepath, new_file_dest) + this_storage.delete_file(filepath) + + assert file(new_file_dest).read() == 'Testing this file' + + os.remove(new_file_dest) + os.rmdir(dest_tmpdir) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def _test_copy_local_to_storage_works(tmpdir, this_storage): + local_filename = tempfile.mktemp() + with file(local_filename, 'w') as tmpfile: + tmpfile.write('haha') + + this_storage.copy_local_to_storage( + local_filename, ['dir1', 'dir2', 'copiedto.txt']) + + os.remove(local_filename) + + assert file( + os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), + 'r').read() == 'haha' + + this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage() + _test_copy_local_to_storage_works(tmpdir, this_storage) + + +def test_general_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) + _test_copy_local_to_storage_works(tmpdir, this_storage) diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py new file mode 100644 index 00000000..162b2d19 --- /dev/null +++ b/mediagoblin/tests/test_submission.py @@ -0,0 +1,294 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +reload(sys) +sys.setdefaultencoding('utf-8') + +import urlparse +import os +import pytest + +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin import mg_globals +from mediagoblin.db.models import MediaEntry +from mediagoblin.tools import template +from mediagoblin.media_types.image import MEDIA_MANAGER as img_MEDIA_MANAGER +from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites + +from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ + BIG_BLUE, GOOD_PDF, GPS_JPG + +GOOD_TAG_STRING = u'yin,yang' +BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26) + +FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form'] +REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request'] + + +class TestSubmission: + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + # TODO: Possibly abstract into a decorator like: + # @as_authenticated_user('chris') + test_user = fixture_add_user() + + self.test_user = test_user + + self.login() + + def login(self): + self.test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': 'toast'}) + + def logout(self): + self.test_app.get('/auth/logout/') + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def upload_data(self, filename): + return {'upload_files': [('file', filename)]} + + def check_comments(self, request, media_id, count): + comments = request.db.MediaComment.find({'media_entry': media_id}) + assert count == len(list(comments)) + + def test_missing_fields(self): + # Test blank form + # --------------- + response, form = self.do_post({}, *FORM_CONTEXT) + assert form.file.errors == [u'You must provide a file.'] + + # Test blank file + # --------------- + response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT) + assert form.file.errors == [u'You must provide a file.'] + + def check_url(self, response, path): + assert urlparse.urlsplit(response.location)[2] == path + + def check_normal_upload(self, title, filename): + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + assert 'mediagoblin/user_pages/user.html' in context + # Make sure the media view is at least reachable, logged in... + url = '/u/{0}/m/{1}/'.format(self.test_user.username, + title.lower().replace(' ', '-')) + self.test_app.get(url) + # ... and logged out too. + self.logout() + self.test_app.get(url) + + def test_normal_jpg(self): + self.check_normal_upload(u'Normal upload 1', GOOD_JPG) + + def test_normal_png(self): + self.check_normal_upload(u'Normal upload 2', GOOD_PNG) + + @pytest.mark.skipif("not pdf_check_prerequisites()") + def test_normal_pdf(self): + response, context = self.do_post({'title': u'Normal upload 3 (pdf)'}, + do_follow=True, + **self.upload_data(GOOD_PDF)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + assert 'mediagoblin/user_pages/user.html' in context + + def check_media(self, request, find_data, count=None): + media = MediaEntry.find(find_data) + if count is not None: + assert media.count() == count + if count == 0: + return + return media[0] + + def test_tags(self): + # Good tag string + # -------- + response, request = self.do_post({'title': u'Balanced Goblin 2', + 'tags': GOOD_TAG_STRING}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': u'Balanced Goblin 2'}, 1) + assert media.tags[0]['name'] == u'yin' + assert media.tags[0]['slug'] == u'yin' + + assert media.tags[1]['name'] == u'yang' + assert media.tags[1]['slug'] == u'yang' + + # Test tags that are too long + # --------------- + response, form = self.do_post({'title': u'Balanced Goblin 2', + 'tags': BAD_TAG_STRING}, + *FORM_CONTEXT, + **self.upload_data(GOOD_JPG)) + assert form.tags.errors == [ + u'Tags must be shorter than 50 characters. ' \ + 'Tags that are too long: ' \ + 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] + + def test_delete(self): + response, request = self.do_post({'title': u'Balanced Goblin'}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) + media_id = media.id + + # render and post to the edit page. + edit_url = request.urlgen( + 'mediagoblin.edit.edit_media', + user=self.test_user.username, media_id=media_id) + self.test_app.get(edit_url) + self.test_app.post(edit_url, + {'title': u'Balanced Goblin', + 'slug': u"Balanced=Goblin", + 'tags': u''}) + media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) + assert media.slug == u"balanced-goblin" + + # Add a comment, so we can test for its deletion later. + self.check_comments(request, media_id, 0) + comment_url = request.urlgen( + 'mediagoblin.user_pages.media_post_comment', + user=self.test_user.username, media_id=media_id) + response = self.do_post({'comment_content': 'i love this test'}, + url=comment_url, do_follow=True)[0] + self.check_comments(request, media_id, 1) + + # Do not confirm deletion + # --------------------------------------------------- + delete_url = request.urlgen( + 'mediagoblin.user_pages.media_confirm_delete', + user=self.test_user.username, media_id=media_id) + # Empty data means don't confirm + response = self.do_post({}, do_follow=True, url=delete_url)[0] + media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) + media_id = media.id + + # Confirm deletion + # --------------------------------------------------- + response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT, + do_follow=True, url=delete_url) + self.check_media(request, {'id': media_id}, 0) + self.check_comments(request, media_id, 0) + + def test_evil_file(self): + # Test non-suppoerted file with non-supported extension + # ----------------------------------------------------- + response, form = self.do_post({'title': u'Malicious Upload 1'}, + *FORM_CONTEXT, + **self.upload_data(EVIL_FILE)) + assert len(form.file.errors) == 1 + assert 'Sorry, I don\'t support that file type :(' == \ + str(form.file.errors[0]) + + + def test_get_media_manager(self): + """Test if the get_media_manger function returns sensible things + """ + response, request = self.do_post({'title': u'Balanced Goblin'}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) + + assert media.media_type == u'mediagoblin.media_types.image' + assert isinstance(media.media_manager, img_MEDIA_MANAGER) + assert media.media_manager.entry == media + + + def test_sniffing(self): + ''' + Test sniffing mechanism to assert that regular uploads work as intended + ''' + template.clear_test_template_context() + response = self.test_app.post( + '/submit/', { + 'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE' + }, upload_files=[( + 'file', GOOD_JPG)]) + + response.follow() + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html'] + + request = context['request'] + + media = request.db.MediaEntry.find_one({ + u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'}) + + assert media.media_type == 'mediagoblin.media_types.image' + + def check_false_image(self, title, filename): + # NOTE: The following 2 tests will ultimately fail, but they + # *will* pass the initial form submission step. Instead, + # they'll be caught as failures during the processing step. + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + entry = mg_globals.database.MediaEntry.find_one({'title': title}) + assert entry.state == 'failed' + assert entry.fail_error == u'mediagoblin.processing:BadMediaFail' + + def test_evil_jpg(self): + # Test non-supported file with .jpg extension + # ------------------------------------------- + self.check_false_image(u'Malicious Upload 2', EVIL_JPG) + + def test_evil_png(self): + # Test non-supported file with .png extension + # ------------------------------------------- + self.check_false_image(u'Malicious Upload 3', EVIL_PNG) + + def test_media_data(self): + self.check_normal_upload(u"With GPS data", GPS_JPG) + media = self.check_media(None, {"title": u"With GPS data"}, 1) + assert media.media_data.gps_latitude == 59.336666666666666 + + def test_processing(self): + public_store_dir = mg_globals.global_config[ + 'storage:publicstore']['base_dir'] + + data = {'title': u'Big Blue'} + response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(BIG_BLUE)) + media = self.check_media(request, data, 1) + last_size = 1024 ** 3 # Needs to be larger than bigblue.png + for key, basename in (('original', 'bigblue.png'), + ('medium', 'bigblue.medium.png'), + ('thumb', 'bigblue.thumbnail.png')): + # Does the processed image have a good filename? + filename = os.path.join( + public_store_dir, + *media.media_files[key]) + assert filename.endswith('_' + basename) + # Is it smaller than the last processed image we looked at? + size = os.stat(filename).st_size + assert last_size > size + last_size = size diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png Binary files differnew file mode 100644 index 00000000..2b2c2a44 --- /dev/null +++ b/mediagoblin/tests/test_submission/bigblue.png diff --git a/mediagoblin/tests/test_submission/evil b/mediagoblin/tests/test_submission/evil new file mode 100755 index 00000000..2c850e29 --- /dev/null +++ b/mediagoblin/tests/test_submission/evil @@ -0,0 +1,3 @@ +#!/bin/sh + +echo "In yer base, doin spooky things"
\ No newline at end of file diff --git a/mediagoblin/tests/test_submission/evil.jpg b/mediagoblin/tests/test_submission/evil.jpg new file mode 100755 index 00000000..2c850e29 --- /dev/null +++ b/mediagoblin/tests/test_submission/evil.jpg @@ -0,0 +1,3 @@ +#!/bin/sh + +echo "In yer base, doin spooky things"
\ No newline at end of file diff --git a/mediagoblin/tests/test_submission/evil.png b/mediagoblin/tests/test_submission/evil.png new file mode 100755 index 00000000..2c850e29 --- /dev/null +++ b/mediagoblin/tests/test_submission/evil.png @@ -0,0 +1,3 @@ +#!/bin/sh + +echo "In yer base, doin spooky things"
\ No newline at end of file diff --git a/mediagoblin/tests/test_submission/good.jpg b/mediagoblin/tests/test_submission/good.jpg Binary files differnew file mode 100644 index 00000000..936458e9 --- /dev/null +++ b/mediagoblin/tests/test_submission/good.jpg diff --git a/mediagoblin/tests/test_submission/good.pdf b/mediagoblin/tests/test_submission/good.pdf Binary files differnew file mode 100644 index 00000000..ab5db006 --- /dev/null +++ b/mediagoblin/tests/test_submission/good.pdf diff --git a/mediagoblin/tests/test_submission/good.png b/mediagoblin/tests/test_submission/good.png Binary files differnew file mode 100644 index 00000000..c1eadf9c --- /dev/null +++ b/mediagoblin/tests/test_submission/good.png diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py new file mode 100644 index 00000000..e25cc283 --- /dev/null +++ b/mediagoblin/tests/test_tags.py @@ -0,0 +1,39 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools import text + +def test_list_of_dicts_conversion(test_app): + """ + When the user adds tags to a media entry, the string from the form is + converted into a list of tags, where each tag is stored in the database + as a dict. Each tag dict should contain the tag's name and slug. Another + function performs the reverse operation when populating a form to edit tags. + """ + # Leading, trailing, and internal whitespace should be removed and slugified + assert text.convert_to_tag_list_of_dicts('sleep , 6 AM, chainsaw! ') == [ + {'name': u'sleep', 'slug': u'sleep'}, + {'name': u'6 AM', 'slug': u'6-am'}, + {'name': u'chainsaw!', 'slug': u'chainsaw'}] + + # If the user enters two identical tags, record only one of them + assert text.convert_to_tag_list_of_dicts('echo,echo') == [{'name': u'echo', + 'slug': u'echo'}] + + # Make sure converting the list of dicts to a string works + assert text.media_tags_as_string([{'name': u'yin', 'slug': u'yin'}, + {'name': u'yang', 'slug': u'yang'}]) == \ + u'yin, yang' diff --git a/mediagoblin/tests/test_timesince.py b/mediagoblin/tests/test_timesince.py new file mode 100644 index 00000000..6579eb09 --- /dev/null +++ b/mediagoblin/tests/test_timesince.py @@ -0,0 +1,57 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from datetime import datetime, timedelta + +from mediagoblin.tools.timesince import is_aware, timesince + + +def test_timesince(): + test_time = datetime.now() + + # it should ignore second and microseconds + assert timesince(test_time, test_time + timedelta(microseconds=1)) == "0 minutes" + assert timesince(test_time, test_time + timedelta(seconds=1)) == "0 minutes" + + # test minutes, hours, days, weeks, months and years (singular and plural) + assert timesince(test_time, test_time + timedelta(minutes=1)) == "1 minute" + assert timesince(test_time, test_time + timedelta(minutes=2)) == "2 minutes" + + assert timesince(test_time, test_time + timedelta(hours=1)) == "1 hour" + assert timesince(test_time, test_time + timedelta(hours=2)) == "2 hours" + + assert timesince(test_time, test_time + timedelta(days=1)) == "1 day" + assert timesince(test_time, test_time + timedelta(days=2)) == "2 days" + + assert timesince(test_time, test_time + timedelta(days=7)) == "1 week" + assert timesince(test_time, test_time + timedelta(days=14)) == "2 weeks" + + assert timesince(test_time, test_time + timedelta(days=30)) == "1 month" + assert timesince(test_time, test_time + timedelta(days=60)) == "2 months" + + assert timesince(test_time, test_time + timedelta(days=365)) == "1 year" + assert timesince(test_time, test_time + timedelta(days=730)) == "2 years" + + # okay now we want to test combinations + # e.g. 1 hour, 5 days + assert timesince(test_time, test_time + timedelta(days=5, hours=1)) == "5 days, 1 hour" + + assert timesince(test_time, test_time + timedelta(days=15)) == "2 weeks, 1 day" + + assert timesince(test_time, test_time + timedelta(days=97)) == "3 months, 1 week" + + assert timesince(test_time, test_time + timedelta(days=2250)) == "6 years, 2 months" + diff --git a/mediagoblin/tests/test_util.py b/mediagoblin/tests/test_util.py new file mode 100644 index 00000000..bc14f528 --- /dev/null +++ b/mediagoblin/tests/test_util.py @@ -0,0 +1,145 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import email + +from mediagoblin.tools import common, url, translate, mail, text, testing + +testing._activate_testing() + + +def _import_component_testing_method(silly_string): + # Just for the sake of testing that our component importer works. + return u"'%s' is the silliest string I've ever seen" % silly_string + + +def test_import_component(): + imported_func = common.import_component( + 'mediagoblin.tests.test_util:_import_component_testing_method') + result = imported_func('hooobaladoobala') + expected = u"'hooobaladoobala' is the silliest string I've ever seen" + assert result == expected + + +def test_send_email(): + mail._clear_test_inboxes() + + # send the email + mail.send_email( + "sender@mediagoblin.example.org", + ["amanda@example.org", "akila@example.org"], + "Testing is so much fun!", + """HAYYY GUYS! + +I hope you like unit tests JUST AS MUCH AS I DO!""") + + # check the main inbox + assert len(mail.EMAIL_TEST_INBOX) == 1 + message = mail.EMAIL_TEST_INBOX.pop() + assert message['From'] == "sender@mediagoblin.example.org" + assert message['To'] == "amanda@example.org, akila@example.org" + assert message['Subject'] == "Testing is so much fun!" + assert message.get_payload(decode=True) == """HAYYY GUYS! + +I hope you like unit tests JUST AS MUCH AS I DO!""" + + # Check everything that the FakeMhost.sendmail() method got is correct + assert len(mail.EMAIL_TEST_MBOX_INBOX) == 1 + mbox_dict = mail.EMAIL_TEST_MBOX_INBOX.pop() + assert mbox_dict['from'] == "sender@mediagoblin.example.org" + assert mbox_dict['to'] == ["amanda@example.org", "akila@example.org"] + mbox_message = email.message_from_string(mbox_dict['message']) + assert mbox_message['From'] == "sender@mediagoblin.example.org" + assert mbox_message['To'] == "amanda@example.org, akila@example.org" + assert mbox_message['Subject'] == "Testing is so much fun!" + assert mbox_message.get_payload(decode=True) == """HAYYY GUYS! + +I hope you like unit tests JUST AS MUCH AS I DO!""" + +def test_slugify(): + assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park' + assert url.slugify(u'A Walk in the Park') == u'a-walk-in-the-park' + assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park' + assert url.slugify(u'a walk in-the-park') == u'a-walk-in-the-park' + assert url.slugify(u'a w@lk in the park?') == u'a-w-lk-in-the-park' + assert url.slugify(u'a walk in the par\u0107') == u'a-walk-in-the-parc' + assert url.slugify(u'\u00E0\u0042\u00E7\u010F\u00EB\u0066') == u'abcdef' + +def test_locale_to_lower_upper(): + """ + Test cc.i18n.util.locale_to_lower_upper() + """ + assert translate.locale_to_lower_upper('en') == 'en' + assert translate.locale_to_lower_upper('en_US') == 'en_US' + assert translate.locale_to_lower_upper('en-us') == 'en_US' + + # crazy renditions. Useful? + assert translate.locale_to_lower_upper('en-US') == 'en_US' + assert translate.locale_to_lower_upper('en_us') == 'en_US' + + +def test_locale_to_lower_lower(): + """ + Test cc.i18n.util.locale_to_lower_lower() + """ + assert translate.locale_to_lower_lower('en') == 'en' + assert translate.locale_to_lower_lower('en_US') == 'en-us' + assert translate.locale_to_lower_lower('en-us') == 'en-us' + + # crazy renditions. Useful? + assert translate.locale_to_lower_lower('en-US') == 'en-us' + assert translate.locale_to_lower_lower('en_us') == 'en-us' + + +def test_gettext_lazy_proxy(): + from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ + from mediagoblin.tools.translate import pass_to_ugettext, set_thread_locale + proxy = _(u"Password") + orig = u"Password" + + set_thread_locale("es") + p1 = unicode(proxy) + p1_should = pass_to_ugettext(orig) + assert p1_should != orig, "Test useless, string not translated" + assert p1 == p1_should + + set_thread_locale("sv") + p2 = unicode(proxy) + p2_should = pass_to_ugettext(orig) + assert p2_should != orig, "Test broken, string not translated" + assert p2 == p2_should + + assert p1_should != p2_should, "Test broken, same translated string" + assert p1 != p2 + + +def test_html_cleaner(): + # Remove images + result = text.clean_html( + '<p>Hi everybody! ' + '<img src="http://example.org/huge-purple-barney.png" /></p>\n' + '<p>:)</p>') + assert result == ( + '<div>' + '<p>Hi everybody! </p>\n' + '<p>:)</p>' + '</div>') + + # Remove evil javascript + result = text.clean_html( + '<p><a href="javascript:nasty_surprise">innocent link!</a></p>') + assert result == ( + '<p><a href="">innocent link!</a></p>') diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py new file mode 100644 index 00000000..6695618b --- /dev/null +++ b/mediagoblin/tests/test_workbench.py @@ -0,0 +1,122 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import tempfile + + +from mediagoblin.tools import workbench +from mediagoblin.mg_globals import setup_globals +from mediagoblin.decorators import get_workbench +from mediagoblin.tests.test_storage import get_tmp_filestorage, cleanup_storage + + +class TestWorkbench(object): + def setup(self): + self.workbench_base = tempfile.mkdtemp(prefix='gmg_workbench_testing') + self.workbench_manager = workbench.WorkbenchManager( + self.workbench_base) + + def teardown(self): + # If the workbench is empty, this should work. + os.rmdir(self.workbench_base) + + def test_create_workbench(self): + workbench = self.workbench_manager.create() + assert os.path.isdir(workbench.dir) + assert workbench.dir.startswith(self.workbench_manager.base_workbench_dir) + workbench.destroy() + + def test_joinpath(self): + this_workbench = self.workbench_manager.create() + tmpname = this_workbench.joinpath('temp.txt') + assert tmpname == os.path.join(this_workbench.dir, 'temp.txt') + this_workbench.destroy() + + def test_destroy_workbench(self): + # kill a workbench + this_workbench = self.workbench_manager.create() + tmpfile_name = this_workbench.joinpath('temp.txt') + tmpfile = file(tmpfile_name, 'w') + with tmpfile: + tmpfile.write('lollerskates') + + assert os.path.exists(tmpfile_name) + + wb_dir = this_workbench.dir + this_workbench.destroy() + assert not os.path.exists(tmpfile_name) + assert not os.path.exists(wb_dir) + + def test_localized_file(self): + tmpdir, this_storage = get_tmp_filestorage() + this_workbench = self.workbench_manager.create() + + # Write a brand new file + filepath = ['dir1', 'dir2', 'ourfile.txt'] + + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Our file') + + # with a local file storage + filename = this_workbench.localized_file(this_storage, filepath) + assert filename == os.path.join( + tmpdir, 'dir1/dir2/ourfile.txt') + this_storage.delete_file(filepath) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + # with a fake remote file storage + tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) + + # ... write a brand new file, again ;) + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Our file') + + filename = this_workbench.localized_file(this_storage, filepath) + assert filename == os.path.join( + this_workbench.dir, 'ourfile.txt') + + # fake remote file storage, filename_if_copying set + filename = this_workbench.localized_file( + this_storage, filepath, 'thisfile') + assert filename == os.path.join( + this_workbench.dir, 'thisfile.txt') + + # fake remote file storage, filename_if_copying set, + # keep_extension_if_copying set to false + filename = this_workbench.localized_file( + this_storage, filepath, 'thisfile.text', False) + assert filename == os.path.join( + this_workbench.dir, 'thisfile.text') + + this_storage.delete_file(filepath) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + this_workbench.destroy() + + def test_workbench_decorator(self): + """Test @get_workbench decorator and automatic cleanup""" + # The decorator needs mg_globals.workbench_manager + setup_globals(workbench_manager=self.workbench_manager) + + @get_workbench + def create_it(workbench=None): + # workbench dir exists? + assert os.path.isdir(workbench.dir) + return workbench.dir + + benchdir = create_it() + # workbench dir has been cleaned up automatically? + assert not os.path.isdir(benchdir) diff --git a/mediagoblin/tests/testplugins/__init__.py b/mediagoblin/tests/testplugins/__init__.py new file mode 100644 index 00000000..621845ba --- /dev/null +++ b/mediagoblin/tests/testplugins/__init__.py @@ -0,0 +1,15 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/mediagoblin/tests/testplugins/callables1/__init__.py b/mediagoblin/tests/testplugins/callables1/__init__.py new file mode 100644 index 00000000..fe801a01 --- /dev/null +++ b/mediagoblin/tests/testplugins/callables1/__init__.py @@ -0,0 +1,43 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +def setup_plugin(): + pass + + +def just_one(call_log): + call_log.append("expect this one call") + return "Called just once" + + +def multi_handle(call_log): + call_log.append("Hi, I'm the first") + return "the first returns" + +def multi_handle_with_canthandle(call_log): + return None + + +def expand_tuple(this_tuple): + return this_tuple + (1,) + +hooks = { + 'setup': setup_plugin, + 'just_one': just_one, + 'multi_handle': multi_handle, + 'multi_handle_with_canthandle': multi_handle_with_canthandle, + 'expand_tuple': expand_tuple, + } diff --git a/mediagoblin/tests/testplugins/callables2/__init__.py b/mediagoblin/tests/testplugins/callables2/__init__.py new file mode 100644 index 00000000..9d5cf950 --- /dev/null +++ b/mediagoblin/tests/testplugins/callables2/__init__.py @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +def setup_plugin(): + pass + + +def just_one(call_log): + assert "SHOULD NOT HAPPEN" + +def multi_handle(call_log): + call_log.append("Hi, I'm the second") + return "the second returns" + +def multi_handle_with_canthandle(call_log): + call_log.append("Hi, I'm the second") + return "the second returns" + +def expand_tuple(this_tuple): + return this_tuple + (2,) + +hooks = { + 'setup': setup_plugin, + 'just_one': just_one, + 'multi_handle': multi_handle, + 'multi_handle_with_canthandle': multi_handle_with_canthandle, + 'expand_tuple': expand_tuple, + } diff --git a/mediagoblin/tests/testplugins/callables3/__init__.py b/mediagoblin/tests/testplugins/callables3/__init__.py new file mode 100644 index 00000000..04efc8fc --- /dev/null +++ b/mediagoblin/tests/testplugins/callables3/__init__.py @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +def setup_plugin(): + pass + + +def just_one(call_log): + assert "SHOULD NOT HAPPEN" + +def multi_handle(call_log): + call_log.append("Hi, I'm the third") + return "the third returns" + +def multi_handle_with_canthandle(call_log): + call_log.append("Hi, I'm the third") + return "the third returns" + +def expand_tuple(this_tuple): + return this_tuple + (3,) + +hooks = { + 'setup': setup_plugin, + 'just_one': just_one, + 'multi_handle': multi_handle, + 'multi_handle_with_canthandle': multi_handle_with_canthandle, + 'expand_tuple': expand_tuple, + } diff --git a/mediagoblin/tests/testplugins/modify_context/__init__.py b/mediagoblin/tests/testplugins/modify_context/__init__.py new file mode 100644 index 00000000..164e66c1 --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/__init__.py @@ -0,0 +1,55 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools import pluginapi +import pkg_resources + + +def append_to_specific_context(context): + context['specific_page_append'] = 'in yer specificpage' + return context + +def append_to_global_context(context): + context['global_append'] = 'globally appended!' + return context + +def double_doubleme(context): + if 'doubleme' in context: + context['doubleme'] = context['doubleme'] * 2 + return context + + +def setup_plugin(): + routes = [ + ('modify_context.specific_page', + '/modify_context/specific/', + 'mediagoblin.tests.testplugins.modify_context.views:specific'), + ('modify_context.general_page', + '/modify_context/', + 'mediagoblin.tests.testplugins.modify_context.views:general')] + + pluginapi.register_routes(routes) + pluginapi.register_template_path( + pkg_resources.resource_filename( + 'mediagoblin.tests.testplugins.modify_context', 'templates')) + + +hooks = { + 'setup': setup_plugin, + ('modify_context.specific_page', + 'contextplugin/specific.html'): append_to_specific_context, + 'template_global_context': append_to_global_context, + 'template_context_prerender': double_doubleme} diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html new file mode 100644 index 00000000..9cf96d3e --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html @@ -0,0 +1,5 @@ +General page! + +global thing: {{ global_append }} +lol: {{ lol }} +doubleme: {{ doubleme }} diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html new file mode 100644 index 00000000..5b1b4c4a --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html @@ -0,0 +1,6 @@ +Specific page! + +specific thing: {{ specific_page_append }} +global thing: {{ global_append }} +something: {{ something }} +doubleme: {{ doubleme }} diff --git a/mediagoblin/tests/testplugins/modify_context/views.py b/mediagoblin/tests/testplugins/modify_context/views.py new file mode 100644 index 00000000..701ec6f9 --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/views.py @@ -0,0 +1,33 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools.response import render_to_response + + +def specific(request): + return render_to_response( + request, + 'contextplugin/specific.html', + {"something": "orother", + "doubleme": "happy"}) + + +def general(request): + return render_to_response( + request, + 'contextplugin/general.html', + {"lol": "cats", + "doubleme": "joy"}) diff --git a/mediagoblin/tests/testplugins/pluginspec/__init__.py b/mediagoblin/tests/testplugins/pluginspec/__init__.py new file mode 100644 index 00000000..76ca2b1f --- /dev/null +++ b/mediagoblin/tests/testplugins/pluginspec/__init__.py @@ -0,0 +1,22 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +def setup_plugin(): + pass + +hooks = { + 'setup': setup_plugin, +} diff --git a/mediagoblin/tests/testplugins/pluginspec/config_spec.ini b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini new file mode 100644 index 00000000..5c9c3bd7 --- /dev/null +++ b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini @@ -0,0 +1,4 @@ +[plugin_spec] +some_string = string(default="blork") +some_int = integer(default=50) +dont_change_me = string(default="still the default")
\ No newline at end of file diff --git a/mediagoblin/tests/testplugins/staticstuff/__init__.py b/mediagoblin/tests/testplugins/staticstuff/__init__.py new file mode 100644 index 00000000..a2591646 --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/__init__.py @@ -0,0 +1,36 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools.staticdirect import PluginStatic +from mediagoblin.tools import pluginapi +from pkg_resources import resource_filename + +def setup_plugin(): + routes = [ + ('staticstuff.static_demo', + '/staticstuff/', + 'mediagoblin.tests.testplugins.staticstuff.views:static_demo')] + + pluginapi.register_routes(routes) + + +hooks = { + 'setup': setup_plugin, + 'static_setup': lambda: PluginStatic( + 'staticstuff', + resource_filename( + 'mediagoblin.tests.testplugins.staticstuff', + 'static'))} diff --git a/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css new file mode 100644 index 00000000..1294ab8a --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css @@ -0,0 +1,4 @@ +body { + background-color: #5edcf1; + color: #eb8add; +}
\ No newline at end of file diff --git a/mediagoblin/tests/testplugins/staticstuff/views.py b/mediagoblin/tests/testplugins/staticstuff/views.py new file mode 100644 index 00000000..34a5e8cb --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/views.py @@ -0,0 +1,28 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json + +from werkzeug import Response + + +def static_demo(request): + return Response(json.dumps({ + # this does not exist, but we'll pretend it does ;) + 'mgoblin_bunny_pic': request.staticdirect( + 'images/bunny_pic.png'), + 'plugin_bunny_css': request.staticdirect( + 'css/bunnify.css', 'staticstuff')})) diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py new file mode 100644 index 00000000..2ee39e89 --- /dev/null +++ b/mediagoblin/tests/tools.py @@ -0,0 +1,233 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import sys +import os +import pkg_resources +import shutil + +from functools import wraps + +from paste.deploy import loadapp +from webtest import TestApp + +from mediagoblin import mg_globals +from mediagoblin.db.models import User, MediaEntry, Collection +from mediagoblin.tools import testing +from mediagoblin.init.config import read_mediagoblin_config +from mediagoblin.db.base import Session +from mediagoblin.meddleware import BaseMeddleware +from mediagoblin.auth.lib import bcrypt_gen_password_hash +from mediagoblin.gmg_commands.dbupdate import run_dbupdate + + +MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__' +TEST_SERVER_CONFIG = pkg_resources.resource_filename( + 'mediagoblin.tests', 'test_paste.ini') +TEST_APP_CONFIG = pkg_resources.resource_filename( + 'mediagoblin.tests', 'test_mgoblin_app.ini') + + +USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue'] + + +class TestingMeddleware(BaseMeddleware): + """ + Meddleware for the Unit tests + + It might make sense to perform some tests on all + requests/responses. Or prepare them in a special + manner. For example all html responses could be tested + for being valid html *after* being rendered. + + This module is getting inserted at the front of the + meddleware list, which means: requests are handed here + first, responses last. So this wraps up the "normal" + app. + + If you need to add a test, either add it directly to + the appropiate process_request or process_response, or + create a new method and call it from process_*. + """ + + def process_response(self, request, response): + # All following tests should be for html only! + if getattr(response, 'content_type', None) != "text/html": + # Get out early + return + + # If the template contains a reference to + # /mgoblin_static/ instead of using + # /request.staticdirect(), error out here. + # This could probably be implemented as a grep on + # the shipped templates easier... + if response.text.find("/mgoblin_static/") >= 0: + raise AssertionError( + "Response HTML contains reference to /mgoblin_static/ " + "instead of staticdirect. Request was for: " + + request.full_path) + + return + + +def get_app(request, paste_config=None, mgoblin_config=None): + """Create a MediaGoblin app for testing. + + Args: + - request: Not an http request, but a pytest fixture request. We + use this to make temporary directories that pytest + automatically cleans up as needed. + - paste_config: particular paste config used by this application. + - mgoblin_config: particular mediagoblin config used by this + application. + """ + paste_config = paste_config or TEST_SERVER_CONFIG + mgoblin_config = mgoblin_config or TEST_APP_CONFIG + + # This is the directory we're copying the paste/mgoblin config stuff into + run_dir = request.config._tmpdirhandler.mktemp( + 'mgoblin_app', numbered=True) + user_dev_dir = run_dir.mkdir('user_dev').strpath + + new_paste_config = run_dir.join('paste.ini').strpath + new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath + shutil.copyfile(paste_config, new_paste_config) + shutil.copyfile(mgoblin_config, new_mgoblin_config) + + Session.rollback() + Session.remove() + + # install user_dev directories + for directory in USER_DEV_DIRECTORIES_TO_SETUP: + full_dir = os.path.join(user_dev_dir, directory) + os.makedirs(full_dir) + + # Get app config + global_config, validation_result = read_mediagoblin_config(new_mgoblin_config) + app_config = global_config['mediagoblin'] + + # Run database setup/migrations + run_dbupdate(app_config, global_config) + + # setup app and return + test_app = loadapp( + 'config:' + new_paste_config) + + # Insert the TestingMeddleware, which can do some + # sanity checks on every request/response. + # Doing it this way is probably not the cleanest way. + # We'll fix it, when we have plugins! + mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app)) + + app = TestApp(test_app) + + return app + + +def install_fixtures_simple(db, fixtures): + """ + Very simply install fixtures in the database + """ + for collection_name, collection_fixtures in fixtures.iteritems(): + collection = db[collection_name] + for fixture in collection_fixtures: + collection.insert(fixture) + + +def assert_db_meets_expected(db, expected): + """ + Assert a database contains the things we expect it to. + + Objects are found via 'id', so you should make sure your document + has an id. + + Args: + - db: pymongo or mongokit database connection + - expected: the data we expect. Formatted like: + {'collection_name': [ + {'id': 'foo', + 'some_field': 'some_value'},]} + """ + for collection_name, collection_data in expected.iteritems(): + collection = db[collection_name] + for expected_document in collection_data: + document = collection.find_one({'id': expected_document['id']}) + assert document is not None # make sure it exists + assert document == expected_document # make sure it matches + + +def fixture_add_user(username=u'chris', password=u'toast', + active_user=True): + # Reuse existing user or create a new one + test_user = User.query.filter_by(username=username).first() + if test_user is None: + test_user = User() + test_user.username = username + test_user.email = username + u'@example.com' + if password is not None: + test_user.pw_hash = bcrypt_gen_password_hash(password) + if active_user: + test_user.email_verified = True + test_user.status = u'active' + + test_user.save() + + # Reload + test_user = User.query.filter_by(username=username).first() + + # ... and detach from session: + Session.expunge(test_user) + + return test_user + + +def fixture_media_entry(title=u"Some title", slug=None, + uploader=None, save=True, gen_slug=True): + entry = MediaEntry() + entry.title = title + entry.slug = slug + entry.uploader = uploader or fixture_add_user().id + entry.media_type = u'image' + + if gen_slug: + entry.generate_slug() + if save: + entry.save() + + return entry + + +def fixture_add_collection(name=u"My first Collection", user=None): + if user is None: + user = fixture_add_user() + coll = Collection.query.filter_by(creator=user.id, title=name).first() + if coll is not None: + return coll + coll = Collection() + coll.creator = user.id + coll.title = name + coll.generate_slug() + coll.save() + + # Reload + Session.refresh(coll) + + # ... and detach from session: + Session.expunge(coll) + + return coll + |