aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/appconfig_context_modified.ini5
-rw-r--r--mediagoblin/tests/appconfig_static_plugin.ini5
-rw-r--r--mediagoblin/tests/auth_configs/__init__.py0
-rw-r--r--mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini25
-rw-r--r--mediagoblin/tests/auth_configs/openid_appconfig.ini41
-rw-r--r--mediagoblin/tests/test_auth.py153
-rw-r--r--mediagoblin/tests/test_basic_auth.py59
-rw-r--r--mediagoblin/tests/test_celery_setup.py2
-rw-r--r--mediagoblin/tests/test_edit.py74
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini11
-rw-r--r--mediagoblin/tests/test_misc.py8
-rw-r--r--mediagoblin/tests/test_notifications.py151
-rw-r--r--mediagoblin/tests/test_openid.py373
-rw-r--r--mediagoblin/tests/test_submission.py14
-rw-r--r--mediagoblin/tests/tools.py101
15 files changed, 902 insertions, 120 deletions
diff --git a/mediagoblin/tests/appconfig_context_modified.ini b/mediagoblin/tests/appconfig_context_modified.ini
index 80ca69b1..cc6721f5 100644
--- a/mediagoblin/tests/appconfig_context_modified.ini
+++ b/mediagoblin/tests/appconfig_context_modified.ini
@@ -3,8 +3,9 @@ direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
-# TODO: Switch to using an in-memory database
-sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
# Celery shouldn't be set up by the application as it's setup via
# mediagoblin.init.celery.from_celery
diff --git a/mediagoblin/tests/appconfig_static_plugin.ini b/mediagoblin/tests/appconfig_static_plugin.ini
index dc251171..5ce5c5bd 100644
--- a/mediagoblin/tests/appconfig_static_plugin.ini
+++ b/mediagoblin/tests/appconfig_static_plugin.ini
@@ -3,8 +3,9 @@ direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
-# TODO: Switch to using an in-memory database
-sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
# Celery shouldn't be set up by the application as it's setup via
# mediagoblin.init.celery.from_celery
diff --git a/mediagoblin/tests/auth_configs/__init__.py b/mediagoblin/tests/auth_configs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/__init__.py
diff --git a/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini
new file mode 100644
index 00000000..a64e9e40
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini
@@ -0,0 +1,25 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
diff --git a/mediagoblin/tests/auth_configs/openid_appconfig.ini b/mediagoblin/tests/auth_configs/openid_appconfig.ini
new file mode 100644
index 00000000..c2bd82fd
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/openid_appconfig.ini
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.openid]]
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index 755727f9..61503d32 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -13,54 +13,15 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
import urlparse
-import datetime
+import pkg_resources
+import pytest
from mediagoblin import mg_globals
-from mediagoblin.auth import lib as auth_lib
from mediagoblin.db.models import User
-from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tests.tools import get_app, fixture_add_user
from mediagoblin.tools import template, mail
-
-
-########################
-# Test bcrypt auth funcs
-########################
-
-def test_bcrypt_check_password():
- # Check known 'lollerskates' password against check function
- assert auth_lib.bcrypt_check_password(
- 'lollerskates',
- '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
-
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword',
- '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
-
- # Same thing, but with extra fake salt.
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword',
- '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6',
- '3><7R45417')
-
-
-def test_bcrypt_gen_password_hash():
- pw = 'youwillneverguessthis'
-
- # Normal password hash generation, and check on that hash
- hashed_pw = auth_lib.bcrypt_gen_password_hash(pw)
- assert auth_lib.bcrypt_check_password(
- pw, hashed_pw)
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword', hashed_pw)
-
- # Same thing, extra salt.
- hashed_pw = auth_lib.bcrypt_gen_password_hash(pw, '3><7R45417')
- assert auth_lib.bcrypt_check_password(
- pw, hashed_pw, '3><7R45417')
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword', hashed_pw, '3><7R45417')
+from mediagoblin.auth import tools as auth_tools
def test_register_views(test_app):
@@ -132,8 +93,8 @@ def test_register_views(test_app):
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
@@ -156,24 +117,19 @@ def test_register_views(test_app):
assert path == u'/auth/verify_email/'
parsed_get_params = urlparse.parse_qs(get_params)
- ### user should have these same parameters
- assert parsed_get_params['userid'] == [
- unicode(new_user.id)]
- assert parsed_get_params['token'] == [
- new_user.verification_key]
-
## Try verifying with bs verification key, shouldn't work
template.clear_test_template_context()
response = test_app.get(
- "/auth/verify_email/?userid=%s&token=total_bs" % unicode(
- new_user.id))
+ "/auth/verify_email/?token=total_bs")
response.follow()
- context = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
@@ -186,8 +142,8 @@ def test_register_views(test_app):
'mediagoblin/user_pages/user.html']
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'active'
assert new_user.email_verified == True
@@ -233,35 +189,17 @@ def test_register_views(test_app):
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
- assert path == u'/auth/forgot_password/verify/'
parsed_get_params = urlparse.parse_qs(get_params)
-
- # user should have matching parameters
- new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
- assert parsed_get_params['userid'] == [unicode(new_user.id)]
- assert parsed_get_params['token'] == [new_user.fp_verification_key]
-
- ### The forgotten password token should be set to expire in ~ 10 days
- # A few ticks have expired so there are only 9 full days left...
- assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9
+ assert path == u'/auth/forgot_password/verify/'
## Try using a bs password-changing verification key, shouldn't work
template.clear_test_template_context()
response = test_app.get(
- "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode(
- new_user.id), status=404)
- assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
+ "/auth/forgot_password/verify/?token=total_bs")
+ response.follow()
- ## Try using an expired token to change password, shouldn't work
- template.clear_test_template_context()
- new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
- real_token_expiration = new_user.fp_token_expire
- new_user.fp_token_expire = datetime.datetime.now()
- new_user.save()
- response = test_app.get("%s?%s" % (path, get_params), status=404)
- assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
- new_user.fp_token_expire = real_token_expiration
- new_user.save()
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
## Verify step 1 of password-change works -- can see form to change password
template.clear_test_template_context()
@@ -272,7 +210,6 @@ def test_register_views(test_app):
template.clear_test_template_context()
response = test_app.post(
'/auth/forgot_password/verify/', {
- 'userid': parsed_get_params['userid'],
'password': 'iamveryveryhappy',
'token': parsed_get_params['token']})
response.follow()
@@ -298,6 +235,7 @@ def test_authentication_views(test_app):
# Make a new user
test_user = fixture_add_user(active_user=False)
+
# Get login
# ---------
test_app.get('/auth/login/')
@@ -310,7 +248,6 @@ def test_authentication_views(test_app):
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
form = context['login_form']
assert form.username.errors == [u'This field is required.']
- assert form.password.errors == [u'This field is required.']
# Failed login - blank user
# -------------------------
@@ -328,9 +265,7 @@ def test_authentication_views(test_app):
response = test_app.post(
'/auth/login/', {
'username': u'chris'})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
- form = context['login_form']
- assert form.password.errors == [u'This field is required.']
+ assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
# Failed login - bad user
# -----------------------
@@ -394,3 +329,47 @@ def test_authentication_views(test_app):
'password': 'toast',
'next' : '/u/chris/'})
assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
+
+
+@pytest.fixture()
+def authentication_disabled_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'authentication_disabled_appconfig.ini'))
+
+
+def test_authentication_disabled_app(authentication_disabled_app):
+ # app.auth should = false
+ assert mg_globals.app.auth is False
+
+ # Try to visit register page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/register/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to vist login page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/login/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Test check_login_simple should return None
+ assert auth_tools.check_login_simple('test', 'simple') is None
+
+ # Try to visit the forgot password page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/register/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py
new file mode 100644
index 00000000..cdd80fca
--- /dev/null
+++ b/mediagoblin/tests/test_basic_auth.py
@@ -0,0 +1,59 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from mediagoblin.plugins.basic_auth import tools as auth_tools
+from mediagoblin.tools.testing import _activate_testing
+
+_activate_testing()
+
+
+########################
+# Test bcrypt auth funcs
+########################
+
+
+def test_bcrypt_check_password():
+ # Check known 'lollerskates' password against check function
+ assert auth_tools.bcrypt_check_password(
+ 'lollerskates',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ # Same thing, but with extra fake salt.
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6',
+ '3><7R45417')
+
+
+def test_bcrypt_gen_password_hash():
+ pw = 'youwillneverguessthis'
+
+ # Normal password hash generation, and check on that hash
+ hashed_pw = auth_tools.bcrypt_gen_password_hash(pw)
+ assert auth_tools.bcrypt_check_password(
+ pw, hashed_pw)
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword', hashed_pw)
+
+ # Same thing, extra salt.
+ hashed_pw = auth_tools.bcrypt_gen_password_hash(pw, '3><7R45417')
+ assert auth_tools.bcrypt_check_password(
+ pw, hashed_pw, '3><7R45417')
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword', hashed_pw, '3><7R45417')
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
index 5530c6f2..0184436a 100644
--- a/mediagoblin/tests/test_celery_setup.py
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -48,7 +48,7 @@ def test_setup_celery_from_config():
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
assert fake_celery_module.CELERY_IMPORTS == [
- 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task']
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
assert fake_celery_module.CELERY_RESULT_DBURI == (
'sqlite:///' +
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
index 08b4f8cf..d70d0478 100644
--- a/mediagoblin/tests/test_edit.py
+++ b/mediagoblin/tests/test_edit.py
@@ -15,13 +15,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
-import pytest
from mediagoblin import mg_globals
from mediagoblin.db.models import User
from mediagoblin.tests.tools import fixture_add_user
-from mediagoblin.tools import template
-from mediagoblin.auth.lib import bcrypt_check_password
+from mediagoblin import auth
+from mediagoblin.tools import template, mail
+
class TestUserEdit(object):
def setup(self):
@@ -74,7 +74,7 @@ class TestUserEdit(object):
# test_user has to be fetched again in order to have the current values
test_user = User.query.filter_by(username=u'chris').first()
- assert bcrypt_check_password('123456', test_user.pw_hash)
+ assert auth.check_password('123456', test_user.pw_hash)
# Update current user passwd
self.user_password = '123456'
@@ -88,7 +88,7 @@ class TestUserEdit(object):
})
test_user = User.query.filter_by(username=u'chris').first()
- assert not bcrypt_check_password('098765', test_user.pw_hash)
+ assert not auth.check_password('098765', test_user.pw_hash)
def test_change_bio_url(self, test_app):
@@ -141,4 +141,68 @@ class TestUserEdit(object):
assert form.url.errors == [
u'This address contains errors']
+ def test_email_change(self, test_app):
+ self.login(test_app)
+
+ # Test email already in db
+ template.clear_test_template_context()
+ test_app.post(
+ '/edit/account/', {
+ 'new_email': 'chris@example.com',
+ 'password': 'toast'})
+
+ # Check form errors
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/edit_account.html']
+ assert context['form'].new_email.errors == [
+ u'Sorry, a user with that email address already exists.']
+
+ # Test successful email change
+ template.clear_test_template_context()
+ res = test_app.post(
+ '/edit/account/', {
+ 'new_email': 'new@example.com',
+ 'password': 'toast'})
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
+
+ # Make sure we get email verification and try verifying
+ assert len(mail.EMAIL_TEST_INBOX) == 1
+ message = mail.EMAIL_TEST_INBOX.pop()
+ assert message['To'] == 'new@example.com'
+ email_context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/verification.txt']
+ assert email_context['verification_url'] in \
+ message.get_payload(decode=True)
+
+ path = urlparse.urlsplit(email_context['verification_url'])[2]
+ assert path == u'/edit/verify_email/'
+
+ ## Try verifying with bs verification key, shouldn't work
+ template.clear_test_template_context()
+ res = test_app.get(
+ "/edit/verify_email/?token=total_bs")
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/'
+
+ # Email shouldn't be saved
+ email_in_db = mg_globals.database.User.query.filter_by(
+ email='new@example.com').first()
+ email = User.query.filter_by(username='chris').first().email
+ assert email_in_db is None
+ assert email == 'chris@example.com'
+
+ # Verify email activation works
+ template.clear_test_template_context()
+ get_params = urlparse.urlsplit(email_context['verification_url'])[3]
+ res = test_app.get('%s?%s' % (path, get_params))
+ res.follow()
+
+ # New email saved?
+ email = User.query.filter_by(username='chris').first().email
+ assert email == 'new@example.com'
# test changing the url inproperly
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 0466b53b..535cf1c1 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -3,8 +3,9 @@ direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
-# TODO: Switch to using an in-memory database
-sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
# tag parsing
tags_max_length = 50
@@ -12,8 +13,6 @@ tags_max_length = 50
# So we can start to test attachments:
allow_attachments = True
-media_types = mediagoblin.media_types.image, mediagoblin.media_types.pdf
-
[storage:publicstore]
base_dir = %(here)s/user_dev/media/public
base_url = /mgoblin_media/
@@ -31,3 +30,7 @@ BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
[[mediagoblin.plugins.oauth]]
[[mediagoblin.plugins.httpapiauth]]
[[mediagoblin.plugins.piwigo]]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.plugins.openid]]
+[[mediagoblin.media_types.image]]
+[[mediagoblin.media_types.pdf]]
diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py
index 755d863f..43ad0b6d 100644
--- a/mediagoblin/tests/test_misc.py
+++ b/mediagoblin/tests/test_misc.py
@@ -28,8 +28,10 @@ def test_user_deletes_other_comments(test_app):
user_a = fixture_add_user(u"chris_a")
user_b = fixture_add_user(u"chris_b")
- media_a = fixture_media_entry(uploader=user_a.id, save=False)
- media_b = fixture_media_entry(uploader=user_b.id, save=False)
+ media_a = fixture_media_entry(uploader=user_a.id, save=False,
+ expunge=False, fake_upload=False)
+ media_b = fixture_media_entry(uploader=user_b.id, save=False,
+ expunge=False, fake_upload=False)
Session.add(media_a)
Session.add(media_b)
Session.flush()
@@ -79,7 +81,7 @@ def test_user_deletes_other_comments(test_app):
def test_media_deletes_broken_attachment(test_app):
user_a = fixture_add_user(u"chris_a")
- media = fixture_media_entry(uploader=user_a.id, save=False)
+ media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False)
media.attachment_files.append(dict(
name=u"some name",
filepath=[u"does", u"not", u"exist"],
diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py
new file mode 100644
index 00000000..d52b8d5a
--- /dev/null
+++ b/mediagoblin/tests/test_notifications.py
@@ -0,0 +1,151 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+import urlparse
+
+from mediagoblin.tools import template, mail
+
+from mediagoblin.db.models import Notification, CommentNotification, \
+ CommentSubscription
+from mediagoblin.db.base import Session
+
+from mediagoblin.notifications import mark_comment_notification_seen
+
+from mediagoblin.tests.tools import fixture_add_comment, \
+ fixture_media_entry, fixture_add_user, \
+ fixture_comment_subscription
+
+
+class TestNotifications:
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ self.test_user = fixture_add_user()
+
+ self.current_user = None
+
+ self.login()
+
+ def login(self, username=u'chris', password=u'toast'):
+ response = self.test_app.post(
+ '/auth/login/', {
+ 'username': username,
+ 'password': password})
+
+ response.follow()
+
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ctx = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+
+ assert Session.merge(ctx['request'].user).username == username
+
+ self.current_user = ctx['request'].user
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+ self.current_user = None
+
+ @pytest.mark.parametrize('wants_email', [True, False])
+ def test_comment_notification(self, wants_email):
+ '''
+ Test
+ - if a notification is created when posting a comment on
+ another users media entry.
+ - that the comment data is consistent and exists.
+
+ '''
+ user = fixture_add_user('otherperson', password='nosreprehto',
+ wants_comment_notification=wants_email)
+
+ user_id = user.id
+
+ media_entry = fixture_media_entry(uploader=user.id, state=u'processed')
+
+ media_entry_id = media_entry.id
+
+ subscription = fixture_comment_subscription(media_entry)
+
+ subscription_id = subscription.id
+
+ media_uri_id = '/u/{0}/m/{1}/'.format(user.username,
+ media_entry.id)
+ media_uri_slug = '/u/{0}/m/{1}/'.format(user.username,
+ media_entry.slug)
+
+ self.test_app.post(
+ media_uri_id + 'comment/add/',
+ {
+ 'comment_content': u'Test comment #42'
+ }
+ )
+
+ notifications = Notification.query.filter_by(
+ user_id=user.id).all()
+
+ assert len(notifications) == 1
+
+ notification = notifications[0]
+
+ assert type(notification) == CommentNotification
+ assert notification.seen == False
+ assert notification.user_id == user.id
+ assert notification.subject.get_author.id == self.test_user.id
+ assert notification.subject.content == u'Test comment #42'
+
+ if wants_email == True:
+ assert mail.EMAIL_TEST_MBOX_INBOX == [
+ {'from': 'notice@mediagoblin.example.org',
+ 'message': 'Content-Type: text/plain; \
+charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \
+base64\nSubject: GNU MediaGoblin - chris commented on your \
+post\nFrom: notice@mediagoblin.example.org\nTo: \
+otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n',
+ 'to': [u'otherperson@example.com']}]
+ else:
+ assert mail.EMAIL_TEST_MBOX_INBOX == []
+
+ # Save the ids temporarily because of DetachedInstanceError
+ notification_id = notification.id
+ comment_id = notification.subject.id
+
+ self.logout()
+ self.login('otherperson', 'nosreprehto')
+
+ self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id))
+
+ notification = Notification.query.filter_by(id=notification_id).first()
+
+ assert notification.seen == True
+
+ self.test_app.get(media_uri_slug + '/notifications/silence/')
+
+ subscription = CommentSubscription.query.filter_by(id=subscription_id)\
+ .first()
+
+ assert subscription.notify == False
+
+ notifications = Notification.query.filter_by(
+ user_id=user_id).all()
+
+ # User should not have been notified
+ assert len(notifications) == 1
diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py
new file mode 100644
index 00000000..23a2290e
--- /dev/null
+++ b/mediagoblin/tests/test_openid.py
@@ -0,0 +1,373 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import urlparse
+import pkg_resources
+import pytest
+import mock
+
+openid_consumer = pytest.importorskip(
+ "openid.consumer.consumer")
+
+from mediagoblin import mg_globals
+from mediagoblin.db.base import Session
+from mediagoblin.db.models import User
+from mediagoblin.plugins.openid.models import OpenIDUserURL
+from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tools import template
+
+# App with plugin enabled
+@pytest.fixture()
+def openid_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'openid_appconfig.ini'))
+
+
+class TestOpenIDPlugin(object):
+ def _setup(self, openid_plugin_app, value=True, edit=False, delete=False):
+ if value:
+ response = openid_consumer.SuccessResponse(mock.Mock(), mock.Mock())
+ if edit or delete:
+ response.identity_url = u'http://add.myopenid.com'
+ else:
+ response.identity_url = u'http://real.myopenid.com'
+ self._finish_verification = mock.Mock(return_value=response)
+ else:
+ self._finish_verification = mock.Mock(return_value=False)
+
+ @mock.patch('mediagoblin.plugins.openid.views._response_email', mock.Mock(return_value=None))
+ @mock.patch('mediagoblin.plugins.openid.views._response_nickname', mock.Mock(return_value=None))
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ def _setup_start(self, openid_plugin_app, edit, delete):
+ if edit:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/edit/openid/finish/'))
+ elif delete:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/edit/openid/delete/finish/'))
+ else:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/auth/openid/login/finish/'))
+ _setup_start(self, openid_plugin_app, edit, delete)
+
+ def test_bad_login(self, openid_plugin_app):
+ """ Test that attempts to login with invalid paramaters"""
+
+ # Test GET request for auth/register page
+ res = openid_plugin_app.get('/auth/register/').follow()
+
+ # Make sure it redirected to the correct place
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/login page
+ res = openid_plugin_app.get('/auth/login/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/register page
+ res = openid_plugin_app.get('/auth/openid/register/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/login/finish page
+ res = openid_plugin_app.get('/auth/openid/login/finish/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/login page
+ res = openid_plugin_app.get('/auth/openid/login/')
+
+ # Correct place?
+ assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to login with an empty form
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'This field is required.']
+
+ # Try to login with wrong form values
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'not_a_url.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'Please enter a valid url.']
+
+ # Should be no users in the db
+ assert User.query.count() == 0
+
+ # Phony OpenID URl
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'http://phoney.myopenid.com/'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'Sorry, the OpenID server could not be found']
+
+ def test_login(self, openid_plugin_app):
+ """Tests that test login and registion with openid"""
+ # Test finish_login redirects correctly when response = False
+ self._setup(openid_plugin_app, False)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_non_response():
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'http://phoney.myopenid.com/'})
+ res.follow()
+
+ # Correct Place?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+ assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
+ _test_non_response()
+
+ # Test login with new openid
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_new_user():
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': u'http://real.myopenid.com'})
+
+ # Right place?
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ # Register User
+ res = openid_plugin_app.post(
+ '/auth/openid/register/', {
+ 'openid': register_form.openid.data,
+ 'username': u'chris',
+ 'email': u'chris@example.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
+ assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # No need to test if user is in logged in and verification email
+ # awaits, since openid uses the register_user function which is
+ # tested in test_auth
+
+ # Logout User
+ openid_plugin_app.get('/auth/logout')
+
+ # Get user and detach from session
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
+ Session.expunge(test_user)
+
+ # Log back in
+ # Could not get it to work by 'POST'ing to /auth/openid/login/
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/auth/openid/login/finish/', {
+ 'openid': u'http://real.myopenid.com'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == unicode(test_user.id)
+
+ _test_new_user()
+
+ # Test register with empty form
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/register/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.openid.errors == [u'This field is required.']
+ assert register_form.email.errors == [u'This field is required.']
+ assert register_form.username.errors == [u'This field is required.']
+
+ # Try to register with existing username and email
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/register/', {
+ 'openid': 'http://real.myopenid.com',
+ 'email': 'chris@example.com',
+ 'username': 'chris'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.username.errors == [u'Sorry, a user with that name already exists.']
+ assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
+ assert register_form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
+
+ def test_add_delete(self, openid_plugin_app):
+ """Test adding and deleting openids"""
+ # Add user
+ test_user = fixture_add_user(password='')
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://real.myopenid.com'
+ openid.user_id = test_user.id
+ openid.save()
+
+ # Log user in
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _login_user():
+ openid_plugin_app.post(
+ '/auth/openid/login/finish/', {
+ 'openid': u'http://real.myopenid.com'})
+
+ _login_user()
+
+ # Try and delete only OpenID url
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/', {
+ 'openid': 'http://real.myopenid.com'})
+ assert 'mediagoblin/plugins/openid/delete.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Add OpenID to user
+ # Empty form
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'This field is required.']
+
+ # Try with a bad url
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': u'not_a_url.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'Please enter a valid url.']
+
+ # Try with a url that's already registered
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': 'http://real.myopenid.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
+
+ # Test adding openid to account
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app, edit=True)
+
+ # Need to remove openid_url from db because it was added at setup
+ openid = OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com')
+ openid.delete()
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_add():
+ # Successful add
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': u'http://add.myopenid.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+ assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # OpenID Added?
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
+ assert new_openid
+
+ _test_add()
+
+ # Test deleting openid from account
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app, delete=True)
+
+ # Need to add OpenID back to user because it was deleted during
+ # patch
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://add.myopenid.com'
+ openid.user_id = test_user.id
+ openid.save()
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_delete(self, test_user):
+ # Delete openid from user
+ # Create another user to test deleting OpenID that doesn't belong to them
+ new_user = fixture_add_user(username='newman')
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://realfake.myopenid.com/'
+ openid.user_id = new_user.id
+ openid.save()
+
+ # Try and delete OpenID url that isn't the users
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/', {
+ 'openid': 'http://realfake.myopenid.com/'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/delete.html']
+ form = context['form']
+ assert form.openid.errors == [u'That OpenID is not registered to this account.']
+
+ # Delete OpenID
+ # Kind of weird to POST to delete/finish
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/finish/', {
+ 'openid': u'http://add.myopenid.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+ assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # OpenID deleted?
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
+ assert not new_openid
+
+ _test_delete(self, test_user)
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index 162b2d19..ac941063 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -26,7 +26,7 @@ from mediagoblin.tests.tools import fixture_add_user
from mediagoblin import mg_globals
from mediagoblin.db.models import MediaEntry
from mediagoblin.tools import template
-from mediagoblin.media_types.image import MEDIA_MANAGER as img_MEDIA_MANAGER
+from mediagoblin.media_types.image import ImageMediaManager
from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
@@ -77,7 +77,7 @@ class TestSubmission:
return {'upload_files': [('file', filename)]}
def check_comments(self, request, media_id, count):
- comments = request.db.MediaComment.find({'media_entry': media_id})
+ comments = request.db.MediaComment.query.filter_by(media_entry=media_id)
assert count == len(list(comments))
def test_missing_fields(self):
@@ -122,7 +122,7 @@ class TestSubmission:
assert 'mediagoblin/user_pages/user.html' in context
def check_media(self, request, find_data, count=None):
- media = MediaEntry.find(find_data)
+ media = MediaEntry.query.filter_by(**find_data)
if count is not None:
assert media.count() == count
if count == 0:
@@ -219,7 +219,7 @@ class TestSubmission:
media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
assert media.media_type == u'mediagoblin.media_types.image'
- assert isinstance(media.media_manager, img_MEDIA_MANAGER)
+ assert isinstance(media.media_manager, ImageMediaManager)
assert media.media_manager.entry == media
@@ -240,8 +240,8 @@ class TestSubmission:
request = context['request']
- media = request.db.MediaEntry.find_one({
- u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
+ media = request.db.MediaEntry.query.filter_by(
+ title=u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE').first()
assert media.media_type == 'mediagoblin.media_types.image'
@@ -252,7 +252,7 @@ class TestSubmission:
response, context = self.do_post({'title': title}, do_follow=True,
**self.upload_data(filename))
self.check_url(response, '/u/{0}/'.format(self.test_user.username))
- entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first()
assert entry.state == 'failed'
assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 2ee39e89..98361adc 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -15,23 +15,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import sys
import os
import pkg_resources
import shutil
-from functools import wraps
from paste.deploy import loadapp
from webtest import TestApp
from mediagoblin import mg_globals
-from mediagoblin.db.models import User, MediaEntry, Collection
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+ CommentSubscription, CommentNotification
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.db.base import Session
from mediagoblin.meddleware import BaseMeddleware
-from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.auth import gen_password_hash
from mediagoblin.gmg_commands.dbupdate import run_dbupdate
@@ -165,13 +164,13 @@ def assert_db_meets_expected(db, expected):
for collection_name, collection_data in expected.iteritems():
collection = db[collection_name]
for expected_document in collection_data:
- document = collection.find_one({'id': expected_document['id']})
+ document = collection.query.filter_by(id=expected_document['id']).first()
assert document is not None # make sure it exists
assert document == expected_document # make sure it matches
def fixture_add_user(username=u'chris', password=u'toast',
- active_user=True):
+ active_user=True, wants_comment_notification=True):
# Reuse existing user or create a new one
test_user = User.query.filter_by(username=username).first()
if test_user is None:
@@ -179,11 +178,13 @@ def fixture_add_user(username=u'chris', password=u'toast',
test_user.username = username
test_user.email = username + u'@example.com'
if password is not None:
- test_user.pw_hash = bcrypt_gen_password_hash(password)
+ test_user.pw_hash = gen_password_hash(password)
if active_user:
test_user.email_verified = True
test_user.status = u'active'
+ test_user.wants_comment_notification = wants_comment_notification
+
test_user.save()
# Reload
@@ -195,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast',
return test_user
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+ if send_email is None:
+ uploader = User.query.filter_by(id=entry.uploader).first()
+ send_email = uploader.wants_comment_notification
+
+ cs = CommentSubscription(
+ media_entry_id=entry.id,
+ user_id=entry.uploader,
+ notify=notify,
+ send_email=send_email)
+
+ cs.save()
+
+ cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+ Session.expunge(cs)
+
+ return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+ seen=False):
+ cn = CommentNotification(user_id=user_id,
+ seen=seen,
+ subject_id=subject_id)
+ cn.save()
+
+ cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+ Session.expunge(cn)
+
+ return cn
+
+
def fixture_media_entry(title=u"Some title", slug=None,
- uploader=None, save=True, gen_slug=True):
+ uploader=None, save=True, gen_slug=True,
+ state=u'unprocessed', fake_upload=True,
+ expunge=True):
+ """
+ Add a media entry for testing purposes.
+
+ Caution: if you're adding multiple entries with fake_upload=True,
+ make sure you save between them... otherwise you'll hit an
+ IntegrityError from multiple newly-added-MediaEntries adding
+ FileKeynames at once. :)
+ """
+ if uploader is None:
+ uploader = fixture_add_user().id
+
entry = MediaEntry()
entry.title = title
entry.slug = slug
- entry.uploader = uploader or fixture_add_user().id
+ entry.uploader = uploader
entry.media_type = u'image'
+ entry.state = state
+
+ if fake_upload:
+ entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+ 'medium': ['d', 'e', 'f.png'],
+ 'original': ['g', 'h', 'i.png']}
+ entry.media_type = u'mediagoblin.media_types.image'
if gen_slug:
entry.generate_slug()
+
if save:
entry.save()
+ if expunge:
+ entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+ Session.expunge(entry)
+
return entry
@@ -231,3 +292,25 @@ def fixture_add_collection(name=u"My first Collection", user=None):
return coll
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+ if author is None:
+ author = fixture_add_user().id
+
+ if media_entry is None:
+ media_entry = fixture_media_entry().id
+
+ if comment is None:
+ comment = \
+ 'Auto-generated test comment by user #{0} on media #{0}'.format(
+ author, media_entry)
+
+ comment = MediaComment(author=author,
+ media_entry=media_entry,
+ content=comment)
+
+ comment.save()
+
+ Session.expunge(comment)
+
+ return comment
+