diff options
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r-- | mediagoblin/tests/test_auth.py | 44 | ||||
-rw-r--r-- | mediagoblin/tests/test_celery_setup.py | 2 | ||||
-rw-r--r-- | mediagoblin/tests/test_edit.py | 67 | ||||
-rw-r--r-- | mediagoblin/tests/test_misc.py | 8 | ||||
-rw-r--r-- | mediagoblin/tests/test_notifications.py | 151 | ||||
-rw-r--r-- | mediagoblin/tests/tools.py | 95 |
6 files changed, 321 insertions, 46 deletions
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index 9f70c5ff..f973ebd8 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -118,20 +118,15 @@ def test_register_views(test_app): assert path == u'/auth/verify_email/' parsed_get_params = urlparse.parse_qs(get_params) - ### user should have these same parameters - assert parsed_get_params['userid'] == [ - unicode(new_user.id)] - assert parsed_get_params['token'] == [ - new_user.verification_key] - ## Try verifying with bs verification key, shouldn't work template.clear_test_template_context() response = test_app.get( - "/auth/verify_email/?userid=%s&token=total_bs" % unicode( - new_user.id)) + "/auth/verify_email/?token=total_bs") response.follow() - context = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html'] + + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' + # assert context['verification_successful'] == True # TODO: Would be good to test messages here when we can do so... new_user = mg_globals.database.User.find_one( @@ -195,35 +190,17 @@ def test_register_views(test_app): path = urlparse.urlsplit(email_context['verification_url'])[2] get_params = urlparse.urlsplit(email_context['verification_url'])[3] - assert path == u'/auth/forgot_password/verify/' parsed_get_params = urlparse.parse_qs(get_params) - - # user should have matching parameters - new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) - assert parsed_get_params['userid'] == [unicode(new_user.id)] - assert parsed_get_params['token'] == [new_user.fp_verification_key] - - ### The forgotten password token should be set to expire in ~ 10 days - # A few ticks have expired so there are only 9 full days left... - assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9 + assert path == u'/auth/forgot_password/verify/' ## Try using a bs password-changing verification key, shouldn't work template.clear_test_template_context() response = test_app.get( - "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode( - new_user.id), status=404) - assert response.status.split()[0] == u'404' # status="404 NOT FOUND" + "/auth/forgot_password/verify/?token=total_bs") + response.follow() - ## Try using an expired token to change password, shouldn't work - template.clear_test_template_context() - new_user = mg_globals.database.User.find_one({'username': u'happygirl'}) - real_token_expiration = new_user.fp_token_expire - new_user.fp_token_expire = datetime.datetime.now() - new_user.save() - response = test_app.get("%s?%s" % (path, get_params), status=404) - assert response.status.split()[0] == u'404' # status="404 NOT FOUND" - new_user.fp_token_expire = real_token_expiration - new_user.save() + # Correct redirect? + assert urlparse.urlsplit(response.location)[2] == '/' ## Verify step 1 of password-change works -- can see form to change password template.clear_test_template_context() @@ -234,7 +211,6 @@ def test_register_views(test_app): template.clear_test_template_context() response = test_app.post( '/auth/forgot_password/verify/', { - 'userid': parsed_get_params['userid'], 'password': 'iamveryveryhappy', 'token': parsed_get_params['token']}) response.follow() diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py index 5530c6f2..0184436a 100644 --- a/mediagoblin/tests/test_celery_setup.py +++ b/mediagoblin/tests/test_celery_setup.py @@ -48,7 +48,7 @@ def test_setup_celery_from_config(): assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) assert fake_celery_module.CELERY_RESULT_PERSISTENT is True assert fake_celery_module.CELERY_IMPORTS == [ - 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task'] + 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task'] assert fake_celery_module.CELERY_RESULT_BACKEND == 'database' assert fake_celery_module.CELERY_RESULT_DBURI == ( 'sqlite:///' + diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py index b6ec7a29..acc638d9 100644 --- a/mediagoblin/tests/test_edit.py +++ b/mediagoblin/tests/test_edit.py @@ -15,13 +15,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import urlparse -import pytest from mediagoblin import mg_globals from mediagoblin.db.models import User from mediagoblin.tests.tools import fixture_add_user -from mediagoblin.tools import template from mediagoblin import auth +from mediagoblin.tools import template, mail class TestUserEdit(object): @@ -142,4 +141,68 @@ class TestUserEdit(object): assert form.url.errors == [ u'This address contains errors'] + def test_email_change(self, test_app): + self.login(test_app) + + # Test email already in db + template.clear_test_template_context() + test_app.post( + '/edit/account/', { + 'new_email': 'chris@example.com', + 'password': 'toast'}) + + # Check form errors + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/edit_account.html'] + assert context['form'].new_email.errors == [ + u'Sorry, a user with that email address already exists.'] + + # Test successful email change + template.clear_test_template_context() + res = test_app.post( + '/edit/account/', { + 'new_email': 'new@example.com', + 'password': 'toast'}) + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/u/chris/' + + # Make sure we get email verification and try verifying + assert len(mail.EMAIL_TEST_INBOX) == 1 + message = mail.EMAIL_TEST_INBOX.pop() + assert message['To'] == 'new@example.com' + email_context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/verification.txt'] + assert email_context['verification_url'] in \ + message.get_payload(decode=True) + + path = urlparse.urlsplit(email_context['verification_url'])[2] + assert path == u'/edit/verify_email/' + + ## Try verifying with bs verification key, shouldn't work + template.clear_test_template_context() + res = test_app.get( + "/edit/verify_email/?token=total_bs") + res.follow() + + # Correct redirect? + assert urlparse.urlsplit(res.location)[2] == '/' + + # Email shouldn't be saved + email_in_db = mg_globals.database.User.find_one( + {'email': 'new@example.com'}) + email = User.query.filter_by(username='chris').first().email + assert email_in_db is None + assert email == 'chris@example.com' + + # Verify email activation works + template.clear_test_template_context() + get_params = urlparse.urlsplit(email_context['verification_url'])[3] + res = test_app.get('%s?%s' % (path, get_params)) + res.follow() + + # New email saved? + email = User.query.filter_by(username='chris').first().email + assert email == 'new@example.com' # test changing the url inproperly diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py index 755d863f..43ad0b6d 100644 --- a/mediagoblin/tests/test_misc.py +++ b/mediagoblin/tests/test_misc.py @@ -28,8 +28,10 @@ def test_user_deletes_other_comments(test_app): user_a = fixture_add_user(u"chris_a") user_b = fixture_add_user(u"chris_b") - media_a = fixture_media_entry(uploader=user_a.id, save=False) - media_b = fixture_media_entry(uploader=user_b.id, save=False) + media_a = fixture_media_entry(uploader=user_a.id, save=False, + expunge=False, fake_upload=False) + media_b = fixture_media_entry(uploader=user_b.id, save=False, + expunge=False, fake_upload=False) Session.add(media_a) Session.add(media_b) Session.flush() @@ -79,7 +81,7 @@ def test_user_deletes_other_comments(test_app): def test_media_deletes_broken_attachment(test_app): user_a = fixture_add_user(u"chris_a") - media = fixture_media_entry(uploader=user_a.id, save=False) + media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False) media.attachment_files.append(dict( name=u"some name", filepath=[u"does", u"not", u"exist"], diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py new file mode 100644 index 00000000..d52b8d5a --- /dev/null +++ b/mediagoblin/tests/test_notifications.py @@ -0,0 +1,151 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import pytest + +import urlparse + +from mediagoblin.tools import template, mail + +from mediagoblin.db.models import Notification, CommentNotification, \ + CommentSubscription +from mediagoblin.db.base import Session + +from mediagoblin.notifications import mark_comment_notification_seen + +from mediagoblin.tests.tools import fixture_add_comment, \ + fixture_media_entry, fixture_add_user, \ + fixture_comment_subscription + + +class TestNotifications: + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + + # TODO: Possibly abstract into a decorator like: + # @as_authenticated_user('chris') + self.test_user = fixture_add_user() + + self.current_user = None + + self.login() + + def login(self, username=u'chris', password=u'toast'): + response = self.test_app.post( + '/auth/login/', { + 'username': username, + 'password': password}) + + response.follow() + + assert urlparse.urlsplit(response.location)[2] == '/' + assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT + + ctx = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] + + assert Session.merge(ctx['request'].user).username == username + + self.current_user = ctx['request'].user + + def logout(self): + self.test_app.get('/auth/logout/') + self.current_user = None + + @pytest.mark.parametrize('wants_email', [True, False]) + def test_comment_notification(self, wants_email): + ''' + Test + - if a notification is created when posting a comment on + another users media entry. + - that the comment data is consistent and exists. + + ''' + user = fixture_add_user('otherperson', password='nosreprehto', + wants_comment_notification=wants_email) + + user_id = user.id + + media_entry = fixture_media_entry(uploader=user.id, state=u'processed') + + media_entry_id = media_entry.id + + subscription = fixture_comment_subscription(media_entry) + + subscription_id = subscription.id + + media_uri_id = '/u/{0}/m/{1}/'.format(user.username, + media_entry.id) + media_uri_slug = '/u/{0}/m/{1}/'.format(user.username, + media_entry.slug) + + self.test_app.post( + media_uri_id + 'comment/add/', + { + 'comment_content': u'Test comment #42' + } + ) + + notifications = Notification.query.filter_by( + user_id=user.id).all() + + assert len(notifications) == 1 + + notification = notifications[0] + + assert type(notification) == CommentNotification + assert notification.seen == False + assert notification.user_id == user.id + assert notification.subject.get_author.id == self.test_user.id + assert notification.subject.content == u'Test comment #42' + + if wants_email == True: + assert mail.EMAIL_TEST_MBOX_INBOX == [ + {'from': 'notice@mediagoblin.example.org', + 'message': 'Content-Type: text/plain; \ +charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \ +base64\nSubject: GNU MediaGoblin - chris commented on your \ +post\nFrom: notice@mediagoblin.example.org\nTo: \ +otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n', + 'to': [u'otherperson@example.com']}] + else: + assert mail.EMAIL_TEST_MBOX_INBOX == [] + + # Save the ids temporarily because of DetachedInstanceError + notification_id = notification.id + comment_id = notification.subject.id + + self.logout() + self.login('otherperson', 'nosreprehto') + + self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id)) + + notification = Notification.query.filter_by(id=notification_id).first() + + assert notification.seen == True + + self.test_app.get(media_uri_slug + '/notifications/silence/') + + subscription = CommentSubscription.query.filter_by(id=subscription_id)\ + .first() + + assert subscription.notify == False + + notifications = Notification.query.filter_by( + user_id=user_id).all() + + # User should not have been notified + assert len(notifications) == 1 diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 35c2c3e9..2584c62f 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -15,18 +15,17 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import sys import os import pkg_resources import shutil -from functools import wraps from paste.deploy import loadapp from webtest import TestApp from mediagoblin import mg_globals -from mediagoblin.db.models import User, MediaEntry, Collection +from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \ + CommentSubscription, CommentNotification from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config from mediagoblin.db.base import Session @@ -171,7 +170,7 @@ def assert_db_meets_expected(db, expected): def fixture_add_user(username=u'chris', password=u'toast', - active_user=True): + active_user=True, wants_comment_notification=True): # Reuse existing user or create a new one test_user = User.query.filter_by(username=username).first() if test_user is None: @@ -184,6 +183,8 @@ def fixture_add_user(username=u'chris', password=u'toast', test_user.email_verified = True test_user.status = u'active' + test_user.wants_comment_notification = wants_comment_notification + test_user.save() # Reload @@ -195,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast', return test_user +def fixture_comment_subscription(entry, notify=True, send_email=None): + if send_email is None: + uploader = User.query.filter_by(id=entry.uploader).first() + send_email = uploader.wants_comment_notification + + cs = CommentSubscription( + media_entry_id=entry.id, + user_id=entry.uploader, + notify=notify, + send_email=send_email) + + cs.save() + + cs = CommentSubscription.query.filter_by(id=cs.id).first() + + Session.expunge(cs) + + return cs + + +def fixture_add_comment_notification(entry_id, subject_id, user_id, + seen=False): + cn = CommentNotification(user_id=user_id, + seen=seen, + subject_id=subject_id) + cn.save() + + cn = CommentNotification.query.filter_by(id=cn.id).first() + + Session.expunge(cn) + + return cn + + def fixture_media_entry(title=u"Some title", slug=None, - uploader=None, save=True, gen_slug=True): + uploader=None, save=True, gen_slug=True, + state=u'unprocessed', fake_upload=True, + expunge=True): + """ + Add a media entry for testing purposes. + + Caution: if you're adding multiple entries with fake_upload=True, + make sure you save between them... otherwise you'll hit an + IntegrityError from multiple newly-added-MediaEntries adding + FileKeynames at once. :) + """ + if uploader is None: + uploader = fixture_add_user().id + entry = MediaEntry() entry.title = title entry.slug = slug - entry.uploader = uploader or fixture_add_user().id + entry.uploader = uploader entry.media_type = u'image' + entry.state = state + + if fake_upload: + entry.media_files = {'thumb': ['a', 'b', 'c.jpg'], + 'medium': ['d', 'e', 'f.png'], + 'original': ['g', 'h', 'i.png']} + entry.media_type = u'mediagoblin.media_types.image' if gen_slug: entry.generate_slug() + if save: entry.save() + if expunge: + entry = MediaEntry.query.filter_by(id=entry.id).first() + + Session.expunge(entry) + return entry @@ -231,3 +292,25 @@ def fixture_add_collection(name=u"My first Collection", user=None): return coll +def fixture_add_comment(author=None, media_entry=None, comment=None): + if author is None: + author = fixture_add_user().id + + if media_entry is None: + media_entry = fixture_media_entry().id + + if comment is None: + comment = \ + 'Auto-generated test comment by user #{0} on media #{0}'.format( + author, media_entry) + + comment = MediaComment(author=author, + media_entry=media_entry, + content=comment) + + comment.save() + + Session.expunge(comment) + + return comment + |