diff options
Diffstat (limited to 'mediagoblin/tests')
23 files changed, 1076 insertions, 103 deletions
diff --git a/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini index a64e9e40..07c69442 100644 --- a/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini +++ b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini @@ -3,8 +3,8 @@ direct_remote_path = /test_static/ email_sender_address = "notice@mediagoblin.example.org" email_debug_mode = true -# TODO: Switch to using an in-memory database -sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" +sql_engine = "sqlite://" +run_migrations = true # Celery shouldn't be set up by the application as it's setup via # mediagoblin.init.celery.from_celery diff --git a/mediagoblin/tests/auth_configs/ldap_appconfig.ini b/mediagoblin/tests/auth_configs/ldap_appconfig.ini new file mode 100644 index 00000000..9be37e17 --- /dev/null +++ b/mediagoblin/tests/auth_configs/ldap_appconfig.ini @@ -0,0 +1,41 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +[mediagoblin] +direct_remote_path = /test_static/ +email_sender_address = "notice@mediagoblin.example.org" +email_debug_mode = true + +sql_engine = "sqlite://" +run_migrations = true + +# Celery shouldn't be set up by the application as it's setup via +# mediagoblin.init.celery.from_celery +celery_setup_elsewhere = true + +[storage:publicstore] +base_dir = %(here)s/user_dev/media/public +base_url = /mgoblin_media/ + +[storage:queuestore] +base_dir = %(here)s/user_dev/media/queue + +[celery] +CELERY_ALWAYS_EAGER = true +CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db" +BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.ldap]] diff --git a/mediagoblin/tests/auth_configs/openid_appconfig.ini b/mediagoblin/tests/auth_configs/openid_appconfig.ini index c2bd82fd..3433e139 100644 --- a/mediagoblin/tests/auth_configs/openid_appconfig.ini +++ b/mediagoblin/tests/auth_configs/openid_appconfig.ini @@ -18,8 +18,8 @@ direct_remote_path = /test_static/ email_sender_address = "notice@mediagoblin.example.org" email_debug_mode = true -# TODO: Switch to using an in-memory database -sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db" +sql_engine = "sqlite://" +run_migrations = true # Celery shouldn't be set up by the application as it's setup via # mediagoblin.init.celery.from_celery diff --git a/mediagoblin/tests/resources.py b/mediagoblin/tests/resources.py index f7b3037d..480f6d9a 100644 --- a/mediagoblin/tests/resources.py +++ b/mediagoblin/tests/resources.py @@ -29,6 +29,8 @@ EVIL_JPG = resource('evil.jpg') EVIL_PNG = resource('evil.png') BIG_BLUE = resource('bigblue.png') GOOD_PDF = resource('good.pdf') +MED_PNG = resource('medium.png') +BIG_PNG = resource('big.png') def resource_exif(f): diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index 89cf1026..4e0cbd8f 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -35,7 +35,8 @@ class TestAPI(object): self.db = mg_globals.database self.user_password = u'4cc355_70k3N' - self.user = fixture_add_user(u'joapi', self.user_password) + self.user = fixture_add_user(u'joapi', self.user_password, + privileges=[u'active',u'uploader']) def login(self, test_app): test_app.post( diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index 61503d32..1bbc3d01 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -1,3 +1,4 @@ + # GNU MediaGoblin -- federated, autonomous media hosting # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. # @@ -83,31 +84,35 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/register/', { - 'username': u'happygirl', - 'password': 'iamsohappy', - 'email': 'happygrrl@example.org'}) + 'username': u'angrygirl', + 'password': 'iamsoangry', + 'email': 'angrygrrl@example.org'}) response.follow() ## Did we redirect to the proper page? Use the right template? - assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/' - assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT + assert urlparse.urlsplit(response.location)[2] == '/u/angrygirl/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT ## Make sure user is in place new_user = mg_globals.database.User.query.filter_by( - username=u'happygirl').first() + username=u'angrygirl').first() assert new_user - assert new_user.status == u'needs_email_verification' - assert new_user.email_verified == False + ## Make sure that the proper privileges are granted on registration + + assert new_user.has_privilege(u'commenter') + assert new_user.has_privilege(u'uploader') + assert new_user.has_privilege(u'reporter') + assert not new_user.has_privilege(u'active') ## Make sure user is logged in request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] + 'mediagoblin/user_pages/user_nonactive.html']['request'] assert request.session['user_id'] == unicode(new_user.id) ## Make sure we get email confirmation, and try verifying assert len(mail.EMAIL_TEST_INBOX) == 1 message = mail.EMAIL_TEST_INBOX.pop() - assert message['To'] == 'happygrrl@example.org' + assert message['To'] == 'angrygrrl@example.org' email_context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/auth/verification_email.txt'] assert email_context['verification_url'] in message.get_payload(decode=True) @@ -129,10 +134,8 @@ def test_register_views(test_app): # assert context['verification_successful'] == True # TODO: Would be good to test messages here when we can do so... new_user = mg_globals.database.User.query.filter_by( - username=u'happygirl').first() + username=u'angrygirl').first() assert new_user - assert new_user.status == u'needs_email_verification' - assert new_user.email_verified == False ## Verify the email activation works template.clear_test_template_context() @@ -143,10 +146,8 @@ def test_register_views(test_app): # assert context['verification_successful'] == True # TODO: Would be good to test messages here when we can do so... new_user = mg_globals.database.User.query.filter_by( - username=u'happygirl').first() + username=u'angrygirl').first() assert new_user - assert new_user.status == u'active' - assert new_user.email_verified == True # Uniqueness checks # ----------------- @@ -154,9 +155,9 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/register/', { - 'username': u'happygirl', - 'password': 'iamsohappy2', - 'email': 'happygrrl2@example.org'}) + 'username': u'angrygirl', + 'password': 'iamsoangry2', + 'email': 'angrygrrl2@example.org'}) context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/auth/register.html'] @@ -171,7 +172,7 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/forgot_password/', - {'username': u'happygirl'}) + {'username': u'angrygirl'}) response.follow() ## Did we redirect to the proper page? Use the right template? @@ -181,9 +182,9 @@ def test_register_views(test_app): ## Make sure link to change password is sent by email assert len(mail.EMAIL_TEST_INBOX) == 1 message = mail.EMAIL_TEST_INBOX.pop() - assert message['To'] == 'happygrrl@example.org' + assert message['To'] == 'angrygrrl@example.org' email_context = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/auth/fp_verification_email.txt'] + 'mediagoblin/plugins/basic_auth/fp_verification_email.txt'] #TODO - change the name of verification_url to something forgot-password-ish assert email_context['verification_url'] in message.get_payload(decode=True) @@ -204,13 +205,14 @@ def test_register_views(test_app): ## Verify step 1 of password-change works -- can see form to change password template.clear_test_template_context() response = test_app.get("%s?%s" % (path, get_params)) - assert 'mediagoblin/auth/change_fp.html' in template.TEMPLATE_TEST_CONTEXT + assert 'mediagoblin/plugins/basic_auth/change_fp.html' in \ + template.TEMPLATE_TEST_CONTEXT ## Verify step 2.1 of password-change works -- report success to user template.clear_test_template_context() response = test_app.post( '/auth/forgot_password/verify/', { - 'password': 'iamveryveryhappy', + 'password': 'iamveryveryangry', 'token': parsed_get_params['token']}) response.follow() assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT @@ -219,8 +221,8 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/login/', { - 'username': u'happygirl', - 'password': 'iamveryveryhappy'}) + 'username': u'angrygirl', + 'password': 'iamveryveryangry'}) # User should be redirected response.follow() @@ -233,7 +235,7 @@ def test_authentication_views(test_app): Test logging in and logging out """ # Make a new user - test_user = fixture_add_user(active_user=False) + test_user = fixture_add_user() # Get login @@ -330,7 +332,6 @@ def test_authentication_views(test_app): 'next' : '/u/chris/'}) assert urlparse.urlsplit(response.location)[2] == '/u/chris/' - @pytest.fixture() def authentication_disabled_app(request): return get_app( @@ -342,6 +343,7 @@ def authentication_disabled_app(request): def test_authentication_disabled_app(authentication_disabled_app): # app.auth should = false + assert mg_globals assert mg_globals.app.auth is False # Try to visit register page diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py index cdd80fca..828f0515 100644 --- a/mediagoblin/tests/test_basic_auth.py +++ b/mediagoblin/tests/test_basic_auth.py @@ -13,7 +13,12 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import urlparse + +from mediagoblin.db.models import User from mediagoblin.plugins.basic_auth import tools as auth_tools +from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tools import template from mediagoblin.tools.testing import _activate_testing _activate_testing() @@ -57,3 +62,42 @@ def test_bcrypt_gen_password_hash(): pw, hashed_pw, '3><7R45417') assert not auth_tools.bcrypt_check_password( 'notthepassword', hashed_pw, '3><7R45417') + + +def test_change_password(test_app): + """Test changing password correctly and incorrectly""" + test_user = fixture_add_user( + password=u'toast', + privileges=[u'active']) + + test_app.post( + '/auth/login/', { + 'username': u'chris', + 'password': u'toast'}) + + # test that the password can be changed + res = test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '123456', + }) + res.follow() + + # Did we redirect to the correct page? + assert urlparse.urlsplit(res.location)[2] == '/edit/account/' + + # test_user has to be fetched again in order to have the current values + test_user = User.query.filter_by(username=u'chris').first() + assert auth_tools.bcrypt_check_password('123456', test_user.pw_hash) + + # test that the password cannot be changed if the given + # old_password is wrong + template.clear_test_template_context() + test_app.post( + '/edit/password/', { + 'old_password': 'toast', + 'new_password': '098765', + }) + + test_user = User.query.filter_by(username=u'chris').first() + assert not auth_tools.bcrypt_check_password('098765', test_user.pw_hash) diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py index c43a3a42..4f44e0b9 100644 --- a/mediagoblin/tests/test_edit.py +++ b/mediagoblin/tests/test_edit.py @@ -27,7 +27,8 @@ class TestUserEdit(object): def setup(self): # set up new user self.user_password = u'toast' - self.user = fixture_add_user(password = self.user_password) + self.user = fixture_add_user(password = self.user_password, + privileges=[u'active']) def login(self, test_app): test_app.post( @@ -52,45 +53,11 @@ class TestUserEdit(object): # deleted too. Perhaps in submission test? #Restore user at end of test - self.user = fixture_add_user(password = self.user_password) + self.user = fixture_add_user(password = self.user_password, + privileges=[u'active']) self.login(test_app) - def test_change_password(self, test_app): - """Test changing password correctly and incorrectly""" - self.login(test_app) - - # test that the password can be changed - template.clear_test_template_context() - res = test_app.post( - '/edit/password/', { - 'old_password': 'toast', - 'new_password': '123456', - }) - res.follow() - - # Did we redirect to the correct page? - assert urlparse.urlsplit(res.location)[2] == '/edit/account/' - - # test_user has to be fetched again in order to have the current values - test_user = User.query.filter_by(username=u'chris').first() - assert auth.check_password('123456', test_user.pw_hash) - # Update current user passwd - self.user_password = '123456' - - # test that the password cannot be changed if the given - # old_password is wrong - template.clear_test_template_context() - test_app.post( - '/edit/password/', { - 'old_password': 'toast', - 'new_password': '098765', - }) - - test_user = User.query.filter_by(username=u'chris').first() - assert not auth.check_password('098765', test_user.pw_hash) - - def test_change_bio_url(self, test_app): """Test changing bio and URL""" self.login(test_app) @@ -115,7 +82,8 @@ class TestUserEdit(object): assert test_user.url == u'http://dustycloud.org/' # change a different user than the logged in (should fail with 403) - fixture_add_user(username=u"foo") + fixture_add_user(username=u"foo", + privileges=[u'active']) res = test_app.post( '/u/foo/edit/', { 'bio': u'I love toast!', diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py new file mode 100644 index 00000000..48efb4b6 --- /dev/null +++ b/mediagoblin/tests/test_ldap.py @@ -0,0 +1,125 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import urlparse +import pkg_resources +import pytest +import mock + +from mediagoblin import mg_globals +from mediagoblin.db.base import Session +from mediagoblin.tests.tools import get_app +from mediagoblin.tools import template + +pytest.importorskip("ldap") + + +@pytest.fixture() +def ldap_plugin_app(request): + return get_app( + request, + mgoblin_config=pkg_resources.resource_filename( + 'mediagoblin.tests.auth_configs', + 'ldap_appconfig.ini')) + + +def return_value(): + return u'chris', u'chris@example.com' + + +def test_ldap_plugin(ldap_plugin_app): + res = ldap_plugin_app.get('/auth/login/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/' + + res = ldap_plugin_app.get('/auth/register/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/register/' + + res = ldap_plugin_app.get('/auth/ldap/register/') + + assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/' + + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', {}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + assert form.username.errors == [u'This field is required.'] + assert form.password.errors == [u'This field is required.'] + + @mock.patch('mediagoblin.plugins.ldap.tools.LDAP.login', mock.Mock(return_value=return_value())) + def _test_authentication(): + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', + {'username': u'chris', + 'password': u'toast'}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.username.data == u'chris' + assert register_form.email.data == u'chris@example.com' + + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/register/', + {'username': u'chris', + 'email': u'chris@example.com'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/u/chris/' + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT + + # Try to register with same email and username + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/register/', + {'username': u'chris', + 'email': u'chris@example.com'}) + + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] + register_form = context['register_form'] + + assert register_form.email.errors == [u'Sorry, a user with that email address already exists.'] + assert register_form.username.errors == [u'Sorry, a user with that name already exists.'] + + # Log out + ldap_plugin_app.get('/auth/logout/') + + # Get user and detach from session + test_user = mg_globals.database.User.query.filter_by( + username=u'chris').first() + Session.expunge(test_user) + + # Log back in + template.clear_test_template_context() + res = ldap_plugin_app.post( + '/auth/ldap/login/', + {'username': u'chris', + 'password': u'toast'}) + res.follow() + + assert urlparse.urlsplit(res.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + # Make sure user is in the session + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + session = context['request'].session + assert session['user_id'] == unicode(test_user.id) + + _test_authentication() diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index da0dffb9..4cd3d9b6 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -13,6 +13,10 @@ tags_max_length = 50 # So we can start to test attachments: allow_attachments = True +upload_limit = 500 + +max_file_size = 2 + [storage:publicstore] base_dir = %(here)s/user_dev/media/public base_url = /mgoblin_media/ diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py index 427aa47c..86513c76 100644 --- a/mediagoblin/tests/test_modelmethods.py +++ b/mediagoblin/tests/test_modelmethods.py @@ -18,7 +18,7 @@ # methods, and so it makes sense to test them here. from mediagoblin.db.base import Session -from mediagoblin.db.models import MediaEntry +from mediagoblin.db.models import MediaEntry, User, Privilege from mediagoblin.tests.tools import fixture_add_user @@ -47,7 +47,7 @@ class TestMediaEntrySlugs(object): entry.id = this_id entry.uploader = uploader or self.chris_user.id entry.media_type = u'image' - + if save: entry.save() @@ -99,7 +99,7 @@ class TestMediaEntrySlugs(object): u"Beware, I exist!!", this_id=9000, save=False) entry.generate_slug() assert entry.slug == u"beware-i-exist-test" - + _real_test() def test_existing_slug_cant_use_id_extra_junk(self, test_app): @@ -151,6 +151,44 @@ class TestMediaEntrySlugs(object): qbert_entry.generate_slug() assert qbert_entry.slug is None +class TestUserHasPrivilege: + def _setup(self): + fixture_add_user(u'natalie', + privileges=[u'admin',u'moderator',u'active']) + fixture_add_user(u'aeva', + privileges=[u'moderator',u'active']) + self.natalie_user = User.query.filter( + User.username==u'natalie').first() + self.aeva_user = User.query.filter( + User.username==u'aeva').first() + + def test_privilege_added_correctly(self, test_app): + self._setup() + admin = Privilege.query.filter( + Privilege.privilege_name == u'admin').one() + # first make sure the privileges were added successfully + + assert admin in self.natalie_user.all_privileges + assert admin not in self.aeva_user.all_privileges + + def test_user_has_privilege_one(self, test_app): + self._setup() + + # then test out the user.has_privilege method for one privilege + assert not self.natalie_user.has_privilege(u'commenter') + assert self.aeva_user.has_privilege(u'active') + + + def test_user_has_privileges_multiple(self, test_app): + self._setup() + + # when multiple args are passed to has_privilege, the method returns + # True if the user has ANY of the privileges + assert self.natalie_user.has_privilege(u'admin',u'commenter') + assert self.aeva_user.has_privilege(u'moderator',u'active') + assert not self.natalie_user.has_privilege(u'commenter',u'uploader') + + def test_media_data_init(test_app): Session.rollback() @@ -165,3 +203,4 @@ def test_media_data_init(test_app): obj_in_session += 1 print repr(obj) assert obj_in_session == 0 + diff --git a/mediagoblin/tests/test_moderation.py b/mediagoblin/tests/test_moderation.py new file mode 100644 index 00000000..e7a0ebef --- /dev/null +++ b/mediagoblin/tests/test_moderation.py @@ -0,0 +1,242 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin.tests.tools import (fixture_add_user, + fixture_add_comment_report, fixture_add_comment) +from mediagoblin.db.models import User, CommentReport, MediaComment, UserBan +from mediagoblin.tools import template, mail +from webtest import AppError + +class TestModerationViews: + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'admin', + privileges=[u'admin',u'active']) + fixture_add_user(u'moderator', + privileges=[u'moderator',u'active']) + fixture_add_user(u'regular', + privileges=[u'active',u'commenter']) + self.query_for_users() + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + self.query_for_users() + + def logout(self): + self.test_app.get('/auth/logout/') + self.query_for_users() + + def query_for_users(self): + self.admin_user = User.query.filter(User.username==u'admin').first() + self.mod_user = User.query.filter(User.username==u'moderator').first() + self.user = User.query.filter(User.username==u'regular').first() + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def testGiveOrTakeAwayPrivileges(self): + self.login(u'admin') + # First, test an admin taking away a privilege from a user + #---------------------------------------------------------------------- + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert response.status == '302 FOUND' + self.query_for_users() + assert not self.user.has_privilege(u'commenter') + + # Then, test an admin giving a privilege to a user + #---------------------------------------------------------------------- + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert response.status == '302 FOUND' + self.query_for_users() + assert self.user.has_privilege(u'commenter') + + # Then, test a mod trying to take away a privilege from a user + # they are not allowed to do this, so this will raise an error + #---------------------------------------------------------------------- + self.logout() + self.login(u'moderator') + + with pytest.raises(AppError) as excinfo: + response, context = self.do_post({'privilege_name':u'commenter'}, + url='/mod/users/{0}/privilege/'.format(self.user.username)) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + self.query_for_users() + + assert self.user.has_privilege(u'commenter') + + def testReportResolution(self): + self.login(u'moderator') + + # First, test a moderators taking away a user's privilege in response + # to a reported comment + #---------------------------------------------------------------------- + fixture_add_comment_report(reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + response = self.test_app.get('/mod/reports/{0}/'.format( + comment_report.id)) + assert response.status == '200 OK' + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'commenter'], + 'targeted_user':self.user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + assert response.status == '302 FOUND' + assert not self.user.has_privilege(u'commenter') + assert comment_report.is_archived_report() is True + + fixture_add_comment_report(reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + + # Then, test a moderator sending an email to a user in response to a + # reported comment + #---------------------------------------------------------------------- + self.query_for_users() + + response, context = self.do_post({'action_to_resolve':[u'sendmessage'], + 'message_to_user':'This is your last warning, regular....', + 'targeted_user':self.user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + self.query_for_users() + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.user).first() + assert response.status == '302 FOUND' + assert mail.EMAIL_TEST_MBOX_INBOX == [{'to': [u'regular@example.com'], + 'message': 'Content-Type: text/plain; charset="utf-8"\n\ +MIME-Version: 1.0\nContent-Transfer-Encoding: base64\nSubject: Warning from- \ +moderator \nFrom: notice@mediagoblin.example.org\nTo: regular@example.com\n\n\ +VGhpcyBpcyB5b3VyIGxhc3Qgd2FybmluZywgcmVndWxhci4uLi4=\n', + 'from': 'notice@mediagoblin.example.org'}] + assert comment_report.is_archived_report() is True + + # Then test a moderator banning a user AND a moderator deleting the + # offending comment. This also serves as a test for taking multiple + # actions to resolve a report + #---------------------------------------------------------------------- + self.query_for_users() + fixture_add_comment(author=self.user.id, + comment=u'Comment will be removed') + test_comment = MediaComment.query.filter( + MediaComment.author==self.user.id).first() + fixture_add_comment_report(comment=test_comment, + reported_user=self.user) + comment_report = CommentReport.query.filter( + CommentReport.comment==test_comment).filter( + CommentReport.resolved==None).first() + + response, context = self.do_post( + {'action_to_resolve':[u'userban', u'delete'], + 'targeted_user':self.user.id, + 'why_user_was_banned':u'', + 'user_banned_until':u''}, + url='/mod/reports/{0}/'.format(comment_report.id)) + assert response.status == '302 FOUND' + self.query_for_users() + test_user_ban = UserBan.query.filter( + UserBan.user_id == self.user.id).first() + assert test_user_ban is not None + test_comment = MediaComment.query.filter( + MediaComment.author==self.user.id).first() + assert test_comment is None + + # Then, test what happens when a moderator attempts to punish an admin + # from a reported comment on an admin. + #---------------------------------------------------------------------- + fixture_add_comment_report(reported_user=self.admin_user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==self.admin_user).filter( + CommentReport.resolved==None).first() + + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'active'], + 'targeted_user':self.admin_user.id}, + url='/mod/reports/{0}/'.format(comment_report.id)) + self.query_for_users() + + assert response.status == '200 OK' + assert self.admin_user.has_privilege(u'active') + + def testAllModerationViews(self): + self.login(u'moderator') + username = self.user.username + self.query_for_users() + fixture_add_comment_report(reported_user=self.admin_user) + response = self.test_app.get('/mod/reports/') + assert response.status == "200 OK" + + response = self.test_app.get('/mod/reports/1/') + assert response.status == "200 OK" + + response = self.test_app.get('/mod/users/') + assert response.status == "200 OK" + + user_page_url = '/mod/users/{0}/'.format(username) + response = self.test_app.get(user_page_url) + assert response.status == "200 OK" + + self.test_app.get('/mod/media/') + assert response.status == "200 OK" + + def testBanUnBanUser(self): + self.login(u'admin') + username = self.user.username + user_id = self.user.id + ban_url = '/mod/users/{0}/ban/'.format(username) + response, context = self.do_post({ + 'user_banned_until':u'', + 'why_user_was_banned':u'Because I said so'}, + url=ban_url) + + assert response.status == "302 FOUND" + user_banned = UserBan.query.filter(UserBan.user_id==user_id).first() + assert user_banned is not None + assert user_banned.expiration_date is None + assert user_banned.reason == u'Because I said so' + + response, context = self.do_post({}, + url=ban_url) + + assert response.status == "302 FOUND" + user_banned = UserBan.query.filter(UserBan.user_id==user_id).first() + assert user_banned is None diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py index e075d475..3bf36f5f 100644 --- a/mediagoblin/tests/test_notifications.py +++ b/mediagoblin/tests/test_notifications.py @@ -38,7 +38,7 @@ class TestNotifications: # TODO: Possibly abstract into a decorator like: # @as_authenticated_user('chris') - self.test_user = fixture_add_user() + self.test_user = fixture_add_user(privileges=[u'active',u'commenter']) self.current_user = None @@ -75,7 +75,10 @@ class TestNotifications: ''' user = fixture_add_user('otherperson', password='nosreprehto', - wants_comment_notification=wants_email) + wants_comment_notification=wants_email, + privileges=[u'active',u'commenter']) + + assert user.wants_comment_notification == wants_email user_id = user.id @@ -124,6 +127,7 @@ otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyI else: assert mail.EMAIL_TEST_MBOX_INBOX == [] + # Save the ids temporarily because of DetachedInstanceError notification_id = notification.id comment_id = notification.subject.id @@ -153,7 +157,8 @@ otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyI def test_mark_all_comment_notifications_seen(self): """ Test that mark_all_comments_seen works""" - user = fixture_add_user('otherperson', password='nosreprehto') + user = fixture_add_user('otherperson', password='nosreprehto', + privileges=[u'active']) media_entry = fixture_media_entry(uploader=user.id, state=u'processed') diff --git a/mediagoblin/tests/test_oauth2.py b/mediagoblin/tests/test_oauth2.py index 86f9e8cc..957f4e65 100644 --- a/mediagoblin/tests/test_oauth2.py +++ b/mediagoblin/tests/test_oauth2.py @@ -38,7 +38,8 @@ class TestOAuth(object): self.pman = pluginapi.PluginManager() self.user_password = u'4cc355_70k3N' - self.user = fixture_add_user(u'joauth', self.user_password) + self.user = fixture_add_user(u'joauth', self.user_password, + privileges=[u'active']) self.login() diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py index 23a2290e..0424fdda 100644 --- a/mediagoblin/tests/test_openid.py +++ b/mediagoblin/tests/test_openid.py @@ -29,6 +29,7 @@ from mediagoblin.plugins.openid.models import OpenIDUserURL from mediagoblin.tests.tools import get_app, fixture_add_user from mediagoblin.tools import template + # App with plugin enabled @pytest.fixture() def openid_plugin_app(request): @@ -177,7 +178,7 @@ class TestOpenIDPlugin(object): # Correct place? assert urlparse.urlsplit(res.location)[2] == '/u/chris/' - assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT # No need to test if user is in logged in and verification email # awaits, since openid uses the register_user function which is @@ -237,7 +238,7 @@ class TestOpenIDPlugin(object): def test_add_delete(self, openid_plugin_app): """Test adding and deleting openids""" # Add user - test_user = fixture_add_user(password='') + test_user = fixture_add_user(password='', privileges=[u'active']) openid = OpenIDUserURL() openid.openid_url = 'http://real.myopenid.com' openid.user_id = test_user.id diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py index 919877c9..a1cd30eb 100644 --- a/mediagoblin/tests/test_persona.py +++ b/mediagoblin/tests/test_persona.py @@ -22,6 +22,7 @@ pytest.importorskip("requests") from mediagoblin import mg_globals from mediagoblin.db.base import Session +from mediagoblin.db.models import Privilege from mediagoblin.tests.tools import get_app from mediagoblin.tools import template @@ -90,7 +91,7 @@ class TestPersonaPlugin(object): res.follow() assert urlparse.urlsplit(res.location)[2] == '/u/chris/' - assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT + assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT # Try to register same Persona email address template.clear_test_template_context() @@ -112,8 +113,9 @@ class TestPersonaPlugin(object): # Get user and detach from session test_user = mg_globals.database.User.query.filter_by( username=u'chris').first() - test_user.email_verified = True - test_user.status = u'active' + active_privilege = Privilege.query.filter( + Privilege.privilege_name==u'active').first() + test_user.all_privileges.append(active_privilege) test_user.save() test_user = mg_globals.database.User.query.filter_by( username=u'chris').first() diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py new file mode 100644 index 00000000..05829b34 --- /dev/null +++ b/mediagoblin/tests/test_privileges.py @@ -0,0 +1,205 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest +from datetime import date, timedelta +from webtest import AppError + +from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry + +from mediagoblin.db.models import User, UserBan +from mediagoblin.tools import template + +from .resources import GOOD_JPG + +class TestPrivilegeFunctionality: + + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'alex', + privileges=[u'admin',u'active']) + fixture_add_user(u'meow', + privileges=[u'moderator',u'active',u'reporter']) + fixture_add_user(u'natalie', + privileges=[u'active']) + self.query_for_users() + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + self.query_for_users() + + def logout(self): + self.test_app.get('/auth/logout/') + self.query_for_users() + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def query_for_users(self): + self.admin_user = User.query.filter(User.username==u'alex').first() + self.mod_user = User.query.filter(User.username==u'meow').first() + self.user = User.query.filter(User.username==u'natalie').first() + + def testUserBanned(self): + self.login(u'natalie') + uid = self.user.id + # First, test what happens when a user is banned indefinitely + #---------------------------------------------------------------------- + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date=None) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "200 OK" + assert "You are Banned" in response.body + # Then test what happens when that ban has an expiration date which + # hasn't happened yet + #---------------------------------------------------------------------- + user_ban = UserBan.query.get(uid) + user_ban.delete() + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date= date.today() + timedelta(days=20)) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "200 OK" + assert "You are Banned" in response.body + + # Then test what happens when that ban has an expiration date which + # has already happened + #---------------------------------------------------------------------- + user_ban = UserBan.query.get(uid) + user_ban.delete() + exp_date = date.today() - timedelta(days=20) + user_ban = UserBan(user_id=uid, + reason=u'Testing whether user is banned', + expiration_date= exp_date) + user_ban.save() + + response = self.test_app.get('/') + assert response.status == "302 FOUND" + assert not "You are Banned" in response.body + + def testVariousPrivileges(self): + # The various actions that require privileges (ex. reporting, + # commenting, moderating...) are tested in other tests. This method + # will be used to ensure that those actions are impossible for someone + # without the proper privileges. + # For other tests that show what happens when a user has the proper + # privileges, check out: + # tests/test_moderation.py moderator + # tests/test_notifications.py commenter + # tests/test_reporting.py reporter + # tests/test_submission.py uploader + #---------------------------------------------------------------------- + self.login(u'natalie') + + # First test the get and post requests of submission/uploading + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get('/submit/') + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + + with pytest.raises(AppError) as excinfo: + response = self.do_post({'upload_files':[('file',GOOD_JPG)], + 'title':u'Normal Upload 1'}, + url='/submit/') + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot comment without the commenter privilege + #---------------------------------------------------------------------- + self.query_for_users() + + media_entry = fixture_media_entry(uploader=self.admin_user.id, + state=u'processed') + + media_entry_id = media_entry.id + media_uri_id = '/u/{0}/m/{1}/'.format(self.admin_user.username, + media_entry.id) + media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username, + media_entry.slug) + response = self.test_app.get(media_uri_slug) + assert not "Add a comment" in response.body + + self.query_for_users() + with pytest.raises(AppError) as excinfo: + response = self.test_app.post( + media_uri_id + 'comment/add/', + {'comment_content': u'Test comment #42'}) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot report without the reporter privilege + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get(media_uri_slug+"report/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.do_post( + {'report_reason':u'Testing Reports #1', + 'reporter_id':u'3'}, + url=(media_uri_slug+"report/")) + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + # Test that a user cannot access the moderation pages w/o moderator + # or admin privileges + #---------------------------------------------------------------------- + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/users/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/reports/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/media/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/users/1/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + with pytest.raises(AppError) as excinfo: + response = self.test_app.get("/mod/reports/1/") + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + + self.query_for_users() + + with pytest.raises(AppError) as excinfo: + response, context = self.do_post({'action_to_resolve':[u'takeaway'], + 'take_away_privileges':[u'active'], + 'targeted_user':self.admin_user.id}, + url='/mod/reports/1/') + self.query_for_users() + assert 'Bad response: 403 FORBIDDEN' in str(excinfo) diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py new file mode 100644 index 00000000..a154a061 --- /dev/null +++ b/mediagoblin/tests/test_reporting.py @@ -0,0 +1,167 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +from mediagoblin.tools import template +from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry, + fixture_add_comment, fixture_add_comment_report) +from mediagoblin.db.models import (MediaReport, CommentReport, User, + MediaComment) + + +class TestReportFiling: + @pytest.fixture(autouse=True) + def _setup(self, test_app): + self.test_app = test_app + + fixture_add_user(u'allie', + privileges=[u'reporter',u'active']) + fixture_add_user(u'natalie', + privileges=[u'active', u'moderator']) + + def login(self, username): + self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': 'toast'}) + + def logout(self): + self.test_app.get('/auth/logout/') + + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def query_for_users(self): + return (User.query.filter(User.username==u'allie').first(), + User.query.filter(User.username==u'natalie').first()) + + def testMediaReports(self): + self.login(u'allie') + allie_user, natalie_user = self.query_for_users() + allie_id = allie_user.id + + media_entry = fixture_media_entry(uploader=natalie_user.id, + state=u'processed') + + mid = media_entry.id + media_uri_slug = '/u/{0}/m/{1}/'.format(natalie_user.username, + media_entry.slug) + + response = self.test_app.get(media_uri_slug + "report/") + assert response.status == "200 OK" + + response, context = self.do_post( + {'report_reason':u'Testing Media Report', + 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/") + + assert response.status == "302 FOUND" + + media_report = MediaReport.query.first() + + allie_user, natalie_user = self.query_for_users() + assert media_report is not None + assert media_report.report_content == u'Testing Media Report' + assert media_report.reporter_id == allie_id + assert media_report.reported_user_id == natalie_user.id + assert media_report.created is not None + assert media_report.discriminator == 'media_report' + + def testCommentReports(self): + self.login(u'allie') + allie_user, natalie_user = self.query_for_users() + allie_id = allie_user.id + + media_entry = fixture_media_entry(uploader=natalie_user.id, + state=u'processed') + mid = media_entry.id + fixture_add_comment(media_entry=mid, + author=natalie_user.id) + comment = MediaComment.query.first() + + comment_uri_slug = '/u/{0}/m/{1}/c/{2}/'.format(natalie_user.username, + media_entry.slug, + comment.id) + + response = self.test_app.get(comment_uri_slug + "report/") + assert response.status == "200 OK" + + response, context = self.do_post({ + 'report_reason':u'Testing Comment Report', + 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/") + + assert response.status == "302 FOUND" + + comment_report = CommentReport.query.first() + + allie_user, natalie_user = self.query_for_users() + assert comment_report is not None + assert comment_report.report_content == u'Testing Comment Report' + assert comment_report.reporter_id == allie_id + assert comment_report.reported_user_id == natalie_user.id + assert comment_report.created is not None + assert comment_report.discriminator == 'comment_report' + + def testArchivingReports(self): + self.login(u'natalie') + allie_user, natalie_user = self.query_for_users() + allie_id, natalie_id = allie_user.id, natalie_user.id + + fixture_add_comment(author=allie_user.id, + comment=u'Comment will be removed') + test_comment = MediaComment.query.filter( + MediaComment.author==allie_user.id).first() + fixture_add_comment_report(comment=test_comment, + reported_user=allie_user, + report_content=u'Testing Archived Reports #1', + reporter=natalie_user) + comment_report = CommentReport.query.filter( + CommentReport.reported_user==allie_user).first() + + assert comment_report.report_content == u'Testing Archived Reports #1' + response, context = self.do_post( + {'action_to_resolve':[u'userban', u'delete'], + 'targeted_user':allie_user.id, + 'resolution_content':u'This is a test of archiving reports.'}, + url='/mod/reports/{0}/'.format(comment_report.id)) + + assert response.status == "302 FOUND" + allie_user, natalie_user = self.query_for_users() + + archived_report = CommentReport.query.filter( + CommentReport.reported_user==allie_user).first() + + assert CommentReport.query.count() != 0 + assert archived_report is not None + assert archived_report.report_content == u'Testing Archived Reports #1' + assert archived_report.reporter_id == natalie_id + assert archived_report.reported_user_id == allie_id + assert archived_report.created is not None + assert archived_report.resolved is not None + assert archived_report.result == u'''This is a test of archiving reports. +natalie banned user allie indefinitely. +natalie deleted the comment.''' + assert archived_report.discriminator == 'comment_report' + diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index ac941063..b5b13ed3 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -24,13 +24,14 @@ import pytest from mediagoblin.tests.tools import fixture_add_user from mediagoblin import mg_globals -from mediagoblin.db.models import MediaEntry +from mediagoblin.db.models import MediaEntry, User +from mediagoblin.db.base import Session from mediagoblin.tools import template from mediagoblin.media_types.image import ImageMediaManager from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ - BIG_BLUE, GOOD_PDF, GPS_JPG + BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG GOOD_TAG_STRING = u'yin,yang' BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26) @@ -46,12 +47,22 @@ class TestSubmission: # TODO: Possibly abstract into a decorator like: # @as_authenticated_user('chris') - test_user = fixture_add_user() - - self.test_user = test_user + fixture_add_user(privileges=[u'active',u'uploader', u'commenter']) self.login() + def our_user(self): + """ + Fetch the user we're submitting with. Every .get() or .post() + invalidates the session; this is a hacky workaround. + """ + #### FIXME: Pytest collects this as a test and runs this. + #### ... it shouldn't. At least it passes, but that's + #### totally stupid. + #### Also if we found a way to make this run it should be a + #### property. + return User.query.filter(User.username==u'chris').first() + def login(self): self.test_app.post( '/auth/login/', { @@ -97,19 +108,40 @@ class TestSubmission: def check_normal_upload(self, title, filename): response, context = self.do_post({'title': title}, do_follow=True, **self.upload_data(filename)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) assert 'mediagoblin/user_pages/user.html' in context # Make sure the media view is at least reachable, logged in... - url = '/u/{0}/m/{1}/'.format(self.test_user.username, + url = '/u/{0}/m/{1}/'.format(self.our_user().username, title.lower().replace(' ', '-')) self.test_app.get(url) # ... and logged out too. self.logout() self.test_app.get(url) + def user_upload_limits(self, uploaded=None, upload_limit=None): + our_user = self.our_user() + + if uploaded: + our_user.uploaded = uploaded + if upload_limit: + our_user.upload_limit = upload_limit + + our_user.save() + Session.expunge(our_user) + def test_normal_jpg(self): + # User uploaded should be 0 + assert self.our_user().uploaded == 0 + self.check_normal_upload(u'Normal upload 1', GOOD_JPG) + # User uploaded should be the same as GOOD_JPG size in Mb + file_size = os.stat(GOOD_JPG).st_size / (1024.0 * 1024) + file_size = float('{0:.2f}'.format(file_size)) + + # Reload user + assert self.our_user().uploaded == file_size + def test_normal_png(self): self.check_normal_upload(u'Normal upload 2', GOOD_PNG) @@ -118,9 +150,63 @@ class TestSubmission: response, context = self.do_post({'title': u'Normal upload 3 (pdf)'}, do_follow=True, **self.upload_data(GOOD_PDF)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) + assert 'mediagoblin/user_pages/user.html' in context + + def test_default_upload_limits(self): + self.user_upload_limits(uploaded=500) + + # User uploaded should be 500 + assert self.our_user().uploaded == 500 + + response, context = self.do_post({'title': u'Normal upload 4'}, + do_follow=True, + **self.upload_data(GOOD_JPG)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) assert 'mediagoblin/user_pages/user.html' in context + # Shouldn't have uploaded + assert self.our_user().uploaded == 500 + + def test_user_upload_limit(self): + self.user_upload_limits(uploaded=25, upload_limit=25) + + # User uploaded should be 25 + assert self.our_user().uploaded == 25 + + response, context = self.do_post({'title': u'Normal upload 5'}, + do_follow=True, + **self.upload_data(GOOD_JPG)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) + assert 'mediagoblin/user_pages/user.html' in context + + # Shouldn't have uploaded + assert self.our_user().uploaded == 25 + + def test_user_under_limit(self): + self.user_upload_limits(uploaded=499) + + # User uploaded should be 499 + assert self.our_user().uploaded == 499 + + response, context = self.do_post({'title': u'Normal upload 6'}, + do_follow=False, + **self.upload_data(MED_PNG)) + form = context['mediagoblin/submit/start.html']['submit_form'] + assert form.file.errors == [u'Sorry, uploading this file will put you' + ' over your upload limit.'] + + # Shouldn't have uploaded + assert self.our_user().uploaded == 499 + + def test_big_file(self): + response, context = self.do_post({'title': u'Normal upload 7'}, + do_follow=False, + **self.upload_data(BIG_PNG)) + + form = context['mediagoblin/submit/start.html']['submit_form'] + assert form.file.errors == [u'Sorry, the file size is too big.'] + def check_media(self, request, find_data, count=None): media = MediaEntry.query.filter_by(**find_data) if count is not None: @@ -155,6 +241,7 @@ class TestSubmission: 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] def test_delete(self): + self.user_upload_limits(uploaded=50) response, request = self.do_post({'title': u'Balanced Goblin'}, *REQUEST_CONTEXT, do_follow=True, **self.upload_data(GOOD_JPG)) @@ -164,7 +251,7 @@ class TestSubmission: # render and post to the edit page. edit_url = request.urlgen( 'mediagoblin.edit.edit_media', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) self.test_app.get(edit_url) self.test_app.post(edit_url, {'title': u'Balanced Goblin', @@ -177,7 +264,7 @@ class TestSubmission: self.check_comments(request, media_id, 0) comment_url = request.urlgen( 'mediagoblin.user_pages.media_post_comment', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) response = self.do_post({'comment_content': 'i love this test'}, url=comment_url, do_follow=True)[0] self.check_comments(request, media_id, 1) @@ -186,7 +273,7 @@ class TestSubmission: # --------------------------------------------------- delete_url = request.urlgen( 'mediagoblin.user_pages.media_confirm_delete', - user=self.test_user.username, media_id=media_id) + user=self.our_user().username, media_id=media_id) # Empty data means don't confirm response = self.do_post({}, do_follow=True, url=delete_url)[0] media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) @@ -199,6 +286,9 @@ class TestSubmission: self.check_media(request, {'id': media_id}, 0) self.check_comments(request, media_id, 0) + # Check that user.uploaded is the same as before the upload + assert self.our_user().uploaded == 50 + def test_evil_file(self): # Test non-suppoerted file with non-supported extension # ----------------------------------------------------- @@ -251,7 +341,7 @@ class TestSubmission: # they'll be caught as failures during the processing step. response, context = self.do_post({'title': title}, do_follow=True, **self.upload_data(filename)) - self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + self.check_url(response, '/u/{0}/'.format(self.our_user().username)) entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first() assert entry.state == 'failed' assert entry.fail_error == u'mediagoblin.processing:BadMediaFail' diff --git a/mediagoblin/tests/test_submission/COPYING.txt b/mediagoblin/tests/test_submission/COPYING.txt new file mode 100644 index 00000000..3818aae4 --- /dev/null +++ b/mediagoblin/tests/test_submission/COPYING.txt @@ -0,0 +1,5 @@ +Images located in this directory tree are released under a GPLv3 license +and CC BY-SA 3.0 license. To the extent possible under law, the author(s) +have dedicated all copyright and related and neighboring rights to these +files to the public domain worldwide. These files are distributed without +any warranty. diff --git a/mediagoblin/tests/test_submission/big.png b/mediagoblin/tests/test_submission/big.png Binary files differnew file mode 100644 index 00000000..a284cfda --- /dev/null +++ b/mediagoblin/tests/test_submission/big.png diff --git a/mediagoblin/tests/test_submission/medium.png b/mediagoblin/tests/test_submission/medium.png Binary files differnew file mode 100644 index 00000000..e8b9ca00 --- /dev/null +++ b/mediagoblin/tests/test_submission/medium.png diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 98361adc..060dfda9 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -25,7 +25,7 @@ from webtest import TestApp from mediagoblin import mg_globals from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \ - CommentSubscription, CommentNotification + CommentSubscription, CommentNotification, Privilege, CommentReport from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config from mediagoblin.db.base import Session @@ -33,6 +33,8 @@ from mediagoblin.meddleware import BaseMeddleware from mediagoblin.auth import gen_password_hash from mediagoblin.gmg_commands.dbupdate import run_dbupdate +from datetime import datetime + MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__' TEST_SERVER_CONFIG = pkg_resources.resource_filename( @@ -133,7 +135,6 @@ def get_app(request, paste_config=None, mgoblin_config=None): mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app)) app = TestApp(test_app) - return app @@ -170,7 +171,7 @@ def assert_db_meets_expected(db, expected): def fixture_add_user(username=u'chris', password=u'toast', - active_user=True, wants_comment_notification=True): + privileges=[], wants_comment_notification=True): # Reuse existing user or create a new one test_user = User.query.filter_by(username=username).first() if test_user is None: @@ -179,14 +180,13 @@ def fixture_add_user(username=u'chris', password=u'toast', test_user.email = username + u'@example.com' if password is not None: test_user.pw_hash = gen_password_hash(password) - if active_user: - test_user.email_verified = True - test_user.status = u'active' - test_user.wants_comment_notification = wants_comment_notification + for privilege in privileges: + query = Privilege.query.filter(Privilege.privilege_name==privilege) + if query.count(): + test_user.all_privileges.append(query.one()) test_user.save() - # Reload test_user = User.query.filter_by(username=username).first() @@ -314,3 +314,32 @@ def fixture_add_comment(author=None, media_entry=None, comment=None): return comment +def fixture_add_comment_report(comment=None, reported_user=None, + reporter=None, created=None, report_content=None): + if comment is None: + comment = fixture_add_comment() + + if reported_user is None: + reported_user = fixture_add_user() + + if reporter is None: + reporter = fixture_add_user() + + if created is None: + created=datetime.now() + + if report_content is None: + report_content = \ + 'Auto-generated test report' + + comment_report = CommentReport(comment=comment, + reported_user = reported_user, + reporter = reporter, + created = created, + report_content=report_content) + + comment_report.save() + + Session.expunge(comment_report) + + return comment_report |