diff options
Diffstat (limited to 'mediagoblin/tests')
51 files changed, 3355 insertions, 331 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py index 5a3235c6..cf200791 100644 --- a/mediagoblin/tests/__init__.py +++ b/mediagoblin/tests/__init__.py @@ -14,23 +14,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import shutil - -from mediagoblin import mg_globals -from mediagoblin.tests.tools import ( - TEST_USER_DEV, suicide_if_bad_celery_environ) - def setup_package(): - suicide_if_bad_celery_environ() import warnings from sqlalchemy.exc import SAWarning warnings.simplefilter("error", SAWarning) - - -def teardown_package(): - # Remove and reinstall user_dev directories - if os.path.exists(TEST_USER_DEV): - shutil.rmtree(TEST_USER_DEV) diff --git a/mediagoblin/tests/appconfig_context_modified.ini b/mediagoblin/tests/appconfig_context_modified.ini new file mode 100644 index 00000000..cc6721f5 --- /dev/null +++ b/mediagoblin/tests/appconfig_context_modified.ini @@ -0,0 +1,27 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +#Runs with an in-memory sqlite db for speed. +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.tests.testplugins.modify_context]] diff --git a/mediagoblin/tests/appconfig_plugin_specs.ini b/mediagoblin/tests/appconfig_plugin_specs.ini new file mode 100644 index 00000000..5511cd97 --- /dev/null +++ b/mediagoblin/tests/appconfig_plugin_specs.ini @@ -0,0 +1,21 @@ +[mediagoblin] +direct_remote_path = /mgoblin_static/ +email_sender_address = "notice@mediagoblin.example.org" + +## Uncomment and change to your DB's appropiate setting. +## Default is a local sqlite db "mediagoblin.db". +# sql_engine = postgresql:///gmg + +# set to false to enable sending notices +email_debug_mode = true + +# Set to false to disable registrations +allow_registration = true + +[plugins] +[[mediagoblin.tests.testplugins.pluginspec]] +some_string = "not blork" +some_int = "not an int" + +# this one shouldn't have its own config +[[mediagoblin.tests.testplugins.callables1]] diff --git a/mediagoblin/tests/appconfig_static_plugin.ini b/mediagoblin/tests/appconfig_static_plugin.ini new file mode 100644 index 00000000..5ce5c5bd --- /dev/null +++ b/mediagoblin/tests/appconfig_static_plugin.ini @@ -0,0 +1,27 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +#Runs with an in-memory sqlite db for speed. +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.tests.testplugins.staticstuff]] diff --git a/mediagoblin/tests/auth_configs/__init__.py b/mediagoblin/tests/auth_configs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/mediagoblin/tests/auth_configs/__init__.py diff --git a/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini new file mode 100644 index 00000000..07c69442 --- /dev/null +++ b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini @@ -0,0 +1,25 @@ +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] diff --git a/mediagoblin/tests/auth_configs/ldap_appconfig.ini b/mediagoblin/tests/auth_configs/ldap_appconfig.ini new file mode 100644 index 00000000..9be37e17 --- /dev/null +++ b/mediagoblin/tests/auth_configs/ldap_appconfig.ini @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.ldap]] diff --git a/mediagoblin/tests/auth_configs/openid_appconfig.ini b/mediagoblin/tests/auth_configs/openid_appconfig.ini new file mode 100644 index 00000000..3433e139 --- /dev/null +++ b/mediagoblin/tests/auth_configs/openid_appconfig.ini @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.openid]] diff --git a/mediagoblin/tests/auth_configs/persona_appconfig.ini b/mediagoblin/tests/auth_configs/persona_appconfig.ini new file mode 100644 index 00000000..0bd5d634 --- /dev/null +++ b/mediagoblin/tests/auth_configs/persona_appconfig.ini @@ -0,0 +1,42 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +# TODO: Switch to using an in-memory database +sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.persona]] + diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py index 25f495d6..dbb0aa0a 100644 --- a/mediagoblin/tests/conftest.py +++ b/mediagoblin/tests/conftest.py @@ -1,7 +1,25 @@ -from mediagoblin.tests import tools +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. import pytest +from mediagoblin.tests import tools +from mediagoblin.tools.testing import _activate_testing + + @pytest.fixture() def test_app(request): """ @@ -13,3 +31,11 @@ def test_app(request): in differently to get_app. """ return tools.get_app(request) + + +@pytest.fixture() +def pt_fixture_enable_testing(): + """ + py.test fixture to enable testing mode in tools. + """ + _activate_testing() diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini index d4aa2d69..e561c074 100644 --- a/mediagoblin/tests/pytest.ini +++ b/mediagoblin/tests/pytest.ini @@ -1,2 +1,2 @@ [pytest] -usefixtures = tmpdir
\ No newline at end of file +usefixtures = tmpdir pt_fixture_enable_testing diff --git a/mediagoblin/tests/resources.py b/mediagoblin/tests/resources.py index f7b3037d..480f6d9a 100644 --- a/mediagoblin/tests/resources.py +++ b/mediagoblin/tests/resources.py @@ -29,6 +29,8 @@ EVIL_JPG = resource('evil.jpg') EVIL_PNG = resource('evil.png') BIG_BLUE = resource('bigblue.png') GOOD_PDF = resource('good.pdf') +MED_PNG = resource('medium.png') +BIG_PNG = resource('big.png') def resource_exif(f): diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index 89cf1026..4e0cbd8f 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -35,7 +35,8 @@ class TestAPI(object): self.db = mg_globals.database self.user_password = u'4cc355_70k3N' - self.user = fixture_add_user(u'joapi', self.user_password) + self.user = fixture_add_user(u'joapi', self.user_password, + privileges=[u'active',u'uploader']) def login(self, test_app): test_app.post( diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index 755727f9..1bbc3d01 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -1,3 +1,4 @@ + # GNU MediaGoblin -- federated, autonomous media hosting # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. # @@ -13,54 +14,15 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - import urlparse -import datetime +import pkg_resources +import pytest from mediagoblin import mg_globals -from mediagoblin.auth import lib as auth_lib from mediagoblin.db.models import User -from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tests.tools import get_app, fixture_add_user from mediagoblin.tools import template, mail - - -######################## -# Test bcrypt auth funcs -######################## - -def test_bcrypt_check_password(): - # Check known 'lollerskates' password against check function - assert auth_lib.bcrypt_check_password( - 'lollerskates', - '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') - - assert not auth_lib.bcrypt_check_password( - 'notthepassword', - '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') - - # Same thing, but with extra fake salt. - assert not auth_lib.bcrypt_check_password( - 'notthepassword', - '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6', - '3><7R45417') - - -def test_bcrypt_gen_password_hash(): - pw = 'youwillneverguessthis' - - # Normal password hash generation, and check on that hash - hashed_pw = auth_lib.bcrypt_gen_password_hash(pw) - assert auth_lib.bcrypt_check_password( - pw, hashed_pw) - assert not auth_lib.bcrypt_check_password( - 'notthepassword', hashed_pw) - - # Same thing, extra salt. - hashed_pw = auth_lib.bcrypt_gen_password_hash(pw, '3><7R45417') - assert auth_lib.bcrypt_check_password( - pw, hashed_pw, '3><7R45417') - assert not auth_lib.bcrypt_check_password( - 'notthepassword', hashed_pw, '3><7R45417') +from mediagoblin.auth import tools as auth_tools def test_register_views(test_app): @@ -122,31 +84,35 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/register/', { - 'username': u'happygirl', - 'password': 'iamsohappy', - 'email': 'happygrrl@example.org'}) + 'username': u'angrygirl', + 'password': 'iamsoangry', + 'email': 'angrygrrl@example.org'}) response.follow() ## Did we redirect to the proper page? Use the right template? - assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/' - assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT + assert urlparse.urlsplit(response.location)[2] == '/u/angrygirl/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT ## Make sure user is in place - new_user = mg_globals.database.User.find_one( - {'username': u'happygirl'}) + new_user = mg_globals.database.User.query.filter_by( + username=u'angrygirl').first() assert new_user - assert new_user.status == u'needs_email_verification' - assert new_user.email_verified == False + ## Make sure that the proper privileges are granted on registration + + assert new_user.has_privilege(u'commenter') + assert new_user.has_privilege(u'uploader') + assert new_user.has_privilege(u'reporter') + assert not new_user.has_privilege(u'active') ## Make sure user is logged in request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] + 'mediagoblin/user_pages/user_nonactive.html']['request'] assert request.session['user_id'] == unicode(new_user.id) ## Make sure we get email confirmation, and try verifying assert len(mail.EMAIL_TEST_INBOX) == 1 message = mail.EMAIL_TEST_INBOX.pop() - assert message['To'] == 'happygrrl@example.org' + assert message['To'] == 'angrygrrl@example.org' email_context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/auth/verification_email.txt'] assert email_context['verification_url'] in message.get_payload(decode=True) @@ -156,27 +122,20 @@ def test_register_views(test_app): assert path == u'/auth/verify_email/' parsed_get_params = urlparse.parse_qs(get_params) - ### user should have these same parameters - assert parsed_get_params['userid'] == [ - unicode(new_user.id)] - assert parsed_get_params['token'] == [ - new_user.verification_key] - ## Try verifying with bs verification key, shouldn't work template.clear_test_template_context() response = test_app.get( - "/auth/verify_email/?userid=%s&token=total_bs" % unicode( - new_user.id)) + "/auth/verify_email/?token=total_bs") response.follow() - context = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html'] + + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' + # assert context['verification_successful'] == True # TODO: Would be good to test messages here when we can do so... - new_user = mg_globals.database.User.find_one( - {'username': u'happygirl'}) + new_user = mg_globals.database.User.query.filter_by( + username=u'angrygirl').first() assert new_user - assert new_user.status == u'needs_email_verification' - assert new_user.email_verified == False ## Verify the email activation works template.clear_test_template_context() @@ -186,11 +145,9 @@ def test_register_views(test_app): 'mediagoblin/user_pages/user.html'] # assert context['verification_successful'] == True # TODO: Would be good to test messages here when we can do so... - new_user = mg_globals.database.User.find_one( - {'username': u'happygirl'}) + new_user = mg_globals.database.User.query.filter_by( + username=u'angrygirl').first() assert new_user - assert new_user.status == u'active' - assert new_user.email_verified == True # Uniqueness checks # ----------------- @@ -198,9 +155,9 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/register/', { - 'username': u'happygirl', - 'password': 'iamsohappy2', - 'email': 'happygrrl2@example.org'}) + 'username': u'angrygirl', + 'password': 'iamsoangry2', + 'email': 'angrygrrl2@example.org'}) context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/auth/register.html'] @@ -215,7 +172,7 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/forgot_password/', - {'username': u'happygirl'}) + {'username': u'angrygirl'}) response.follow() ## Did we redirect to the proper page? Use the right template? @@ -225,55 +182,37 @@ def test_register_views(test_app): ## Make sure link to change password is sent by email assert len(mail.EMAIL_TEST_INBOX) == 1 message = mail.EMAIL_TEST_INBOX.pop() - assert message['To'] == 'happygrrl@example.org' + assert message['To'] == 'angrygrrl@example.org' email_context = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/auth/fp_verification_email.txt'] + 'mediagoblin/plugins/basic_auth/fp_verification_email.txt'] #TODO - change the name of verification_url to something forgot-password-ish assert email_context['verification_url'] in message.get_payload(decode=True) path = urlparse.urlsplit(email_context['verification_url'])[2] get_params = urlparse.urlsplit(email_context['verification_url'])[3] - assert path == u'/auth/forgot_password/verify/' parsed_get_params = urlparse.parse_qs(get_params) - - # user should have matching parameters - new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) - assert parsed_get_params['userid'] == [unicode(new_user.id)] - assert parsed_get_params['token'] == [new_user.fp_verification_key] - - ### The forgotten password token should be set to expire in ~ 10 days - # A few ticks have expired so there are only 9 full days left... - assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9 + assert path == u'/auth/forgot_password/verify/' ## Try using a bs password-changing verification key, shouldn't work template.clear_test_template_context() response = test_app.get( - "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode( - new_user.id), status=404) - assert response.status.split()[0] == u'404' # status="404 NOT FOUND" + "/auth/forgot_password/verify/?token=total_bs") + response.follow() - ## Try using an expired token to change password, shouldn't work - template.clear_test_template_context() - new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) - real_token_expiration = new_user.fp_token_expire - new_user.fp_token_expire = datetime.datetime.now() - new_user.save() - response = test_app.get("%s?%s" % (path, get_params), status=404) - assert response.status.split()[0] == u'404' # status="404 NOT FOUND" - new_user.fp_token_expire = real_token_expiration - new_user.save() + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' ## Verify step 1 of password-change works -- can see form to change password template.clear_test_template_context() response = test_app.get("%s?%s" % (path, get_params)) - assert 'mediagoblin/auth/change_fp.html' in template.TEMPLATE_TEST_CONTEXT + assert 'mediagoblin/plugins/basic_auth/change_fp.html' in \ + template.TEMPLATE_TEST_CONTEXT ## Verify step 2.1 of password-change works -- report success to user template.clear_test_template_context() response = test_app.post( '/auth/forgot_password/verify/', { - 'userid': parsed_get_params['userid'], - 'password': 'iamveryveryhappy', + 'password': 'iamveryveryangry', 'token': parsed_get_params['token']}) response.follow() assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT @@ -282,8 +221,8 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/login/', { - 'username': u'happygirl', - 'password': 'iamveryveryhappy'}) + 'username': u'angrygirl', + 'password': 'iamveryveryangry'}) # User should be redirected response.follow() @@ -296,7 +235,8 @@ def test_authentication_views(test_app): Test logging in and logging out """ # Make a new user - test_user = fixture_add_user(active_user=False) + test_user = fixture_add_user() + # Get login # --------- @@ -310,7 +250,6 @@ def test_authentication_views(test_app): context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] form = context['login_form'] assert form.username.errors == [u'This field is required.'] - assert form.password.errors == [u'This field is required.'] # Failed login - blank user # ------------------------- @@ -328,9 +267,7 @@ def test_authentication_views(test_app): response = test_app.post( '/auth/login/', { 'username': u'chris'}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] - form = context['login_form'] - assert form.password.errors == [u'This field is required.'] + assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT # Failed login - bad user # ----------------------- @@ -394,3 +331,47 @@ def test_authentication_views(test_app): 'password': 'toast', 'next' : '/u/chris/'}) assert urlparse.urlsplit(response.location)[2] == '/u/chris/' + +@pytest.fixture() +def authentication_disabled_app(request): + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests.auth_configs', + 'authentication_disabled_appconfig.ini')) + + +def test_authentication_disabled_app(authentication_disabled_app): + # app.auth should = false + assert mg_globals + assert mg_globals.app.auth is False + + # Try to visit register page + template.clear_test_template_context() + response = authentication_disabled_app.get('/auth/register/') + response.follow() + + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to vist login page + template.clear_test_template_context() + response = authentication_disabled_app.get('/auth/login/') + response.follow() + + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + ## Test check_login_simple should return None + assert auth_tools.check_login_simple('test', 'simple') is None + + # Try to visit the forgot password page + template.clear_test_template_context() + response = authentication_disabled_app.get('/auth/register/') + response.follow() + + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py new file mode 100644 index 00000000..828f0515 --- /dev/null +++ b/mediagoblin/tests/test_basic_auth.py @@ -0,0 +1,103 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import urlparse + +from mediagoblin.db.models import User +from mediagoblin.plugins.basic_auth import tools as auth_tools +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tools import template +from mediagoblin.tools.testing import _activate_testing + +_activate_testing() + + +######################## +# Test bcrypt auth funcs +######################## + + +def test_bcrypt_check_password(): + # Check known 'lollerskates' password against check function + assert auth_tools.bcrypt_check_password( + 'lollerskates', + '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') + + assert not auth_tools.bcrypt_check_password( + 'notthepassword', + '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO') + + # Same thing, but with extra fake salt. + assert not auth_tools.bcrypt_check_password( + 'notthepassword', + '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6', + '3><7R45417') + + +def test_bcrypt_gen_password_hash(): + pw = 'youwillneverguessthis' + + # Normal password hash generation, and check on that hash + hashed_pw = auth_tools.bcrypt_gen_password_hash(pw) + assert auth_tools.bcrypt_check_password( + pw, hashed_pw) + assert not auth_tools.bcrypt_check_password( + 'notthepassword', hashed_pw) + + # Same thing, extra salt. + hashed_pw = auth_tools.bcrypt_gen_password_hash(pw, '3><7R45417') + assert auth_tools.bcrypt_check_password( + pw, hashed_pw, '3><7R45417') + assert not auth_tools.bcrypt_check_password( + 'notthepassword', hashed_pw, '3><7R45417') + + +def test_change_password(test_app): + """Test changing password correctly and incorrectly""" + test_user = fixture_add_user( + password=u'toast', + privileges=[u'active']) + + test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': u'toast'}) + + # test that the password can be changed + res = test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '123456', + }) + res.follow() + + # Did we redirect to the correct page? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + # test_user has to be fetched again in order to have the current values + test_user = User.query.filter_by(username=u'chris').first() + assert auth_tools.bcrypt_check_password('123456', test_user.pw_hash) + + # test that the password cannot be changed if the given + # old_password is wrong + template.clear_test_template_context() + test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '098765', + }) + + test_user = User.query.filter_by(username=u'chris').first() + assert not auth_tools.bcrypt_check_password('098765', test_user.pw_hash) diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py index 5530c6f2..d60293f9 100644 --- a/mediagoblin/tests/test_celery_setup.py +++ b/mediagoblin/tests/test_celery_setup.py @@ -48,13 +48,13 @@ def test_setup_celery_from_config(): assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) assert fake_celery_module.CELERY_RESULT_PERSISTENT is True assert fake_celery_module.CELERY_IMPORTS == [ - 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task'] + 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task'] assert fake_celery_module.CELERY_RESULT_BACKEND == 'database' assert fake_celery_module.CELERY_RESULT_DBURI == ( 'sqlite:///' + pkg_resources.resource_filename('mediagoblin.tests', 'celery.db')) assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy' - assert fake_celery_module.BROKER_HOST == ( + assert fake_celery_module.BROKER_URL == ( 'sqlite:///' + pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db')) diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py index cda2607f..4f44e0b9 100644 --- a/mediagoblin/tests/test_edit.py +++ b/mediagoblin/tests/test_edit.py @@ -14,19 +14,21 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import pytest +import urlparse from mediagoblin import mg_globals from mediagoblin.db.models import User from mediagoblin.tests.tools import fixture_add_user -from mediagoblin.tools import template -from mediagoblin.auth.lib import bcrypt_check_password +from mediagoblin import auth +from mediagoblin.tools import template, mail + class TestUserEdit(object): def setup(self): # set up new user self.user_password = u'toast' - self.user = fixture_add_user(password = self.user_password) + self.user = fixture_add_user(password = self.user_password, + privileges=[u'active']) def login(self, test_app): test_app.post( @@ -51,42 +53,10 @@ class TestUserEdit(object): # deleted too. Perhaps in submission test? #Restore user at end of test - self.user = fixture_add_user(password = self.user_password) - self.login(test_app) - - - def test_change_password(self, test_app): - """Test changing password correctly and incorrectly""" + self.user = fixture_add_user(password = self.user_password, + privileges=[u'active']) self.login(test_app) - # test that the password can be changed - # template.clear_test_template_context() - res = test_app.post( - '/edit/account/', { - 'old_password': 'toast', - 'new_password': '123456', - 'wants_comment_notification': 'y' - }) - - # Check for redirect on success - assert res.status_int == 302 - # test_user has to be fetched again in order to have the current values - test_user = User.query.filter_by(username=u'chris').first() - assert bcrypt_check_password('123456', test_user.pw_hash) - # Update current user passwd - self.user_password = '123456' - - # test that the password cannot be changed if the given - # old_password is wrong template.clear_test_template_context() - test_app.post( - '/edit/account/', { - 'old_password': 'toast', - 'new_password': '098765', - }) - - test_user = User.query.filter_by(username=u'chris').first() - assert not bcrypt_check_password('098765', test_user.pw_hash) - def test_change_bio_url(self, test_app): """Test changing bio and URL""" @@ -112,7 +82,8 @@ class TestUserEdit(object): assert test_user.url == u'http://dustycloud.org/' # change a different user than the logged in (should fail with 403) - fixture_add_user(username=u"foo") + fixture_add_user(username=u"foo", + privileges=[u'active']) res = test_app.post( '/u/foo/edit/', { 'bio': u'I love toast!', @@ -138,4 +109,68 @@ class TestUserEdit(object): assert form.url.errors == [ u'This address contains errors'] + def test_email_change(self, test_app): + self.login(test_app) + + # Test email already in db + template.clear_test_template_context() + test_app.post( + '/edit/email/', { + 'new_email': 'chris@example.com', + 'password': 'toast'}) + + # Check form errors + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/change_email.html'] + assert context['form'].new_email.errors == [ + u'Sorry, a user with that email address already exists.'] + + # Test successful email change + template.clear_test_template_context() + res = test_app.post( + '/edit/email/', { + 'new_email': 'new@example.com', + 'password': 'toast'}) + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + # Make sure we get email verification and try verifying + assert len(mail.EMAIL_TEST_INBOX) == 1 + message = mail.EMAIL_TEST_INBOX.pop() + assert message['To'] == 'new@example.com' + email_context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/verification.txt'] + assert email_context['verification_url'] in \ + message.get_payload(decode=True) + + path = urlparse.urlsplit(email_context['verification_url'])[2] + assert path == u'/edit/verify_email/' + + ## Try verifying with bs verification key, shouldn't work + template.clear_test_template_context() + res = test_app.get( + "/edit/verify_email/?token=total_bs") + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/' + + # Email shouldn't be saved + email_in_db = mg_globals.database.User.query.filter_by( + email='new@example.com').first() + email = User.query.filter_by(username='chris').first().email + assert email_in_db is None + assert email == 'chris@example.com' + + # Verify email activation works + template.clear_test_template_context() + get_params = urlparse.urlsplit(email_context['verification_url'])[3] + res = test_app.get('%s?%s' % (path, get_params)) + res.follow() + + # New email saved? + email = User.query.filter_by(username='chris').first().email + assert email == 'new@example.com' # test changing the url inproperly diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py index 824de3c2..c07e24ae 100644 --- a/mediagoblin/tests/test_exif.py +++ b/mediagoblin/tests/test_exif.py @@ -48,63 +48,324 @@ def test_exif_extraction(): assert gps == {} # Do we have the "useful" tags? - assert useful == { - 'EXIF Flash': { - 'field_type': 3, - 'printable': u'Flash did not fire', - 'field_offset': 380, - 'tag': 37385, - 'values': [0], - 'field_length': 2}, - 'EXIF ExposureTime': { - 'field_type': 5, - 'printable': '1/125', - 'field_offset': 700, - 'tag': 33434, - 'values': [[1, 125]], - 'field_length': 8}, - 'EXIF FocalLength': { - 'field_type': 5, - 'printable': '18', - 'field_offset': 780, - 'tag': 37386, - 'values': [[18, 1]], - 'field_length': 8}, - 'Image Model': { - 'field_type': 2, - 'printable': 'NIKON D80', - 'field_offset': 152, - 'tag': 272, - 'values': 'NIKON D80', - 'field_length': 10}, - 'Image Make': { - 'field_type': 2, - 'printable': 'NIKON CORPORATION', - 'field_offset': 134, - 'tag': 271, - 'values': 'NIKON CORPORATION', - 'field_length': 18}, - 'EXIF ExposureMode': { - 'field_type': 3, - 'printable': 'Manual Exposure', - 'field_offset': 584, - 'tag': 41986, - 'values': [1], - 'field_length': 2}, - 'EXIF ISOSpeedRatings': { - 'field_type': 3, - 'printable': '100', - 'field_offset': 260, - 'tag': 34855, - 'values': [100], - 'field_length': 2}, - 'EXIF FNumber': { - 'field_type': 5, - 'printable': '10', - 'field_offset': 708, - 'tag': 33437, - 'values': [[10, 1]], - 'field_length': 8}} + assert useful == {'EXIF CVAPattern': {'field_length': 8, + 'field_offset': 26224, + 'field_type': 7, + 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]', + 'tag': 41730, + 'values': [0, 2, 0, 2, 1, 2, 0, 1]}, + 'EXIF ColorSpace': {'field_length': 2, + 'field_offset': 476, + 'field_type': 3, + 'printable': u'sRGB', + 'tag': 40961, + 'values': [1]}, + 'EXIF ComponentsConfiguration': {'field_length': 4, + 'field_offset': 308, + 'field_type': 7, + 'printable': u'YCbCr', + 'tag': 37121, + 'values': [1, 2, 3, 0]}, + 'EXIF CompressedBitsPerPixel': {'field_length': 8, + 'field_offset': 756, + 'field_type': 5, + 'printable': u'4', + 'tag': 37122, + 'values': [[4, 1]]}, + 'EXIF Contrast': {'field_length': 2, + 'field_offset': 656, + 'field_type': 3, + 'printable': u'Soft', + 'tag': 41992, + 'values': [1]}, + 'EXIF CustomRendered': {'field_length': 2, + 'field_offset': 572, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41985, + 'values': [0]}, + 'EXIF DateTimeDigitized': {'field_length': 20, + 'field_offset': 736, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 36868, + 'values': u'2011:06:22 12:20:33'}, + 'EXIF DateTimeOriginal': {'field_length': 20, + 'field_offset': 716, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 36867, + 'values': u'2011:06:22 12:20:33'}, + 'EXIF DigitalZoomRatio': {'field_length': 8, + 'field_offset': 26232, + 'field_type': 5, + 'printable': u'1', + 'tag': 41988, + 'values': [[1, 1]]}, + 'EXIF ExifImageLength': {'field_length': 2, + 'field_offset': 500, + 'field_type': 3, + 'printable': u'2592', + 'tag': 40963, + 'values': [2592]}, + 'EXIF ExifImageWidth': {'field_length': 2, + 'field_offset': 488, + 'field_type': 3, + 'printable': u'3872', + 'tag': 40962, + 'values': [3872]}, + 'EXIF ExifVersion': {'field_length': 4, + 'field_offset': 272, + 'field_type': 7, + 'printable': u'0221', + 'tag': 36864, + 'values': [48, 50, 50, 49]}, + 'EXIF ExposureBiasValue': {'field_length': 8, + 'field_offset': 764, + 'field_type': 10, + 'printable': u'0', + 'tag': 37380, + 'values': [[0, 1]]}, + 'EXIF ExposureMode': {'field_length': 2, + 'field_offset': 584, + 'field_type': 3, + 'printable': u'Manual Exposure', + 'tag': 41986, + 'values': [1]}, + 'EXIF ExposureProgram': {'field_length': 2, + 'field_offset': 248, + 'field_type': 3, + 'printable': u'Manual', + 'tag': 34850, + 'values': [1]}, + 'EXIF ExposureTime': {'field_length': 8, + 'field_offset': 700, + 'field_type': 5, + 'printable': u'1/125', + 'tag': 33434, + 'values': [[1, 125]]}, + 'EXIF FNumber': {'field_length': 8, + 'field_offset': 708, + 'field_type': 5, + 'printable': u'10', + 'tag': 33437, + 'values': [[10, 1]]}, + 'EXIF FileSource': {'field_length': 1, + 'field_offset': 536, + 'field_type': 7, + 'printable': u'Digital Camera', + 'tag': 41728, + 'values': [3]}, + 'EXIF Flash': {'field_length': 2, + 'field_offset': 380, + 'field_type': 3, + 'printable': u'Flash did not fire', + 'tag': 37385, + 'values': [0]}, + 'EXIF FlashPixVersion': {'field_length': 4, + 'field_offset': 464, + 'field_type': 7, + 'printable': u'0100', + 'tag': 40960, + 'values': [48, 49, 48, 48]}, + 'EXIF FocalLength': {'field_length': 8, + 'field_offset': 780, + 'field_type': 5, + 'printable': u'18', + 'tag': 37386, + 'values': [[18, 1]]}, + 'EXIF FocalLengthIn35mmFilm': {'field_length': 2, + 'field_offset': 620, + 'field_type': 3, + 'printable': u'27', + 'tag': 41989, + 'values': [27]}, + 'EXIF GainControl': {'field_length': 2, + 'field_offset': 644, + 'field_type': 3, + 'printable': u'None', + 'tag': 41991, + 'values': [0]}, + 'EXIF ISOSpeedRatings': {'field_length': 2, + 'field_offset': 260, + 'field_type': 3, + 'printable': u'100', + 'tag': 34855, + 'values': [100]}, + 'EXIF InteroperabilityOffset': {'field_length': 4, + 'field_offset': 512, + 'field_type': 4, + 'printable': u'26240', + 'tag': 40965, + 'values': [26240]}, + 'EXIF LightSource': {'field_length': 2, + 'field_offset': 368, + 'field_type': 3, + 'printable': u'Unknown', + 'tag': 37384, + 'values': [0]}, + 'EXIF MaxApertureValue': {'field_length': 8, + 'field_offset': 772, + 'field_type': 5, + 'printable': u'18/5', + 'tag': 37381, + 'values': [[18, 5]]}, + 'EXIF MeteringMode': {'field_length': 2, + 'field_offset': 356, + 'field_type': 3, + 'printable': u'Pattern', + 'tag': 37383, + 'values': [5]}, + 'EXIF Saturation': {'field_length': 2, + 'field_offset': 668, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41993, + 'values': [0]}, + 'EXIF SceneCaptureType': {'field_length': 2, + 'field_offset': 632, + 'field_type': 3, + 'printable': u'Standard', + 'tag': 41990, + 'values': [0]}, + 'EXIF SceneType': {'field_length': 1, + 'field_offset': 548, + 'field_type': 7, + 'printable': u'Directly Photographed', + 'tag': 41729, + 'values': [1]}, + 'EXIF SensingMethod': {'field_length': 2, + 'field_offset': 524, + 'field_type': 3, + 'printable': u'One-chip color area', + 'tag': 41495, + 'values': [2]}, + 'EXIF Sharpness': {'field_length': 2, + 'field_offset': 680, + 'field_type': 3, + 'printable': u'Normal', + 'tag': 41994, + 'values': [0]}, + 'EXIF SubSecTime': {'field_length': 3, + 'field_offset': 428, + 'field_type': 2, + 'printable': u'10', + 'tag': 37520, + 'values': u'10'}, + 'EXIF SubSecTimeDigitized': {'field_length': 3, + 'field_offset': 452, + 'field_type': 2, + 'printable': u'10', + 'tag': 37522, + 'values': u'10'}, + 'EXIF SubSecTimeOriginal': {'field_length': 3, + 'field_offset': 440, + 'field_type': 2, + 'printable': u'10', + 'tag': 37521, + 'values': u'10'}, + 'EXIF SubjectDistanceRange': {'field_length': 2, + 'field_offset': 692, + 'field_type': 3, + 'printable': u'0', + 'tag': 41996, + 'values': [0]}, + 'EXIF WhiteBalance': {'field_length': 2, + 'field_offset': 596, + 'field_type': 3, + 'printable': u'Auto', + 'tag': 41987, + 'values': [0]}, + 'Image DateTime': {'field_length': 20, + 'field_offset': 194, + 'field_type': 2, + 'printable': u'2011:06:22 12:20:33', + 'tag': 306, + 'values': u'2011:06:22 12:20:33'}, + 'Image ExifOffset': {'field_length': 4, + 'field_offset': 126, + 'field_type': 4, + 'printable': u'214', + 'tag': 34665, + 'values': [214]}, + 'Image Make': {'field_length': 18, + 'field_offset': 134, + 'field_type': 2, + 'printable': u'NIKON CORPORATION', + 'tag': 271, + 'values': u'NIKON CORPORATION'}, + 'Image Model': {'field_length': 10, + 'field_offset': 152, + 'field_type': 2, + 'printable': u'NIKON D80', + 'tag': 272, + 'values': u'NIKON D80'}, + 'Image Orientation': {'field_length': 2, + 'field_offset': 42, + 'field_type': 3, + 'printable': u'Rotated 90 CCW', + 'tag': 274, + 'values': [6]}, + 'Image ResolutionUnit': {'field_length': 2, + 'field_offset': 78, + 'field_type': 3, + 'printable': u'Pixels/Inch', + 'tag': 296, + 'values': [2]}, + 'Image Software': {'field_length': 15, + 'field_offset': 178, + 'field_type': 2, + 'printable': u'Shotwell 0.9.3', + 'tag': 305, + 'values': u'Shotwell 0.9.3'}, + 'Image XResolution': {'field_length': 8, + 'field_offset': 162, + 'field_type': 5, + 'printable': u'300', + 'tag': 282, + 'values': [[300, 1]]}, + 'Image YCbCrPositioning': {'field_length': 2, + 'field_offset': 114, + 'field_type': 3, + 'printable': u'Co-sited', + 'tag': 531, + 'values': [2]}, + 'Image YResolution': {'field_length': 8, + 'field_offset': 170, + 'field_type': 5, + 'printable': u'300', + 'tag': 283, + 'values': [[300, 1]]}, + 'Thumbnail Compression': {'field_length': 2, + 'field_offset': 26280, + 'field_type': 3, + 'printable': u'JPEG (old-style)', + 'tag': 259, + 'values': [6]}, + 'Thumbnail ResolutionUnit': {'field_length': 2, + 'field_offset': 26316, + 'field_type': 3, + 'printable': u'Pixels/Inch', + 'tag': 296, + 'values': [2]}, + 'Thumbnail XResolution': {'field_length': 8, + 'field_offset': 26360, + 'field_type': 5, + 'printable': u'300', + 'tag': 282, + 'values': [[300, 1]]}, + 'Thumbnail YCbCrPositioning': {'field_length': 2, + 'field_offset': 26352, + 'field_type': 3, + 'printable': u'Co-sited', + 'tag': 531, + 'values': [2]}, + 'Thumbnail YResolution': {'field_length': 8, + 'field_offset': 26368, + 'field_type': 5, + 'printable': u'300', + 'tag': 283, + 'values': [[300, 1]]}} def test_exif_image_orientation(): diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py index a0511af7..64b7ee8f 100644 --- a/mediagoblin/tests/test_http_callback.py +++ b/mediagoblin/tests/test_http_callback.py @@ -23,7 +23,7 @@ from mediagoblin import mg_globals from mediagoblin.tools import processing from mediagoblin.tests.tools import fixture_add_user from mediagoblin.tests.test_submission import GOOD_PNG -from mediagoblin.tests import test_oauth as oauth +from mediagoblin.tests import test_oauth2 as oauth class TestHTTPCallback(object): @@ -44,7 +44,7 @@ class TestHTTPCallback(object): 'password': self.user_password}) def get_access_token(self, client_id, client_secret, code): - response = self.test_app.get('/oauth/access_token', { + response = self.test_app.get('/oauth-2/access_token', { 'code': code, 'client_id': client_id, 'client_secret': client_secret}) diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py new file mode 100644 index 00000000..48efb4b6 --- /dev/null +++ b/mediagoblin/tests/test_ldap.py @@ -0,0 +1,125 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import urlparse +import pkg_resources +import pytest +import mock + +from mediagoblin import mg_globals +from mediagoblin.db.base import Session +from mediagoblin.tests.tools import get_app +from mediagoblin.tools import template + +pytest.importorskip("ldap") + + +@pytest.fixture() +def ldap_plugin_app(request): + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests.auth_configs', + 'ldap_appconfig.ini')) + + +def return_value(): + return u'chris', u'chris@example.com' + + +def test_ldap_plugin(ldap_plugin_app): + res = ldap_plugin_app.get('/auth/login/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/' + + res = ldap_plugin_app.get('/auth/register/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/register/' + + res = ldap_plugin_app.get('/auth/ldap/register/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/' + + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', {}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + assert form.username.errors == [u'This field is required.'] + assert form.password.errors == [u'This field is required.'] + + @mock.patch('mediagoblin.plugins.ldap.tools.LDAP.login', mock.Mock(return_value=return_value())) + def _test_authentication(): + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', + {'username': u'chris', + 'password': u'toast'}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.username.data == u'chris' + assert register_form.email.data == u'chris@example.com' + + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/register/', + {'username': u'chris', + 'email': u'chris@example.com'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/u/chris/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to register with same email and username + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/register/', + {'username': u'chris', + 'email': u'chris@example.com'}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.email.errors == [u'Sorry, a user with that email address already exists.'] + assert register_form.username.errors == [u'Sorry, a user with that name already exists.'] + + # Log out + ldap_plugin_app.get('/auth/logout/') + + # Get user and detach from session + test_user = mg_globals.database.User.query.filter_by( + username=u'chris').first() + Session.expunge(test_user) + + # Log back in + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', + {'username': u'chris', + 'password': u'toast'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure user is in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert session['user_id'] == unicode(test_user.id) + + _test_authentication() diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py index 3ac917b0..22f9e800 100644 --- a/mediagoblin/tests/test_messages.py +++ b/mediagoblin/tests/test_messages.py @@ -14,7 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin.messages import fetch_messages, add_message +from mediagoblin import messages from mediagoblin.tools import template @@ -32,11 +32,19 @@ def test_messages(test_app): # The message queue should be empty assert request.session.get('messages', []) == [] + # First of all, we should clear the messages queue + messages.clear_add_message() # Adding a message should modify the session accordingly - add_message(request, 'herp_derp', 'First!') + messages.add_message(request, 'herp_derp', 'First!') test_msg_queue = [{'text': 'First!', 'level': 'herp_derp'}] - assert request.session['messages'] == test_msg_queue + + # Alternative tests to the following, test divided in two steps: + # assert request.session['messages'] == test_msg_queue + # 1. Tests if add_message worked + assert messages.ADD_MESSAGE_TEST[-1] == test_msg_queue + # 2. Tests if add_message updated session information + assert messages.ADD_MESSAGE_TEST[-1] == request.session['messages'] # fetch_messages should return and empty the queue - assert fetch_messages(request) == test_msg_queue + assert messages.fetch_messages(request) == test_msg_queue assert request.session.get('messages') == [] diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index 9f95a398..4cd3d9b6 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -3,8 +3,9 @@ direct_remote_path = /test_static/ email_sender_address = "notice@mediagoblin.example.org" email_debug_mode = true -# TODO: Switch to using an in-memory database -sql_engine = "sqlite:///%(here)s/test_user_dev/mediagoblin.db" +#Runs with an in-memory sqlite db for speed. +sql_engine = "sqlite://" +run_migrations = true # tag parsing tags_max_length = 50 @@ -12,26 +13,28 @@ tags_max_length = 50 # So we can start to test attachments: allow_attachments = True -# Celery shouldn't be set up by the application as it's setup via -# mediagoblin.init.celery.from_celery -celery_setup_elsewhere = true +upload_limit = 500 -media_types = mediagoblin.media_types.image, mediagoblin.media_types.pdf +max_file_size = 2 [storage:publicstore] -base_dir = %(here)s/test_user_dev/media/public +base_dir = %(here)s/user_dev/media/public base_url = /mgoblin_media/ [storage:queuestore] -base_dir = %(here)s/test_user_dev/media/queue +base_dir = %(here)s/user_dev/media/queue [celery] CELERY_ALWAYS_EAGER = true -CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db" -BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db" +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db" [plugins] [[mediagoblin.plugins.api]] [[mediagoblin.plugins.oauth]] [[mediagoblin.plugins.httpapiauth]] - +[[mediagoblin.plugins.piwigo]] +[[mediagoblin.plugins.basic_auth]] +[[mediagoblin.plugins.openid]] +[[mediagoblin.media_types.image]] +[[mediagoblin.media_types.pdf]] diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py index 755d863f..43ad0b6d 100644 --- a/mediagoblin/tests/test_misc.py +++ b/mediagoblin/tests/test_misc.py @@ -28,8 +28,10 @@ def test_user_deletes_other_comments(test_app): user_a = fixture_add_user(u"chris_a") user_b = fixture_add_user(u"chris_b") - media_a = fixture_media_entry(uploader=user_a.id, save=False) - media_b = fixture_media_entry(uploader=user_b.id, save=False) + media_a = fixture_media_entry(uploader=user_a.id, save=False, + expunge=False, fake_upload=False) + media_b = fixture_media_entry(uploader=user_b.id, save=False, + expunge=False, fake_upload=False) Session.add(media_a) Session.add(media_b) Session.flush() @@ -79,7 +81,7 @@ def test_user_deletes_other_comments(test_app): def test_media_deletes_broken_attachment(test_app): user_a = fixture_add_user(u"chris_a") - media = fixture_media_entry(uploader=user_a.id, save=False) + media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False) media.attachment_files.append(dict( name=u"some name", filepath=[u"does", u"not", u"exist"], diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py index 427aa47c..86513c76 100644 --- a/mediagoblin/tests/test_modelmethods.py +++ b/mediagoblin/tests/test_modelmethods.py @@ -18,7 +18,7 @@ # methods, and so it makes sense to test them here. from mediagoblin.db.base import Session -from mediagoblin.db.models import MediaEntry +from mediagoblin.db.models import MediaEntry, User, Privilege from mediagoblin.tests.tools import fixture_add_user @@ -47,7 +47,7 @@ class TestMediaEntrySlugs(object): entry.id = this_id entry.uploader = uploader or self.chris_user.id entry.media_type = u'image' - + if save: entry.save() @@ -99,7 +99,7 @@ class TestMediaEntrySlugs(object): u"Beware, I exist!!", this_id=9000, save=False) entry.generate_slug() assert entry.slug == u"beware-i-exist-test" - + _real_test() def test_existing_slug_cant_use_id_extra_junk(self, test_app): @@ -151,6 +151,44 @@ class TestMediaEntrySlugs(object): qbert_entry.generate_slug() assert qbert_entry.slug is None +class TestUserHasPrivilege: + def _setup(self): + fixture_add_user(u'natalie', + privileges=[u'admin',u'moderator',u'active']) + fixture_add_user(u'aeva', + privileges=[u'moderator',u'active']) + self.natalie_user = User.query.filter( + User.username==u'natalie').first() + self.aeva_user = User.query.filter( + User.username==u'aeva').first() + + def test_privilege_added_correctly(self, test_app): + self._setup() + admin = Privilege.query.filter( + Privilege.privilege_name == u'admin').one() + # first make sure the privileges were added successfully + + assert admin in self.natalie_user.all_privileges + assert admin not in self.aeva_user.all_privileges + + def test_user_has_privilege_one(self, test_app): + self._setup() + + # then test out the user.has_privilege method for one privilege + assert not self.natalie_user.has_privilege(u'commenter') + assert self.aeva_user.has_privilege(u'active') + + + def test_user_has_privileges_multiple(self, test_app): + self._setup() + + # when multiple args are passed to has_privilege, the method returns + # True if the user has ANY of the privileges + assert self.natalie_user.has_privilege(u'admin',u'commenter') + assert self.aeva_user.has_privilege(u'moderator',u'active') + assert not self.natalie_user.has_privilege(u'commenter',u'uploader') + + def test_media_data_init(test_app): Session.rollback() @@ -165,3 +203,4 @@ def test_media_data_init(test_app): obj_in_session += 1 print repr(obj) assert obj_in_session == 0 + diff --git a/mediagoblin/tests/test_moderation.py b/mediagoblin/tests/test_moderation.py new file mode 100644 index 00000000..e7a0ebef --- /dev/null +++ b/mediagoblin/tests/test_moderation.py @@ -0,0 +1,242 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin.tests.tools import (fixture_add_user, + fixture_add_comment_report, fixture_add_comment) +from mediagoblin.db.models import User, CommentReport, MediaComment, UserBan +from mediagoblin.tools import template, mail +from webtest import AppError + +class TestModerationViews: + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'admin', + privileges=[u'admin',u'active']) + fixture_add_user(u'moderator', + privileges=[u'moderator',u'active']) + fixture_add_user(u'regular', + privileges=[u'active',u'commenter']) + self.query_for_users() + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + self.query_for_users() + + def logout(self): + self.test_app.get('/auth/logout/') + self.query_for_users() + + def query_for_users(self): + self.admin_user = User.query.filter(User.username==u'admin').first() + self.mod_user = User.query.filter(User.username==u'moderator').first() + self.user = User.query.filter(User.username==u'regular').first() + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def testGiveOrTakeAwayPrivileges(self): + self.login(u'admin') + # First, test an admin taking away a privilege from a user + #---------------------------------------------------------------------- + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert response.status == '302 FOUND' + self.query_for_users() + assert not self.user.has_privilege(u'commenter') + + # Then, test an admin giving a privilege to a user + #---------------------------------------------------------------------- + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert response.status == '302 FOUND' + self.query_for_users() + assert self.user.has_privilege(u'commenter') + + # Then, test a mod trying to take away a privilege from a user + # they are not allowed to do this, so this will raise an error + #---------------------------------------------------------------------- + self.logout() + self.login(u'moderator') + + with pytest.raises(AppError) as excinfo: + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + self.query_for_users() + + assert self.user.has_privilege(u'commenter') + + def testReportResolution(self): + self.login(u'moderator') + + # First, test a moderators taking away a user's privilege in response + # to a reported comment + #---------------------------------------------------------------------- + fixture_add_comment_report(reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + response = self.test_app.get('/mod/reports/{0}/'.format( + comment_report.id)) + assert response.status == '200 OK' + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'commenter'], + 'targeted_user':self.user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + assert response.status == '302 FOUND' + assert not self.user.has_privilege(u'commenter') + assert comment_report.is_archived_report() is True + + fixture_add_comment_report(reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + # Then, test a moderator sending an email to a user in response to a + # reported comment + #---------------------------------------------------------------------- + self.query_for_users() + + response, context = self.do_post({'action_to_resolve':[u'sendmessage'], + 'message_to_user':'This is your last warning, regular....', + 'targeted_user':self.user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + assert response.status == '302 FOUND' + assert mail.EMAIL_TEST_MBOX_INBOX == [{'to': [u'regular@example.com'], + 'message': 'Content-Type: text/plain; charset="utf-8"\n\ +MIME-Version: 1.0\nContent-Transfer-Encoding: base64\nSubject: Warning from- \ +moderator \nFrom: notice@mediagoblin.example.org\nTo: regular@example.com\n\n\ +VGhpcyBpcyB5b3VyIGxhc3Qgd2FybmluZywgcmVndWxhci4uLi4=\n', + 'from': 'notice@mediagoblin.example.org'}] + assert comment_report.is_archived_report() is True + + # Then test a moderator banning a user AND a moderator deleting the + # offending comment. This also serves as a test for taking multiple + # actions to resolve a report + #---------------------------------------------------------------------- + self.query_for_users() + fixture_add_comment(author=self.user.id, + comment=u'Comment will be removed') + test_comment = MediaComment.query.filter( + MediaComment.author==self.user.id).first() + fixture_add_comment_report(comment=test_comment, + reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.comment==test_comment).filter( + CommentReport.resolved==None).first() + + response, context = self.do_post( + {'action_to_resolve':[u'userban', u'delete'], + 'targeted_user':self.user.id, + 'why_user_was_banned':u'', + 'user_banned_until':u''}, + url='/mod/reports/{0}/'.format(comment_report.id)) + assert response.status == '302 FOUND' + self.query_for_users() + test_user_ban = UserBan.query.filter( + UserBan.user_id == self.user.id).first() + assert test_user_ban is not None + test_comment = MediaComment.query.filter( + MediaComment.author==self.user.id).first() + assert test_comment is None + + # Then, test what happens when a moderator attempts to punish an admin + # from a reported comment on an admin. + #---------------------------------------------------------------------- + fixture_add_comment_report(reported_user=self.admin_user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.admin_user).filter( + CommentReport.resolved==None).first() + + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'active'], + 'targeted_user':self.admin_user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + self.query_for_users() + + assert response.status == '200 OK' + assert self.admin_user.has_privilege(u'active') + + def testAllModerationViews(self): + self.login(u'moderator') + username = self.user.username + self.query_for_users() + fixture_add_comment_report(reported_user=self.admin_user) + response = self.test_app.get('/mod/reports/') + assert response.status == "200 OK" + + response = self.test_app.get('/mod/reports/1/') + assert response.status == "200 OK" + + response = self.test_app.get('/mod/users/') + assert response.status == "200 OK" + + user_page_url = '/mod/users/{0}/'.format(username) + response = self.test_app.get(user_page_url) + assert response.status == "200 OK" + + self.test_app.get('/mod/media/') + assert response.status == "200 OK" + + def testBanUnBanUser(self): + self.login(u'admin') + username = self.user.username + user_id = self.user.id + ban_url = '/mod/users/{0}/ban/'.format(username) + response, context = self.do_post({ + 'user_banned_until':u'', + 'why_user_was_banned':u'Because I said so'}, + url=ban_url) + + assert response.status == "302 FOUND" + user_banned = UserBan.query.filter(UserBan.user_id==user_id).first() + assert user_banned is not None + assert user_banned.expiration_date is None + assert user_banned.reason == u'Because I said so' + + response, context = self.do_post({}, + url=ban_url) + + assert response.status == "302 FOUND" + user_banned = UserBan.query.filter(UserBan.user_id==user_id).first() + assert user_banned is None diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py new file mode 100644 index 00000000..3bf36f5f --- /dev/null +++ b/mediagoblin/tests/test_notifications.py @@ -0,0 +1,209 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +import urlparse + +from mediagoblin.tools import template, mail + +from mediagoblin.db.models import Notification, CommentNotification, \ + CommentSubscription +from mediagoblin.db.base import Session + +from mediagoblin.notifications import mark_comment_notification_seen + +from mediagoblin.tests.tools import fixture_add_comment, \ + fixture_media_entry, fixture_add_user, \ + fixture_comment_subscription + + +class TestNotifications: + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + # TODO: Possibly abstract into a decorator like: + # @as_authenticated_user('chris') + self.test_user = fixture_add_user(privileges=[u'active',u'commenter']) + + self.current_user = None + + self.login() + + def login(self, username=u'chris', password=u'toast'): + response = self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': password}) + + response.follow() + + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + ctx = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + + assert Session.merge(ctx['request'].user).username == username + + self.current_user = ctx['request'].user + + def logout(self): + self.test_app.get('/auth/logout/') + self.current_user = None + + @pytest.mark.parametrize('wants_email', [True, False]) + def test_comment_notification(self, wants_email): + ''' + Test + - if a notification is created when posting a comment on + another users media entry. + - that the comment data is consistent and exists. + + ''' + user = fixture_add_user('otherperson', password='nosreprehto', + wants_comment_notification=wants_email, + privileges=[u'active',u'commenter']) + + assert user.wants_comment_notification == wants_email + + user_id = user.id + + media_entry = fixture_media_entry(uploader=user.id, state=u'processed') + + media_entry_id = media_entry.id + + subscription = fixture_comment_subscription(media_entry) + + subscription_id = subscription.id + + media_uri_id = '/u/{0}/m/{1}/'.format(user.username, + media_entry.id) + media_uri_slug = '/u/{0}/m/{1}/'.format(user.username, + media_entry.slug) + + self.test_app.post( + media_uri_id + 'comment/add/', + { + 'comment_content': u'Test comment #42' + } + ) + + notifications = Notification.query.filter_by( + user_id=user.id).all() + + assert len(notifications) == 1 + + notification = notifications[0] + + assert type(notification) == CommentNotification + assert notification.seen == False + assert notification.user_id == user.id + assert notification.subject.get_author.id == self.test_user.id + assert notification.subject.content == u'Test comment #42' + + if wants_email == True: + assert mail.EMAIL_TEST_MBOX_INBOX == [ + {'from': 'notice@mediagoblin.example.org', + 'message': 'Content-Type: text/plain; \ +charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \ +base64\nSubject: GNU MediaGoblin - chris commented on your \ +post\nFrom: notice@mediagoblin.example.org\nTo: \ +otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n', + 'to': [u'otherperson@example.com']}] + else: + assert mail.EMAIL_TEST_MBOX_INBOX == [] + + + # Save the ids temporarily because of DetachedInstanceError + notification_id = notification.id + comment_id = notification.subject.id + + self.logout() + self.login('otherperson', 'nosreprehto') + + self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id)) + + notification = Notification.query.filter_by(id=notification_id).first() + + assert notification.seen == True + + self.test_app.get(media_uri_slug + '/notifications/silence/') + + subscription = CommentSubscription.query.filter_by(id=subscription_id)\ + .first() + + assert subscription.notify == False + + notifications = Notification.query.filter_by( + user_id=user_id).all() + + # User should not have been notified + assert len(notifications) == 1 + + def test_mark_all_comment_notifications_seen(self): + """ Test that mark_all_comments_seen works""" + + user = fixture_add_user('otherperson', password='nosreprehto', + privileges=[u'active']) + + media_entry = fixture_media_entry(uploader=user.id, state=u'processed') + + fixture_comment_subscription(media_entry) + + media_uri_id = '/u/{0}/m/{1}/'.format(user.username, + media_entry.id) + + # add 2 comments + self.test_app.post( + media_uri_id + 'comment/add/', + { + 'comment_content': u'Test comment #43' + } + ) + + self.test_app.post( + media_uri_id + 'comment/add/', + { + 'comment_content': u'Test comment #44' + } + ) + + notifications = Notification.query.filter_by( + user_id=user.id).all() + + assert len(notifications) == 2 + + # both comments should not be marked seen + assert notifications[0].seen == False + assert notifications[1].seen == False + + # login with other user to mark notifications seen + self.logout() + self.login('otherperson', 'nosreprehto') + + # mark all comment notifications seen + res = self.test_app.get('/notifications/comments/mark_all_seen/') + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/' + + notifications = Notification.query.filter_by( + user_id=user.id).all() + + # both notifications should be marked seen + assert notifications[0].seen == True + assert notifications[1].seen == True diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py new file mode 100644 index 00000000..073c2884 --- /dev/null +++ b/mediagoblin/tests/test_oauth1.py @@ -0,0 +1,166 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import cgi + +import pytest +from urlparse import parse_qs, urlparse + +from oauthlib.oauth1 import Client + +from mediagoblin import mg_globals +from mediagoblin.tools import template, pluginapi +from mediagoblin.tests.tools import fixture_add_user + + +class TestOAuth(object): + + MIME_FORM = "application/x-www-form-urlencoded" + MIME_JSON = "application/json" + + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + self.db = mg_globals.database + + self.pman = pluginapi.PluginManager() + + self.user_password = "AUserPassword123" + self.user = fixture_add_user("OAuthy", self.user_password) + + self.login() + + def login(self): + self.test_app.post( + "/auth/login/", { + "username": self.user.username, + "password": self.user_password}) + + def register_client(self, **kwargs): + """ Regiters a client with the API """ + + kwargs["type"] = "client_associate" + kwargs["application_type"] = kwargs.get("application_type", "native") + return self.test_app.post("/api/client/register", kwargs) + + def test_client_client_register_limited_info(self): + """ Tests that a client can be registered with limited information """ + response = self.register_client() + client_info = response.json + + client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() + + assert response.status_int == 200 + assert client is not None + + def test_client_register_full_info(self): + """ Provides every piece of information possible to register client """ + query = { + "application_name": "Testificate MD", + "application_type": "web", + "contacts": "someone@someplace.com tuteo@tsengeo.lu", + "logo_url": "http://ayrel.com/utral.png", + "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu", + } + + response = self.register_client(**query) + client_info = response.json + + client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() + + assert client is not None + assert client.secret == client_info["client_secret"] + assert client.application_type == query["application_type"] + assert client.redirect_uri == query["redirect_uris"].split() + assert client.logo_url == query["logo_url"] + assert client.contacts == query["contacts"].split() + + + def test_client_update(self): + """ Tests that you can update a client """ + # first we need to register a client + response = self.register_client() + + client_info = response.json + client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() + + # Now update + update_query = { + "type": "client_update", + "application_name": "neytiri", + "contacts": "someone@someplace.com abc@cba.com", + "logo_url": "http://place.com/picture.png", + "application_type": "web", + "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/", + } + + update_response = self.register_client(**update_query) + + assert update_response.status_int == 200 + client_info = update_response.json + client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() + + assert client.secret == client_info["client_secret"] + assert client.application_type == update_query["application_type"] + assert client.application_name == update_query["application_name"] + assert client.contacts == update_query["contacts"].split() + assert client.logo_url == update_query["logo_url"] + assert client.redirect_uri == update_query["redirect_uris"].split() + + def to_authorize_headers(self, data): + headers = "" + for key, value in data.items(): + headers += '{0}="{1}",'.format(key, value) + return {"Authorization": "OAuth " + headers[:-1]} + + def test_request_token(self): + """ Test a request for a request token """ + response = self.register_client() + + client_id = response.json["client_id"] + + endpoint = "/oauth/request_token" + request_query = { + "oauth_consumer_key": client_id, + "oauth_nonce": "abcdefghij", + "oauth_timestamp": 123456789.0, + "oauth_callback": "https://some.url/callback", + } + + headers = self.to_authorize_headers(request_query) + + headers["Content-Type"] = self.MIME_FORM + + response = self.test_app.post(endpoint, headers=headers) + response = cgi.parse_qs(response.body) + + # each element is a list, reduce it to a string + for key, value in response.items(): + response[key] = value[0] + + request_token = self.db.RequestToken.query.filter_by( + token=response["oauth_token"] + ).first() + + client = self.db.Client.query.filter_by(id=client_id).first() + + assert request_token is not None + assert request_token.secret == response["oauth_token_secret"] + assert request_token.client == client.id + assert request_token.used == False + assert request_token.callback == request_query["oauth_callback"] + diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth2.py index ea3bd798..957f4e65 100644 --- a/mediagoblin/tests/test_oauth.py +++ b/mediagoblin/tests/test_oauth2.py @@ -38,7 +38,8 @@ class TestOAuth(object): self.pman = pluginapi.PluginManager() self.user_password = u'4cc355_70k3N' - self.user = fixture_add_user(u'joauth', self.user_password) + self.user = fixture_add_user(u'joauth', self.user_password, + privileges=[u'active']) self.login() @@ -51,7 +52,7 @@ class TestOAuth(object): def register_client(self, name, client_type, description=None, redirect_uri=''): return self.test_app.post( - '/oauth/client/register', { + '/oauth-2/client/register', { 'name': name, 'description': description, 'type': client_type, @@ -115,7 +116,7 @@ class TestOAuth(object): client_identifier = client.identifier redirect_uri = 'https://foo.example' - response = self.test_app.get('/oauth/authorize', { + response = self.test_app.get('/oauth-2/authorize', { 'client_id': client.identifier, 'scope': 'all', 'redirect_uri': redirect_uri}) @@ -129,7 +130,7 @@ class TestOAuth(object): # Short for client authorization post reponse capr = self.test_app.post( - '/oauth/client/authorize', { + '/oauth-2/client/authorize', { 'client_id': form.client_id.data, 'allow': 'Allow', 'next': form.next.data}) @@ -155,7 +156,7 @@ class TestOAuth(object): client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = self.test_app.get('/oauth/access_token?client_id={0}&\ + token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\ code={1}&client_secret={2}'.format(client_id, code, client.secret)) assert token_res.status_int == 200 @@ -183,7 +184,7 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret)) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = self.test_app.get('/oauth/access_token?\ + token_res = self.test_app.get('/oauth-2/access_token?\ code={0}&client_secret={1}'.format(code, client.secret)) assert token_res.status_int == 200 @@ -204,7 +205,7 @@ code={0}&client_secret={1}'.format(code, client.secret)) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == client_id).first() - token_res = self.test_app.get('/oauth/access_token', + token_res = self.test_app.get('/oauth-2/access_token', {'refresh_token': token_data['refresh_token'], 'client_id': client_id, 'client_secret': client.secret diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py new file mode 100644 index 00000000..0424fdda --- /dev/null +++ b/mediagoblin/tests/test_openid.py @@ -0,0 +1,374 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import urlparse +import pkg_resources +import pytest +import mock + +openid_consumer = pytest.importorskip( + "openid.consumer.consumer") + +from mediagoblin import mg_globals +from mediagoblin.db.base import Session +from mediagoblin.db.models import User +from mediagoblin.plugins.openid.models import OpenIDUserURL +from mediagoblin.tests.tools import get_app, fixture_add_user +from mediagoblin.tools import template + + +# App with plugin enabled +@pytest.fixture() +def openid_plugin_app(request): + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests.auth_configs', + 'openid_appconfig.ini')) + + +class TestOpenIDPlugin(object): + def _setup(self, openid_plugin_app, value=True, edit=False, delete=False): + if value: + response = openid_consumer.SuccessResponse(mock.Mock(), mock.Mock()) + if edit or delete: + response.identity_url = u'http://add.myopenid.com' + else: + response.identity_url = u'http://real.myopenid.com' + self._finish_verification = mock.Mock(return_value=response) + else: + self._finish_verification = mock.Mock(return_value=False) + + @mock.patch('mediagoblin.plugins.openid.views._response_email', mock.Mock(return_value=None)) + @mock.patch('mediagoblin.plugins.openid.views._response_nickname', mock.Mock(return_value=None)) + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + def _setup_start(self, openid_plugin_app, edit, delete): + if edit: + self._start_verification = mock.Mock(return_value=openid_plugin_app.post( + '/edit/openid/finish/')) + elif delete: + self._start_verification = mock.Mock(return_value=openid_plugin_app.post( + '/edit/openid/delete/finish/')) + else: + self._start_verification = mock.Mock(return_value=openid_plugin_app.post( + '/auth/openid/login/finish/')) + _setup_start(self, openid_plugin_app, edit, delete) + + def test_bad_login(self, openid_plugin_app): + """ Test that attempts to login with invalid paramaters""" + + # Test GET request for auth/register page + res = openid_plugin_app.get('/auth/register/').follow() + + # Make sure it redirected to the correct place + assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/' + + # Test GET request for auth/login page + res = openid_plugin_app.get('/auth/login/') + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/' + + # Test GET request for auth/openid/register page + res = openid_plugin_app.get('/auth/openid/register/') + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/' + + # Test GET request for auth/openid/login/finish page + res = openid_plugin_app.get('/auth/openid/login/finish/') + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/' + + # Test GET request for auth/openid/login page + res = openid_plugin_app.get('/auth/openid/login/') + + # Correct place? + assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to login with an empty form + template.clear_test_template_context() + openid_plugin_app.post( + '/auth/openid/login/', {}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html'] + form = context['login_form'] + assert form.openid.errors == [u'This field is required.'] + + # Try to login with wrong form values + template.clear_test_template_context() + openid_plugin_app.post( + '/auth/openid/login/', { + 'openid': 'not_a_url.com'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html'] + form = context['login_form'] + assert form.openid.errors == [u'Please enter a valid url.'] + + # Should be no users in the db + assert User.query.count() == 0 + + # Phony OpenID URl + template.clear_test_template_context() + openid_plugin_app.post( + '/auth/openid/login/', { + 'openid': 'http://phoney.myopenid.com/'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html'] + form = context['login_form'] + assert form.openid.errors == [u'Sorry, the OpenID server could not be found'] + + def test_login(self, openid_plugin_app): + """Tests that test login and registion with openid""" + # Test finish_login redirects correctly when response = False + self._setup(openid_plugin_app, False) + + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification) + def _test_non_response(): + template.clear_test_template_context() + res = openid_plugin_app.post( + '/auth/openid/login/', { + 'openid': 'http://phoney.myopenid.com/'}) + res.follow() + + # Correct Place? + assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/' + assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT + _test_non_response() + + # Test login with new openid + # Need to clear_test_template_context before calling _setup + template.clear_test_template_context() + self._setup(openid_plugin_app) + + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification) + def _test_new_user(): + openid_plugin_app.post( + '/auth/openid/login/', { + 'openid': u'http://real.myopenid.com'}) + + # Right place? + assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + # Register User + res = openid_plugin_app.post( + '/auth/openid/register/', { + 'openid': register_form.openid.data, + 'username': u'chris', + 'email': u'chris@example.com'}) + res.follow() + + # Correct place? + assert urlparse.urlsplit(res.location)[2] == '/u/chris/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT + + # No need to test if user is in logged in and verification email + # awaits, since openid uses the register_user function which is + # tested in test_auth + + # Logout User + openid_plugin_app.get('/auth/logout') + + # Get user and detach from session + test_user = mg_globals.database.User.query.filter_by( + username=u'chris').first() + Session.expunge(test_user) + + # Log back in + # Could not get it to work by 'POST'ing to /auth/openid/login/ + template.clear_test_template_context() + res = openid_plugin_app.post( + '/auth/openid/login/finish/', { + 'openid': u'http://real.myopenid.com'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure user is in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert session['user_id'] == unicode(test_user.id) + + _test_new_user() + + # Test register with empty form + template.clear_test_template_context() + openid_plugin_app.post( + '/auth/openid/register/', {}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.openid.errors == [u'This field is required.'] + assert register_form.email.errors == [u'This field is required.'] + assert register_form.username.errors == [u'This field is required.'] + + # Try to register with existing username and email + template.clear_test_template_context() + openid_plugin_app.post( + '/auth/openid/register/', { + 'openid': 'http://real.myopenid.com', + 'email': 'chris@example.com', + 'username': 'chris'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.username.errors == [u'Sorry, a user with that name already exists.'] + assert register_form.email.errors == [u'Sorry, a user with that email address already exists.'] + assert register_form.openid.errors == [u'Sorry, an account is already registered to that OpenID.'] + + def test_add_delete(self, openid_plugin_app): + """Test adding and deleting openids""" + # Add user + test_user = fixture_add_user(password='', privileges=[u'active']) + openid = OpenIDUserURL() + openid.openid_url = 'http://real.myopenid.com' + openid.user_id = test_user.id + openid.save() + + # Log user in + template.clear_test_template_context() + self._setup(openid_plugin_app) + + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification) + def _login_user(): + openid_plugin_app.post( + '/auth/openid/login/finish/', { + 'openid': u'http://real.myopenid.com'}) + + _login_user() + + # Try and delete only OpenID url + template.clear_test_template_context() + res = openid_plugin_app.post( + '/edit/openid/delete/', { + 'openid': 'http://real.myopenid.com'}) + assert 'mediagoblin/plugins/openid/delete.html' in template.TEMPLATE_TEST_CONTEXT + + # Add OpenID to user + # Empty form + template.clear_test_template_context() + res = openid_plugin_app.post( + '/edit/openid/', {}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html'] + form = context['form'] + assert form.openid.errors == [u'This field is required.'] + + # Try with a bad url + template.clear_test_template_context() + openid_plugin_app.post( + '/edit/openid/', { + 'openid': u'not_a_url.com'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html'] + form = context['form'] + assert form.openid.errors == [u'Please enter a valid url.'] + + # Try with a url that's already registered + template.clear_test_template_context() + openid_plugin_app.post( + '/edit/openid/', { + 'openid': 'http://real.myopenid.com'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html'] + form = context['form'] + assert form.openid.errors == [u'Sorry, an account is already registered to that OpenID.'] + + # Test adding openid to account + # Need to clear_test_template_context before calling _setup + template.clear_test_template_context() + self._setup(openid_plugin_app, edit=True) + + # Need to remove openid_url from db because it was added at setup + openid = OpenIDUserURL.query.filter_by( + openid_url=u'http://add.myopenid.com') + openid.delete() + + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification) + def _test_add(): + # Successful add + template.clear_test_template_context() + res = openid_plugin_app.post( + '/edit/openid/', { + 'openid': u'http://add.myopenid.com'}) + res.follow() + + # Correct place? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT + + # OpenID Added? + new_openid = mg_globals.database.OpenIDUserURL.query.filter_by( + openid_url=u'http://add.myopenid.com').first() + assert new_openid + + _test_add() + + # Test deleting openid from account + # Need to clear_test_template_context before calling _setup + template.clear_test_template_context() + self._setup(openid_plugin_app, delete=True) + + # Need to add OpenID back to user because it was deleted during + # patch + openid = OpenIDUserURL() + openid.openid_url = 'http://add.myopenid.com' + openid.user_id = test_user.id + openid.save() + + @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification) + @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification) + def _test_delete(self, test_user): + # Delete openid from user + # Create another user to test deleting OpenID that doesn't belong to them + new_user = fixture_add_user(username='newman') + openid = OpenIDUserURL() + openid.openid_url = 'http://realfake.myopenid.com/' + openid.user_id = new_user.id + openid.save() + + # Try and delete OpenID url that isn't the users + template.clear_test_template_context() + res = openid_plugin_app.post( + '/edit/openid/delete/', { + 'openid': 'http://realfake.myopenid.com/'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/delete.html'] + form = context['form'] + assert form.openid.errors == [u'That OpenID is not registered to this account.'] + + # Delete OpenID + # Kind of weird to POST to delete/finish + template.clear_test_template_context() + res = openid_plugin_app.post( + '/edit/openid/delete/finish/', { + 'openid': u'http://add.myopenid.com'}) + res.follow() + + # Correct place? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT + + # OpenID deleted? + new_openid = mg_globals.database.OpenIDUserURL.query.filter_by( + openid_url=u'http://add.myopenid.com').first() + assert not new_openid + + _test_delete(self, test_user) diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini index 91ecbb84..a9595432 100644 --- a/mediagoblin/tests/test_paste.ini +++ b/mediagoblin/tests/test_paste.ini @@ -6,6 +6,8 @@ use = egg:Paste#urlmap / = mediagoblin /mgoblin_media/ = publicstore_serve /test_static/ = mediagoblin_static +/theme_static/ = theme_static +/plugin_static/ = plugin_static [app:mediagoblin] use = egg:mediagoblin#app @@ -13,12 +15,22 @@ config = %(here)s/mediagoblin.ini [app:publicstore_serve] use = egg:Paste#static -document_root = %(here)s/test_user_dev/media/public +document_root = %(here)s/user_dev/media/public [app:mediagoblin_static] use = egg:Paste#static document_root = %(here)s/mediagoblin/static/ +[app:theme_static] +use = egg:Paste#static +document_root = %(here)s/user_dev/theme_static/ +cache_max_age = 86400 + +[app:plugin_static] +use = egg:Paste#static +document_root = %(here)s/user_dev/plugin_static/ +cache_max_age = 86400 + [celery] CELERY_ALWAYS_EAGER = true diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py new file mode 100644 index 00000000..a1cd30eb --- /dev/null +++ b/mediagoblin/tests/test_persona.py @@ -0,0 +1,214 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import urlparse +import pkg_resources +import pytest +import mock + +pytest.importorskip("requests") + +from mediagoblin import mg_globals +from mediagoblin.db.base import Session +from mediagoblin.db.models import Privilege +from mediagoblin.tests.tools import get_app +from mediagoblin.tools import template + + +# App with plugin enabled +@pytest.fixture() +def persona_plugin_app(request): + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests.auth_configs', + 'persona_appconfig.ini')) + + +class TestPersonaPlugin(object): + def test_authentication_views(self, persona_plugin_app): + res = persona_plugin_app.get('/auth/login/') + + assert urlparse.urlsplit(res.location)[2] == '/' + + res = persona_plugin_app.get('/auth/register/') + + assert urlparse.urlsplit(res.location)[2] == '/' + + res = persona_plugin_app.get('/auth/persona/login/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/login/' + + res = persona_plugin_app.get('/auth/persona/register/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/login/' + + @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'test@example.com')) + def _test_registration(): + # No register users + template.clear_test_template_context() + res = persona_plugin_app.post( + '/auth/persona/login/', {}) + + assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.email.data == u'test@example.com' + assert register_form.persona_email.data == u'test@example.com' + + template.clear_test_template_context() + res = persona_plugin_app.post( + '/auth/persona/register/', {}) + + assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.username.errors == [u'This field is required.'] + assert register_form.email.errors == [u'This field is required.'] + assert register_form.persona_email.errors == [u'This field is required.'] + + # Successful register + template.clear_test_template_context() + res = persona_plugin_app.post( + '/auth/persona/register/', + {'username': 'chris', + 'email': 'chris@example.com', + 'persona_email': 'test@example.com'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/u/chris/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to register same Persona email address + template.clear_test_template_context() + res = persona_plugin_app.post( + '/auth/persona/register/', + {'username': 'chris1', + 'email': 'chris1@example.com', + 'persona_email': 'test@example.com'}) + + assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.persona_email.errors == [u'Sorry, an account is already registered to that Persona email.'] + + # Logout + persona_plugin_app.get('/auth/logout/') + + # Get user and detach from session + test_user = mg_globals.database.User.query.filter_by( + username=u'chris').first() + active_privilege = Privilege.query.filter( + Privilege.privilege_name==u'active').first() + test_user.all_privileges.append(active_privilege) + test_user.save() + test_user = mg_globals.database.User.query.filter_by( + username=u'chris').first() + Session.expunge(test_user) + + # Add another user for _test_edit_persona + persona_plugin_app.post( + '/auth/persona/register/', + {'username': 'chris1', + 'email': 'chris1@example.com', + 'persona_email': 'test1@example.com'}) + + # Log back in + template.clear_test_template_context() + res = persona_plugin_app.post( + '/auth/persona/login/') + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure user is in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert session['user_id'] == unicode(test_user.id) + + _test_registration() + + @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'new@example.com')) + def _test_edit_persona(): + # Try and delete only Persona email address + template.clear_test_template_context() + res = persona_plugin_app.post( + '/edit/persona/', + {'email': 'test@example.com'}) + + assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html'] + form = context['form'] + + assert form.email.errors == [u"You can't delete your only Persona email address unless you have a password set."] + + template.clear_test_template_context() + res = persona_plugin_app.post( + '/edit/persona/', {}) + + assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html'] + form = context['form'] + + assert form.email.errors == [u'This field is required.'] + + # Try and delete Persona not owned by the user + template.clear_test_template_context() + res = persona_plugin_app.post( + '/edit/persona/', + {'email': 'test1@example.com'}) + + assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html'] + form = context['form'] + + assert form.email.errors == [u'That Persona email address is not registered to this account.'] + + res = persona_plugin_app.get('/edit/persona/add/') + + assert urlparse.urlsplit(res.location)[2] == '/edit/persona/' + + # Add Persona email address + template.clear_test_template_context() + res = persona_plugin_app.post( + '/edit/persona/add/') + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + # Delete a Persona + res = persona_plugin_app.post( + '/edit/persona/', + {'email': 'test@example.com'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + _test_edit_persona() + + @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'test1@example.com')) + def _test_add_existing(): + template.clear_test_template_context() + res = persona_plugin_app.post( + '/edit/persona/add/') + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/edit/persona/' + + _test_add_existing() diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py new file mode 100644 index 00000000..16ad0111 --- /dev/null +++ b/mediagoblin/tests/test_piwigo.py @@ -0,0 +1,71 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest +from .tools import fixture_add_user + + +XML_PREFIX = "<?xml version='1.0' encoding='utf-8'?>\n" + + +class Test_PWG(object): + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + fixture_add_user() + + self.username = u"chris" + self.password = "toast" + + def do_post(self, method, params): + params["method"] = method + return self.test_app.post("/api/piwigo/ws.php", params) + + def do_get(self, method, params=None): + if params is None: + params = {} + params["method"] = method + return self.test_app.get("/api/piwigo/ws.php", params) + + def test_session(self): + resp = self.do_post("pwg.session.login", + {"username": u"nouser", "password": "wrong"}) + assert resp.body == XML_PREFIX \ + + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + + resp = self.do_post("pwg.session.login", + {"username": self.username, "password": "wrong"}) + assert resp.body == XML_PREFIX \ + + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>guest</username></rsp>' + + resp = self.do_post("pwg.session.login", + {"username": self.username, "password": self.password}) + assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>' + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>chris</username></rsp>' + + self.do_get("pwg.session.logout") + + resp = self.do_get("pwg.session.getStatus") + assert resp.body == XML_PREFIX \ + + '<rsp stat="ok"><username>guest</username></rsp>' diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py index f03e868f..eae0ce15 100644 --- a/mediagoblin/tests/test_pluginapi.py +++ b/mediagoblin/tests/test_pluginapi.py @@ -14,14 +14,22 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import json import sys from configobj import ConfigObj import pytest +import pkg_resources +from validate import VdtTypeError from mediagoblin import mg_globals from mediagoblin.init.plugins import setup_plugins +from mediagoblin.init.config import read_mediagoblin_config +from mediagoblin.gmg_commands.assetlink import link_plugin_assets from mediagoblin.tools import pluginapi +from mediagoblin.tests.tools import get_app +from mediagoblin.tools.common import CollectingPrinter def with_cleanup(*modules_to_delete): @@ -294,3 +302,165 @@ def test_hook_transform(): assert pluginapi.hook_transform( "expand_tuple", (-1, 0)) == (-1, 0, 1, 2, 3) + + +def test_plugin_config(): + """ + Make sure plugins can set up their own config + """ + config, validation_result = read_mediagoblin_config( + pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_plugin_specs.ini')) + + pluginspec_section = config['plugins'][ + 'mediagoblin.tests.testplugins.pluginspec'] + assert pluginspec_section['some_string'] == 'not blork' + assert pluginspec_section['dont_change_me'] == 'still the default' + + # Make sure validation works... this should be an error + assert isinstance( + validation_result[ + 'plugins'][ + 'mediagoblin.tests.testplugins.pluginspec'][ + 'some_int'], + VdtTypeError) + + # the callables thing shouldn't really have anything though. + assert len(config['plugins'][ + 'mediagoblin.tests.testplugins.callables1']) == 0 + + +@pytest.fixture() +def context_modified_app(request): + """ + Get a MediaGoblin app fixture using appconfig_context_modified.ini + """ + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_context_modified.ini')) + + +def test_modify_context(context_modified_app): + """ + Test that we can modify both the view/template specific and + global contexts for templates. + """ + # Specific thing passed into a page + result = context_modified_app.get("/modify_context/specific/") + assert result.body.strip() == """Specific page! + +specific thing: in yer specificpage +global thing: globally appended! +something: orother +doubleme: happyhappy""" + + # General test, should have global context variable only + result = context_modified_app.get("/modify_context/") + assert result.body.strip() == """General page! + +global thing: globally appended! +lol: cats +doubleme: joyjoy""" + + +@pytest.fixture() +def static_plugin_app(request): + """ + Get a MediaGoblin app fixture using appconfig_static_plugin.ini + """ + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests', 'appconfig_static_plugin.ini')) + + +def test_plugin_assetlink(static_plugin_app): + """ + Test that the assetlink command works correctly + """ + linked_assets_dir = mg_globals.app_config['plugin_linked_assets_dir'] + plugin_link_dir = os.path.join( + linked_assets_dir.rstrip(os.path.sep), + 'staticstuff') + + plugin_statics = pluginapi.hook_runall("static_setup") + assert len(plugin_statics) == 1 + plugin_static = plugin_statics[0] + + def run_assetlink(): + printer = CollectingPrinter() + + link_plugin_assets( + plugin_static, linked_assets_dir, printer) + + return printer + + # it shouldn't exist yet + assert not os.path.lexists(plugin_link_dir) + + # link dir doesn't exist, link it + result = run_assetlink().collection[0] + assert result == \ + 'Linked asset directory for plugin "staticstuff":\n %s\nto:\n %s\n' % ( + plugin_static.file_path.rstrip(os.path.sep), + plugin_link_dir) + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, leave it alone + # (and it should exist still since we just ran it..) + result = run_assetlink().collection[0] + assert result == 'Skipping "staticstuff"; already set up.\n' + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, is a symlink to somewhere else (re-link) + junk_file_path = os.path.join( + linked_assets_dir.rstrip(os.path.sep), + 'junk.txt') + with file(junk_file_path, 'w') as junk_file: + junk_file.write('barf') + + os.unlink(plugin_link_dir) + os.symlink(junk_file_path, plugin_link_dir) + + result = run_assetlink().combined_string + assert result == """Old link found for "staticstuff"; removing. +Linked asset directory for plugin "staticstuff": + %s +to: + %s +""" % (plugin_static.file_path.rstrip(os.path.sep), plugin_link_dir) + assert os.path.lexists(plugin_link_dir) + assert os.path.islink(plugin_link_dir) + assert os.path.realpath(plugin_link_dir) == plugin_static.file_path + + # link dir exists, but is a non-symlink + os.unlink(plugin_link_dir) + with file(plugin_link_dir, 'w') as clobber_file: + clobber_file.write('clobbered!') + + result = run_assetlink().collection[0] + assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % ( + plugin_link_dir) + + with file(plugin_link_dir, 'r') as clobber_file: + assert clobber_file.read() == 'clobbered!' + + +def test_plugin_staticdirect(static_plugin_app): + """ + Test that the staticdirect utilities pull up the right things + """ + result = json.loads( + static_plugin_app.get('/staticstuff/').body) + + assert len(result) == 2 + + assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png' + assert result['plugin_bunny_css'] == \ + '/plugin_static/staticstuff/css/bunnify.css' + diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py new file mode 100644 index 00000000..05829b34 --- /dev/null +++ b/mediagoblin/tests/test_privileges.py @@ -0,0 +1,205 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest +from datetime import date, timedelta +from webtest import AppError + +from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry + +from mediagoblin.db.models import User, UserBan +from mediagoblin.tools import template + +from .resources import GOOD_JPG + +class TestPrivilegeFunctionality: + + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'alex', + privileges=[u'admin',u'active']) + fixture_add_user(u'meow', + privileges=[u'moderator',u'active',u'reporter']) + fixture_add_user(u'natalie', + privileges=[u'active']) + self.query_for_users() + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + self.query_for_users() + + def logout(self): + self.test_app.get('/auth/logout/') + self.query_for_users() + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def query_for_users(self): + self.admin_user = User.query.filter(User.username==u'alex').first() + self.mod_user = User.query.filter(User.username==u'meow').first() + self.user = User.query.filter(User.username==u'natalie').first() + + def testUserBanned(self): + self.login(u'natalie') + uid = self.user.id + # First, test what happens when a user is banned indefinitely + #---------------------------------------------------------------------- + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date=None) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "200 OK" + assert "You are Banned" in response.body + # Then test what happens when that ban has an expiration date which + # hasn't happened yet + #---------------------------------------------------------------------- + user_ban = UserBan.query.get(uid) + user_ban.delete() + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date= date.today() + timedelta(days=20)) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "200 OK" + assert "You are Banned" in response.body + + # Then test what happens when that ban has an expiration date which + # has already happened + #---------------------------------------------------------------------- + user_ban = UserBan.query.get(uid) + user_ban.delete() + exp_date = date.today() - timedelta(days=20) + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date= exp_date) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "302 FOUND" + assert not "You are Banned" in response.body + + def testVariousPrivileges(self): + # The various actions that require privileges (ex. reporting, + # commenting, moderating...) are tested in other tests. This method + # will be used to ensure that those actions are impossible for someone + # without the proper privileges. + # For other tests that show what happens when a user has the proper + # privileges, check out: + # tests/test_moderation.py moderator + # tests/test_notifications.py commenter + # tests/test_reporting.py reporter + # tests/test_submission.py uploader + #---------------------------------------------------------------------- + self.login(u'natalie') + + # First test the get and post requests of submission/uploading + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get('/submit/') + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + + with pytest.raises(AppError) as excinfo: + response = self.do_post({'upload_files':[('file',GOOD_JPG)], + 'title':u'Normal Upload 1'}, + url='/submit/') + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot comment without the commenter privilege + #---------------------------------------------------------------------- + self.query_for_users() + + media_entry = fixture_media_entry(uploader=self.admin_user.id, + state=u'processed') + + media_entry_id = media_entry.id + media_uri_id = '/u/{0}/m/{1}/'.format(self.admin_user.username, + media_entry.id) + media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username, + media_entry.slug) + response = self.test_app.get(media_uri_slug) + assert not "Add a comment" in response.body + + self.query_for_users() + with pytest.raises(AppError) as excinfo: + response = self.test_app.post( + media_uri_id + 'comment/add/', + {'comment_content': u'Test comment #42'}) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot report without the reporter privilege + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get(media_uri_slug+"report/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.do_post( + {'report_reason':u'Testing Reports #1', + 'reporter_id':u'3'}, + url=(media_uri_slug+"report/")) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot access the moderation pages w/o moderator + # or admin privileges + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/users/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/reports/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/media/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/users/1/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/reports/1/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + self.query_for_users() + + with pytest.raises(AppError) as excinfo: + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'active'], + 'targeted_user':self.admin_user.id}, + url='/mod/reports/1/') + self.query_for_users() + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py index fe8489aa..591add96 100644 --- a/mediagoblin/tests/test_processing.py +++ b/mediagoblin/tests/test_processing.py @@ -1,7 +1,5 @@ #!/usr/bin/env python -from nose.tools import assert_equal - from mediagoblin import processing class TestProcessing(object): @@ -10,7 +8,7 @@ class TestProcessing(object): result = builder.fill(format) if output is None: return result - assert_equal(output, result) + assert output == result def test_easy_filename_fill(self): self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt') diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py new file mode 100644 index 00000000..a154a061 --- /dev/null +++ b/mediagoblin/tests/test_reporting.py @@ -0,0 +1,167 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin.tools import template +from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry, + fixture_add_comment, fixture_add_comment_report) +from mediagoblin.db.models import (MediaReport, CommentReport, User, + MediaComment) + + +class TestReportFiling: + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'allie', + privileges=[u'reporter',u'active']) + fixture_add_user(u'natalie', + privileges=[u'active', u'moderator']) + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + + def logout(self): + self.test_app.get('/auth/logout/') + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def query_for_users(self): + return (User.query.filter(User.username==u'allie').first(), + User.query.filter(User.username==u'natalie').first()) + + def testMediaReports(self): + self.login(u'allie') + allie_user, natalie_user = self.query_for_users() + allie_id = allie_user.id + + media_entry = fixture_media_entry(uploader=natalie_user.id, + state=u'processed') + + mid = media_entry.id + media_uri_slug = '/u/{0}/m/{1}/'.format(natalie_user.username, + media_entry.slug) + + response = self.test_app.get(media_uri_slug + "report/") + assert response.status == "200 OK" + + response, context = self.do_post( + {'report_reason':u'Testing Media Report', + 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/") + + assert response.status == "302 FOUND" + + media_report = MediaReport.query.first() + + allie_user, natalie_user = self.query_for_users() + assert media_report is not None + assert media_report.report_content == u'Testing Media Report' + assert media_report.reporter_id == allie_id + assert media_report.reported_user_id == natalie_user.id + assert media_report.created is not None + assert media_report.discriminator == 'media_report' + + def testCommentReports(self): + self.login(u'allie') + allie_user, natalie_user = self.query_for_users() + allie_id = allie_user.id + + media_entry = fixture_media_entry(uploader=natalie_user.id, + state=u'processed') + mid = media_entry.id + fixture_add_comment(media_entry=mid, + author=natalie_user.id) + comment = MediaComment.query.first() + + comment_uri_slug = '/u/{0}/m/{1}/c/{2}/'.format(natalie_user.username, + media_entry.slug, + comment.id) + + response = self.test_app.get(comment_uri_slug + "report/") + assert response.status == "200 OK" + + response, context = self.do_post({ + 'report_reason':u'Testing Comment Report', + 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/") + + assert response.status == "302 FOUND" + + comment_report = CommentReport.query.first() + + allie_user, natalie_user = self.query_for_users() + assert comment_report is not None + assert comment_report.report_content == u'Testing Comment Report' + assert comment_report.reporter_id == allie_id + assert comment_report.reported_user_id == natalie_user.id + assert comment_report.created is not None + assert comment_report.discriminator == 'comment_report' + + def testArchivingReports(self): + self.login(u'natalie') + allie_user, natalie_user = self.query_for_users() + allie_id, natalie_id = allie_user.id, natalie_user.id + + fixture_add_comment(author=allie_user.id, + comment=u'Comment will be removed') + test_comment = MediaComment.query.filter( + MediaComment.author==allie_user.id).first() + fixture_add_comment_report(comment=test_comment, + reported_user=allie_user, + report_content=u'Testing Archived Reports #1', + reporter=natalie_user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==allie_user).first() + + assert comment_report.report_content == u'Testing Archived Reports #1' + response, context = self.do_post( + {'action_to_resolve':[u'userban', u'delete'], + 'targeted_user':allie_user.id, + 'resolution_content':u'This is a test of archiving reports.'}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + assert response.status == "302 FOUND" + allie_user, natalie_user = self.query_for_users() + + archived_report = CommentReport.query.filter( + CommentReport.reported_user==allie_user).first() + + assert CommentReport.query.count() != 0 + assert archived_report is not None + assert archived_report.report_content == u'Testing Archived Reports #1' + assert archived_report.reporter_id == natalie_id + assert archived_report.reported_user_id == allie_id + assert archived_report.created is not None + assert archived_report.resolved is not None + assert archived_report.result == u'''This is a test of archiving reports. +natalie banned user allie indefinitely. +natalie deleted the comment.''' + assert archived_report.discriminator == 'comment_report' + diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py index 2fc4c043..3d67fdf6 100644 --- a/mediagoblin/tests/test_sql_migrations.py +++ b/mediagoblin/tests/test_sql_migrations.py @@ -58,6 +58,10 @@ class Level1(Base1): SET1_MODELS = [Creature1, Level1] +FOUNDATIONS = {Creature1:[{'name':u'goblin','num_legs':2,'is_demon':False}, + {'name':u'cerberus','num_legs':4,'is_demon':True}] + } + SET1_MIGRATIONS = {} ####################################################### @@ -542,7 +546,6 @@ def _insert_migration3_objects(session): session.commit() - def create_test_engine(): from sqlalchemy import create_engine engine = create_engine('sqlite:///:memory:', echo=False) @@ -572,7 +575,7 @@ def test_set1_to_set3(): printer = CollectingPrinter() migration_manager = MigrationManager( - u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), + u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS, Session(), printer) # Check latest migration and database current migration @@ -585,11 +588,13 @@ def test_set1_to_set3(): assert result == u'inited' # Check output assert printer.combined_string == ( - "-> Initializing main mediagoblin tables... done.\n") + "-> Initializing main mediagoblin tables... done.\n" + \ + " + Laying foundations for Creature1 table\n" ) # Check version in database assert migration_manager.latest_migration == 0 assert migration_manager.database_current_migration == 0 + # Install the initial set # ----------------------- @@ -597,8 +602,8 @@ def test_set1_to_set3(): # Try to "re-migrate" with same manager settings... nothing should happen migration_manager = MigrationManager( - u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), - printer) + u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS, + Session(), printer) assert migration_manager.init_or_migrate() == None # Check version in database @@ -639,6 +644,20 @@ def test_set1_to_set3(): # Now check to see if stuff seems to be in there. session = Session() + # Check the creation of the foundation rows on the creature table + creature = session.query(Creature1).filter_by( + name=u'goblin').one() + assert creature.num_legs == 2 + assert creature.is_demon == False + + creature = session.query(Creature1).filter_by( + name=u'cerberus').one() + assert creature.num_legs == 4 + assert creature.is_demon == True + + + # Check the creation of the inserted rows on the creature and levels tables + creature = session.query(Creature1).filter_by( name=u'centipede').one() assert creature.num_legs == 100 @@ -679,7 +698,7 @@ def test_set1_to_set3(): # isn't said to be updated yet printer = CollectingPrinter() migration_manager = MigrationManager( - u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(), printer) assert migration_manager.latest_migration == 8 @@ -706,7 +725,7 @@ def test_set1_to_set3(): # Make sure version matches expected migration_manager = MigrationManager( - u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(), printer) assert migration_manager.latest_migration == 8 assert migration_manager.database_current_migration == 8 @@ -772,6 +791,15 @@ def test_set1_to_set3(): # Now check to see if stuff seems to be in there. session = Session() + + + # Start with making sure that the foundations did not run again + assert session.query(Creature3).filter_by( + name=u'goblin').count() == 1 + assert session.query(Creature3).filter_by( + name=u'cerberus').count() == 1 + + # Then make sure the models have been migrated correctly creature = session.query(Creature3).filter_by( name=u'centipede').one() assert creature.num_limbs == 100.0 diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index 162b2d19..b5b13ed3 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -24,13 +24,14 @@ import pytest from mediagoblin.tests.tools import fixture_add_user from mediagoblin import mg_globals -from mediagoblin.db.models import MediaEntry +from mediagoblin.db.models import MediaEntry, User +from mediagoblin.db.base import Session from mediagoblin.tools import template -from mediagoblin.media_types.image import MEDIA_MANAGER as img_MEDIA_MANAGER +from mediagoblin.media_types.image import ImageMediaManager from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ - BIG_BLUE, GOOD_PDF, GPS_JPG + BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG GOOD_TAG_STRING = u'yin,yang' BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26) @@ -46,12 +47,22 @@ class TestSubmission: # TODO: Possibly abstract into a decorator like: # @as_authenticated_user('chris') - test_user = fixture_add_user() - - self.test_user = test_user + fixture_add_user(privileges=[u'active',u'uploader', u'commenter']) self.login() + def our_user(self): + """ + Fetch the user we're submitting with. Every .get() or .post() + invalidates the session; this is a hacky workaround. + """ + #### FIXME: Pytest collects this as a test and runs this. + #### ... it shouldn't. At least it passes, but that's + #### totally stupid. + #### Also if we found a way to make this run it should be a + #### property. + return User.query.filter(User.username==u'chris').first() + def login(self): self.test_app.post( '/auth/login/', { @@ -77,7 +88,7 @@ class TestSubmission: return {'upload_files': [('file', filename)]} def check_comments(self, request, media_id, count): - comments = request.db.MediaComment.find({'media_entry': media_id}) + comments = request.db.MediaComment.query.filter_by(media_entry=media_id) assert count == len(list(comments)) def test_missing_fields(self): @@ -97,19 +108,40 @@ class TestSubmission: def check_normal_upload(self, title, filename): response, context = self.do_post({'title': title}, do_follow=True, **self.upload_data(filename)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) assert 'mediagoblin/user_pages/user.html' in context # Make sure the media view is at least reachable, logged in... - url = '/u/{0}/m/{1}/'.format(self.test_user.username, + url = '/u/{0}/m/{1}/'.format(self.our_user().username, title.lower().replace(' ', '-')) self.test_app.get(url) # ... and logged out too. self.logout() self.test_app.get(url) + def user_upload_limits(self, uploaded=None, upload_limit=None): + our_user = self.our_user() + + if uploaded: + our_user.uploaded = uploaded + if upload_limit: + our_user.upload_limit = upload_limit + + our_user.save() + Session.expunge(our_user) + def test_normal_jpg(self): + # User uploaded should be 0 + assert self.our_user().uploaded == 0 + self.check_normal_upload(u'Normal upload 1', GOOD_JPG) + # User uploaded should be the same as GOOD_JPG size in Mb + file_size = os.stat(GOOD_JPG).st_size / (1024.0 * 1024) + file_size = float('{0:.2f}'.format(file_size)) + + # Reload user + assert self.our_user().uploaded == file_size + def test_normal_png(self): self.check_normal_upload(u'Normal upload 2', GOOD_PNG) @@ -118,11 +150,65 @@ class TestSubmission: response, context = self.do_post({'title': u'Normal upload 3 (pdf)'}, do_follow=True, **self.upload_data(GOOD_PDF)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) + assert 'mediagoblin/user_pages/user.html' in context + + def test_default_upload_limits(self): + self.user_upload_limits(uploaded=500) + + # User uploaded should be 500 + assert self.our_user().uploaded == 500 + + response, context = self.do_post({'title': u'Normal upload 4'}, + do_follow=True, + **self.upload_data(GOOD_JPG)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) assert 'mediagoblin/user_pages/user.html' in context + # Shouldn't have uploaded + assert self.our_user().uploaded == 500 + + def test_user_upload_limit(self): + self.user_upload_limits(uploaded=25, upload_limit=25) + + # User uploaded should be 25 + assert self.our_user().uploaded == 25 + + response, context = self.do_post({'title': u'Normal upload 5'}, + do_follow=True, + **self.upload_data(GOOD_JPG)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) + assert 'mediagoblin/user_pages/user.html' in context + + # Shouldn't have uploaded + assert self.our_user().uploaded == 25 + + def test_user_under_limit(self): + self.user_upload_limits(uploaded=499) + + # User uploaded should be 499 + assert self.our_user().uploaded == 499 + + response, context = self.do_post({'title': u'Normal upload 6'}, + do_follow=False, + **self.upload_data(MED_PNG)) + form = context['mediagoblin/submit/start.html']['submit_form'] + assert form.file.errors == [u'Sorry, uploading this file will put you' + ' over your upload limit.'] + + # Shouldn't have uploaded + assert self.our_user().uploaded == 499 + + def test_big_file(self): + response, context = self.do_post({'title': u'Normal upload 7'}, + do_follow=False, + **self.upload_data(BIG_PNG)) + + form = context['mediagoblin/submit/start.html']['submit_form'] + assert form.file.errors == [u'Sorry, the file size is too big.'] + def check_media(self, request, find_data, count=None): - media = MediaEntry.find(find_data) + media = MediaEntry.query.filter_by(**find_data) if count is not None: assert media.count() == count if count == 0: @@ -155,6 +241,7 @@ class TestSubmission: 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] def test_delete(self): + self.user_upload_limits(uploaded=50) response, request = self.do_post({'title': u'Balanced Goblin'}, *REQUEST_CONTEXT, do_follow=True, **self.upload_data(GOOD_JPG)) @@ -164,7 +251,7 @@ class TestSubmission: # render and post to the edit page. edit_url = request.urlgen( 'mediagoblin.edit.edit_media', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) self.test_app.get(edit_url) self.test_app.post(edit_url, {'title': u'Balanced Goblin', @@ -177,7 +264,7 @@ class TestSubmission: self.check_comments(request, media_id, 0) comment_url = request.urlgen( 'mediagoblin.user_pages.media_post_comment', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) response = self.do_post({'comment_content': 'i love this test'}, url=comment_url, do_follow=True)[0] self.check_comments(request, media_id, 1) @@ -186,7 +273,7 @@ class TestSubmission: # --------------------------------------------------- delete_url = request.urlgen( 'mediagoblin.user_pages.media_confirm_delete', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) # Empty data means don't confirm response = self.do_post({}, do_follow=True, url=delete_url)[0] media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) @@ -199,6 +286,9 @@ class TestSubmission: self.check_media(request, {'id': media_id}, 0) self.check_comments(request, media_id, 0) + # Check that user.uploaded is the same as before the upload + assert self.our_user().uploaded == 50 + def test_evil_file(self): # Test non-suppoerted file with non-supported extension # ----------------------------------------------------- @@ -219,7 +309,7 @@ class TestSubmission: media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) assert media.media_type == u'mediagoblin.media_types.image' - assert isinstance(media.media_manager, img_MEDIA_MANAGER) + assert isinstance(media.media_manager, ImageMediaManager) assert media.media_manager.entry == media @@ -240,8 +330,8 @@ class TestSubmission: request = context['request'] - media = request.db.MediaEntry.find_one({ - u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'}) + media = request.db.MediaEntry.query.filter_by( + title=u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE').first() assert media.media_type == 'mediagoblin.media_types.image' @@ -251,8 +341,8 @@ class TestSubmission: # they'll be caught as failures during the processing step. response, context = self.do_post({'title': title}, do_follow=True, **self.upload_data(filename)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) - entry = mg_globals.database.MediaEntry.find_one({'title': title}) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) + entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first() assert entry.state == 'failed' assert entry.fail_error == u'mediagoblin.processing:BadMediaFail' diff --git a/mediagoblin/tests/test_submission/COPYING.txt b/mediagoblin/tests/test_submission/COPYING.txt new file mode 100644 index 00000000..3818aae4 --- /dev/null +++ b/mediagoblin/tests/test_submission/COPYING.txt @@ -0,0 +1,5 @@ +Images located in this directory tree are released under a GPLv3 license +and CC BY-SA 3.0 license. To the extent possible under law, the author(s) +have dedicated all copyright and related and neighboring rights to these +files to the public domain worldwide. These files are distributed without +any warranty. diff --git a/mediagoblin/tests/test_submission/big.png b/mediagoblin/tests/test_submission/big.png Binary files differnew file mode 100644 index 00000000..a284cfda --- /dev/null +++ b/mediagoblin/tests/test_submission/big.png diff --git a/mediagoblin/tests/test_submission/medium.png b/mediagoblin/tests/test_submission/medium.png Binary files differnew file mode 100644 index 00000000..e8b9ca00 --- /dev/null +++ b/mediagoblin/tests/test_submission/medium.png diff --git a/mediagoblin/tests/testplugins/modify_context/__init__.py b/mediagoblin/tests/testplugins/modify_context/__init__.py new file mode 100644 index 00000000..164e66c1 --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/__init__.py @@ -0,0 +1,55 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools import pluginapi +import pkg_resources + + +def append_to_specific_context(context): + context['specific_page_append'] = 'in yer specificpage' + return context + +def append_to_global_context(context): + context['global_append'] = 'globally appended!' + return context + +def double_doubleme(context): + if 'doubleme' in context: + context['doubleme'] = context['doubleme'] * 2 + return context + + +def setup_plugin(): + routes = [ + ('modify_context.specific_page', + '/modify_context/specific/', + 'mediagoblin.tests.testplugins.modify_context.views:specific'), + ('modify_context.general_page', + '/modify_context/', + 'mediagoblin.tests.testplugins.modify_context.views:general')] + + pluginapi.register_routes(routes) + pluginapi.register_template_path( + pkg_resources.resource_filename( + 'mediagoblin.tests.testplugins.modify_context', 'templates')) + + +hooks = { + 'setup': setup_plugin, + ('modify_context.specific_page', + 'contextplugin/specific.html'): append_to_specific_context, + 'template_global_context': append_to_global_context, + 'template_context_prerender': double_doubleme} diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html new file mode 100644 index 00000000..9cf96d3e --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html @@ -0,0 +1,5 @@ +General page! + +global thing: {{ global_append }} +lol: {{ lol }} +doubleme: {{ doubleme }} diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html new file mode 100644 index 00000000..5b1b4c4a --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html @@ -0,0 +1,6 @@ +Specific page! + +specific thing: {{ specific_page_append }} +global thing: {{ global_append }} +something: {{ something }} +doubleme: {{ doubleme }} diff --git a/mediagoblin/tests/testplugins/modify_context/views.py b/mediagoblin/tests/testplugins/modify_context/views.py new file mode 100644 index 00000000..701ec6f9 --- /dev/null +++ b/mediagoblin/tests/testplugins/modify_context/views.py @@ -0,0 +1,33 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools.response import render_to_response + + +def specific(request): + return render_to_response( + request, + 'contextplugin/specific.html', + {"something": "orother", + "doubleme": "happy"}) + + +def general(request): + return render_to_response( + request, + 'contextplugin/general.html', + {"lol": "cats", + "doubleme": "joy"}) diff --git a/mediagoblin/tests/testplugins/pluginspec/__init__.py b/mediagoblin/tests/testplugins/pluginspec/__init__.py new file mode 100644 index 00000000..76ca2b1f --- /dev/null +++ b/mediagoblin/tests/testplugins/pluginspec/__init__.py @@ -0,0 +1,22 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +def setup_plugin(): + pass + +hooks = { + 'setup': setup_plugin, +} diff --git a/mediagoblin/tests/testplugins/pluginspec/config_spec.ini b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini new file mode 100644 index 00000000..5c9c3bd7 --- /dev/null +++ b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini @@ -0,0 +1,4 @@ +[plugin_spec] +some_string = string(default="blork") +some_int = integer(default=50) +dont_change_me = string(default="still the default")
\ No newline at end of file diff --git a/mediagoblin/tests/testplugins/staticstuff/__init__.py b/mediagoblin/tests/testplugins/staticstuff/__init__.py new file mode 100644 index 00000000..a2591646 --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/__init__.py @@ -0,0 +1,36 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from mediagoblin.tools.staticdirect import PluginStatic +from mediagoblin.tools import pluginapi +from pkg_resources import resource_filename + +def setup_plugin(): + routes = [ + ('staticstuff.static_demo', + '/staticstuff/', + 'mediagoblin.tests.testplugins.staticstuff.views:static_demo')] + + pluginapi.register_routes(routes) + + +hooks = { + 'setup': setup_plugin, + 'static_setup': lambda: PluginStatic( + 'staticstuff', + resource_filename( + 'mediagoblin.tests.testplugins.staticstuff', + 'static'))} diff --git a/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css new file mode 100644 index 00000000..1294ab8a --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css @@ -0,0 +1,4 @@ +body { + background-color: #5edcf1; + color: #eb8add; +}
\ No newline at end of file diff --git a/mediagoblin/tests/testplugins/staticstuff/views.py b/mediagoblin/tests/testplugins/staticstuff/views.py new file mode 100644 index 00000000..34a5e8cb --- /dev/null +++ b/mediagoblin/tests/testplugins/staticstuff/views.py @@ -0,0 +1,28 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json + +from werkzeug import Response + + +def static_demo(request): + return Response(json.dumps({ + # this does not exist, but we'll pretend it does ;) + 'mgoblin_bunny_pic': request.staticdirect( + 'images/bunny_pic.png'), + 'plugin_bunny_css': request.staticdirect( + 'css/bunnify.css', 'staticstuff')})) diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 52635e18..060dfda9 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -15,25 +15,25 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import sys import os import pkg_resources import shutil -from functools import wraps from paste.deploy import loadapp from webtest import TestApp from mediagoblin import mg_globals -from mediagoblin.db.models import User, MediaEntry, Collection +from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \ + CommentSubscription, CommentNotification, Privilege, CommentReport from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config from mediagoblin.db.base import Session from mediagoblin.meddleware import BaseMeddleware -from mediagoblin.auth.lib import bcrypt_gen_password_hash +from mediagoblin.auth import gen_password_hash from mediagoblin.gmg_commands.dbupdate import run_dbupdate -from mediagoblin.init.celery import setup_celery_app + +from datetime import datetime MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__' @@ -41,22 +41,10 @@ TEST_SERVER_CONFIG = pkg_resources.resource_filename( 'mediagoblin.tests', 'test_paste.ini') TEST_APP_CONFIG = pkg_resources.resource_filename( 'mediagoblin.tests', 'test_mgoblin_app.ini') -TEST_USER_DEV = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_user_dev') USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue'] -BAD_CELERY_MESSAGE = """\ -Sorry, you *absolutely* must run tests with the -mediagoblin.init.celery.from_tests module. Like so: - -$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests {0} -""".format(sys.argv[0]) - - -class BadCeleryEnviron(Exception): pass - class TestingMeddleware(BaseMeddleware): """ @@ -97,12 +85,6 @@ class TestingMeddleware(BaseMeddleware): return -def suicide_if_bad_celery_environ(): - if not os.environ.get('CELERY_CONFIG_MODULE') == \ - 'mediagoblin.init.celery.from_tests': - raise BadCeleryEnviron(BAD_CELERY_MESSAGE) - - def get_app(request, paste_config=None, mgoblin_config=None): """Create a MediaGoblin app for testing. @@ -120,21 +102,13 @@ def get_app(request, paste_config=None, mgoblin_config=None): # This is the directory we're copying the paste/mgoblin config stuff into run_dir = request.config._tmpdirhandler.mktemp( 'mgoblin_app', numbered=True) - user_dev_dir = run_dir.mkdir('test_user_dev').strpath + user_dev_dir = run_dir.mkdir('user_dev').strpath new_paste_config = run_dir.join('paste.ini').strpath new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath shutil.copyfile(paste_config, new_paste_config) shutil.copyfile(mgoblin_config, new_mgoblin_config) - suicide_if_bad_celery_environ() - - # Make sure we've turned on testing - testing._activate_testing() - - # Leave this imported as it sets up celery. - from mediagoblin.init.celery import from_tests - Session.rollback() Session.remove() @@ -154,9 +128,6 @@ def get_app(request, paste_config=None, mgoblin_config=None): test_app = loadapp( 'config:' + new_paste_config) - # Re-setup celery - setup_celery_app(app_config, global_config) - # Insert the TestingMeddleware, which can do some # sanity checks on every request/response. # Doing it this way is probably not the cleanest way. @@ -164,7 +135,6 @@ def get_app(request, paste_config=None, mgoblin_config=None): mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app)) app = TestApp(test_app) - return app @@ -195,13 +165,13 @@ def assert_db_meets_expected(db, expected): for collection_name, collection_data in expected.iteritems(): collection = db[collection_name] for expected_document in collection_data: - document = collection.find_one({'id': expected_document['id']}) + document = collection.query.filter_by(id=expected_document['id']).first() assert document is not None # make sure it exists assert document == expected_document # make sure it matches def fixture_add_user(username=u'chris', password=u'toast', - active_user=True): + privileges=[], wants_comment_notification=True): # Reuse existing user or create a new one test_user = User.query.filter_by(username=username).first() if test_user is None: @@ -209,13 +179,14 @@ def fixture_add_user(username=u'chris', password=u'toast', test_user.username = username test_user.email = username + u'@example.com' if password is not None: - test_user.pw_hash = bcrypt_gen_password_hash(password) - if active_user: - test_user.email_verified = True - test_user.status = u'active' + test_user.pw_hash = gen_password_hash(password) + test_user.wants_comment_notification = wants_comment_notification + for privilege in privileges: + query = Privilege.query.filter(Privilege.privilege_name==privilege) + if query.count(): + test_user.all_privileges.append(query.one()) test_user.save() - # Reload test_user = User.query.filter_by(username=username).first() @@ -225,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast', return test_user +def fixture_comment_subscription(entry, notify=True, send_email=None): + if send_email is None: + uploader = User.query.filter_by(id=entry.uploader).first() + send_email = uploader.wants_comment_notification + + cs = CommentSubscription( + media_entry_id=entry.id, + user_id=entry.uploader, + notify=notify, + send_email=send_email) + + cs.save() + + cs = CommentSubscription.query.filter_by(id=cs.id).first() + + Session.expunge(cs) + + return cs + + +def fixture_add_comment_notification(entry_id, subject_id, user_id, + seen=False): + cn = CommentNotification(user_id=user_id, + seen=seen, + subject_id=subject_id) + cn.save() + + cn = CommentNotification.query.filter_by(id=cn.id).first() + + Session.expunge(cn) + + return cn + + def fixture_media_entry(title=u"Some title", slug=None, - uploader=None, save=True, gen_slug=True): + uploader=None, save=True, gen_slug=True, + state=u'unprocessed', fake_upload=True, + expunge=True): + """ + Add a media entry for testing purposes. + + Caution: if you're adding multiple entries with fake_upload=True, + make sure you save between them... otherwise you'll hit an + IntegrityError from multiple newly-added-MediaEntries adding + FileKeynames at once. :) + """ + if uploader is None: + uploader = fixture_add_user().id + entry = MediaEntry() entry.title = title entry.slug = slug - entry.uploader = uploader or fixture_add_user().id + entry.uploader = uploader entry.media_type = u'image' + entry.state = state + + if fake_upload: + entry.media_files = {'thumb': ['a', 'b', 'c.jpg'], + 'medium': ['d', 'e', 'f.png'], + 'original': ['g', 'h', 'i.png']} + entry.media_type = u'mediagoblin.media_types.image' if gen_slug: entry.generate_slug() + if save: entry.save() + if expunge: + entry = MediaEntry.query.filter_by(id=entry.id).first() + + Session.expunge(entry) + return entry @@ -260,3 +291,55 @@ def fixture_add_collection(name=u"My first Collection", user=None): Session.expunge(coll) return coll + +def fixture_add_comment(author=None, media_entry=None, comment=None): + if author is None: + author = fixture_add_user().id + + if media_entry is None: + media_entry = fixture_media_entry().id + + if comment is None: + comment = \ + 'Auto-generated test comment by user #{0} on media #{0}'.format( + author, media_entry) + + comment = MediaComment(author=author, + media_entry=media_entry, + content=comment) + + comment.save() + + Session.expunge(comment) + + return comment + +def fixture_add_comment_report(comment=None, reported_user=None, + reporter=None, created=None, report_content=None): + if comment is None: + comment = fixture_add_comment() + + if reported_user is None: + reported_user = fixture_add_user() + + if reporter is None: + reporter = fixture_add_user() + + if created is None: + created=datetime.now() + + if report_content is None: + report_content = \ + 'Auto-generated test report' + + comment_report = CommentReport(comment=comment, + reported_user = reported_user, + reporter = reporter, + created = created, + report_content=report_content) + + comment_report.save() + + Session.expunge(comment_report) + + return comment_report |