aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/__init__.py14
-rw-r--r--mediagoblin/tests/appconfig_context_modified.ini27
-rw-r--r--mediagoblin/tests/appconfig_plugin_specs.ini21
-rw-r--r--mediagoblin/tests/appconfig_static_plugin.ini27
-rw-r--r--mediagoblin/tests/auth_configs/__init__.py0
-rw-r--r--mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini25
-rw-r--r--mediagoblin/tests/auth_configs/ldap_appconfig.ini41
-rw-r--r--mediagoblin/tests/auth_configs/openid_appconfig.ini41
-rw-r--r--mediagoblin/tests/auth_configs/persona_appconfig.ini42
-rw-r--r--mediagoblin/tests/conftest.py28
-rw-r--r--mediagoblin/tests/pytest.ini2
-rw-r--r--mediagoblin/tests/resources.py2
-rw-r--r--mediagoblin/tests/test_api.py3
-rw-r--r--mediagoblin/tests/test_auth.py203
-rw-r--r--mediagoblin/tests/test_basic_auth.py103
-rw-r--r--mediagoblin/tests/test_celery_setup.py4
-rw-r--r--mediagoblin/tests/test_edit.py113
-rw-r--r--mediagoblin/tests/test_exif.py375
-rw-r--r--mediagoblin/tests/test_http_callback.py4
-rw-r--r--mediagoblin/tests/test_ldap.py125
-rw-r--r--mediagoblin/tests/test_messages.py16
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini25
-rw-r--r--mediagoblin/tests/test_misc.py8
-rw-r--r--mediagoblin/tests/test_modelmethods.py45
-rw-r--r--mediagoblin/tests/test_moderation.py242
-rw-r--r--mediagoblin/tests/test_notifications.py209
-rw-r--r--mediagoblin/tests/test_oauth1.py166
-rw-r--r--mediagoblin/tests/test_oauth2.py (renamed from mediagoblin/tests/test_oauth.py)15
-rw-r--r--mediagoblin/tests/test_openid.py374
-rw-r--r--mediagoblin/tests/test_paste.ini14
-rw-r--r--mediagoblin/tests/test_persona.py214
-rw-r--r--mediagoblin/tests/test_piwigo.py71
-rw-r--r--mediagoblin/tests/test_pluginapi.py170
-rw-r--r--mediagoblin/tests/test_privileges.py205
-rw-r--r--mediagoblin/tests/test_processing.py4
-rw-r--r--mediagoblin/tests/test_reporting.py167
-rw-r--r--mediagoblin/tests/test_sql_migrations.py42
-rw-r--r--mediagoblin/tests/test_submission.py128
-rw-r--r--mediagoblin/tests/test_submission/COPYING.txt5
-rw-r--r--mediagoblin/tests/test_submission/big.pngbin0 -> 2212445 bytes
-rw-r--r--mediagoblin/tests/test_submission/medium.pngbin0 -> 1796336 bytes
-rw-r--r--mediagoblin/tests/testplugins/modify_context/__init__.py55
-rw-r--r--mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html5
-rw-r--r--mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html6
-rw-r--r--mediagoblin/tests/testplugins/modify_context/views.py33
-rw-r--r--mediagoblin/tests/testplugins/pluginspec/__init__.py22
-rw-r--r--mediagoblin/tests/testplugins/pluginspec/config_spec.ini4
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/__init__.py36
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css4
-rw-r--r--mediagoblin/tests/testplugins/staticstuff/views.py28
-rw-r--r--mediagoblin/tests/tools.py173
51 files changed, 3355 insertions, 331 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py
index 5a3235c6..cf200791 100644
--- a/mediagoblin/tests/__init__.py
+++ b/mediagoblin/tests/__init__.py
@@ -14,23 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import shutil
-
-from mediagoblin import mg_globals
-from mediagoblin.tests.tools import (
- TEST_USER_DEV, suicide_if_bad_celery_environ)
-
def setup_package():
- suicide_if_bad_celery_environ()
import warnings
from sqlalchemy.exc import SAWarning
warnings.simplefilter("error", SAWarning)
-
-
-def teardown_package():
- # Remove and reinstall user_dev directories
- if os.path.exists(TEST_USER_DEV):
- shutil.rmtree(TEST_USER_DEV)
diff --git a/mediagoblin/tests/appconfig_context_modified.ini b/mediagoblin/tests/appconfig_context_modified.ini
new file mode 100644
index 00000000..cc6721f5
--- /dev/null
+++ b/mediagoblin/tests/appconfig_context_modified.ini
@@ -0,0 +1,27 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.tests.testplugins.modify_context]]
diff --git a/mediagoblin/tests/appconfig_plugin_specs.ini b/mediagoblin/tests/appconfig_plugin_specs.ini
new file mode 100644
index 00000000..5511cd97
--- /dev/null
+++ b/mediagoblin/tests/appconfig_plugin_specs.ini
@@ -0,0 +1,21 @@
+[mediagoblin]
+direct_remote_path = /mgoblin_static/
+email_sender_address = "notice@mediagoblin.example.org"
+
+## Uncomment and change to your DB's appropiate setting.
+## Default is a local sqlite db "mediagoblin.db".
+# sql_engine = postgresql:///gmg
+
+# set to false to enable sending notices
+email_debug_mode = true
+
+# Set to false to disable registrations
+allow_registration = true
+
+[plugins]
+[[mediagoblin.tests.testplugins.pluginspec]]
+some_string = "not blork"
+some_int = "not an int"
+
+# this one shouldn't have its own config
+[[mediagoblin.tests.testplugins.callables1]]
diff --git a/mediagoblin/tests/appconfig_static_plugin.ini b/mediagoblin/tests/appconfig_static_plugin.ini
new file mode 100644
index 00000000..5ce5c5bd
--- /dev/null
+++ b/mediagoblin/tests/appconfig_static_plugin.ini
@@ -0,0 +1,27 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.tests.testplugins.staticstuff]]
diff --git a/mediagoblin/tests/auth_configs/__init__.py b/mediagoblin/tests/auth_configs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/__init__.py
diff --git a/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini
new file mode 100644
index 00000000..07c69442
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/authentication_disabled_appconfig.ini
@@ -0,0 +1,25 @@
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+sql_engine = "sqlite://"
+run_migrations = true
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
diff --git a/mediagoblin/tests/auth_configs/ldap_appconfig.ini b/mediagoblin/tests/auth_configs/ldap_appconfig.ini
new file mode 100644
index 00000000..9be37e17
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/ldap_appconfig.ini
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+sql_engine = "sqlite://"
+run_migrations = true
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.ldap]]
diff --git a/mediagoblin/tests/auth_configs/openid_appconfig.ini b/mediagoblin/tests/auth_configs/openid_appconfig.ini
new file mode 100644
index 00000000..3433e139
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/openid_appconfig.ini
@@ -0,0 +1,41 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+sql_engine = "sqlite://"
+run_migrations = true
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.openid]]
diff --git a/mediagoblin/tests/auth_configs/persona_appconfig.ini b/mediagoblin/tests/auth_configs/persona_appconfig.ini
new file mode 100644
index 00000000..0bd5d634
--- /dev/null
+++ b/mediagoblin/tests/auth_configs/persona_appconfig.ini
@@ -0,0 +1,42 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+[mediagoblin]
+direct_remote_path = /test_static/
+email_sender_address = "notice@mediagoblin.example.org"
+email_debug_mode = true
+
+# TODO: Switch to using an in-memory database
+sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
+
+# Celery shouldn't be set up by the application as it's setup via
+# mediagoblin.init.celery.from_celery
+celery_setup_elsewhere = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.persona]]
+
diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py
index 25f495d6..dbb0aa0a 100644
--- a/mediagoblin/tests/conftest.py
+++ b/mediagoblin/tests/conftest.py
@@ -1,7 +1,25 @@
-from mediagoblin.tests import tools
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
+from mediagoblin.tests import tools
+from mediagoblin.tools.testing import _activate_testing
+
+
@pytest.fixture()
def test_app(request):
"""
@@ -13,3 +31,11 @@ def test_app(request):
in differently to get_app.
"""
return tools.get_app(request)
+
+
+@pytest.fixture()
+def pt_fixture_enable_testing():
+ """
+ py.test fixture to enable testing mode in tools.
+ """
+ _activate_testing()
diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini
index d4aa2d69..e561c074 100644
--- a/mediagoblin/tests/pytest.ini
+++ b/mediagoblin/tests/pytest.ini
@@ -1,2 +1,2 @@
[pytest]
-usefixtures = tmpdir \ No newline at end of file
+usefixtures = tmpdir pt_fixture_enable_testing
diff --git a/mediagoblin/tests/resources.py b/mediagoblin/tests/resources.py
index f7b3037d..480f6d9a 100644
--- a/mediagoblin/tests/resources.py
+++ b/mediagoblin/tests/resources.py
@@ -29,6 +29,8 @@ EVIL_JPG = resource('evil.jpg')
EVIL_PNG = resource('evil.png')
BIG_BLUE = resource('bigblue.png')
GOOD_PDF = resource('good.pdf')
+MED_PNG = resource('medium.png')
+BIG_PNG = resource('big.png')
def resource_exif(f):
diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py
index 89cf1026..4e0cbd8f 100644
--- a/mediagoblin/tests/test_api.py
+++ b/mediagoblin/tests/test_api.py
@@ -35,7 +35,8 @@ class TestAPI(object):
self.db = mg_globals.database
self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joapi', self.user_password)
+ self.user = fixture_add_user(u'joapi', self.user_password,
+ privileges=[u'active',u'uploader'])
def login(self, test_app):
test_app.post(
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index 755727f9..1bbc3d01 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -1,3 +1,4 @@
+
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
@@ -13,54 +14,15 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
import urlparse
-import datetime
+import pkg_resources
+import pytest
from mediagoblin import mg_globals
-from mediagoblin.auth import lib as auth_lib
from mediagoblin.db.models import User
-from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tests.tools import get_app, fixture_add_user
from mediagoblin.tools import template, mail
-
-
-########################
-# Test bcrypt auth funcs
-########################
-
-def test_bcrypt_check_password():
- # Check known 'lollerskates' password against check function
- assert auth_lib.bcrypt_check_password(
- 'lollerskates',
- '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
-
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword',
- '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
-
- # Same thing, but with extra fake salt.
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword',
- '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6',
- '3><7R45417')
-
-
-def test_bcrypt_gen_password_hash():
- pw = 'youwillneverguessthis'
-
- # Normal password hash generation, and check on that hash
- hashed_pw = auth_lib.bcrypt_gen_password_hash(pw)
- assert auth_lib.bcrypt_check_password(
- pw, hashed_pw)
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword', hashed_pw)
-
- # Same thing, extra salt.
- hashed_pw = auth_lib.bcrypt_gen_password_hash(pw, '3><7R45417')
- assert auth_lib.bcrypt_check_password(
- pw, hashed_pw, '3><7R45417')
- assert not auth_lib.bcrypt_check_password(
- 'notthepassword', hashed_pw, '3><7R45417')
+from mediagoblin.auth import tools as auth_tools
def test_register_views(test_app):
@@ -122,31 +84,35 @@ def test_register_views(test_app):
template.clear_test_template_context()
response = test_app.post(
'/auth/register/', {
- 'username': u'happygirl',
- 'password': 'iamsohappy',
- 'email': 'happygrrl@example.org'})
+ 'username': u'angrygirl',
+ 'password': 'iamsoangry',
+ 'email': 'angrygrrl@example.org'})
response.follow()
## Did we redirect to the proper page? Use the right template?
- assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/'
- assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
+ assert urlparse.urlsplit(response.location)[2] == '/u/angrygirl/'
+ assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'angrygirl').first()
assert new_user
- assert new_user.status == u'needs_email_verification'
- assert new_user.email_verified == False
+ ## Make sure that the proper privileges are granted on registration
+
+ assert new_user.has_privilege(u'commenter')
+ assert new_user.has_privilege(u'uploader')
+ assert new_user.has_privilege(u'reporter')
+ assert not new_user.has_privilege(u'active')
## Make sure user is logged in
request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
+ 'mediagoblin/user_pages/user_nonactive.html']['request']
assert request.session['user_id'] == unicode(new_user.id)
## Make sure we get email confirmation, and try verifying
assert len(mail.EMAIL_TEST_INBOX) == 1
message = mail.EMAIL_TEST_INBOX.pop()
- assert message['To'] == 'happygrrl@example.org'
+ assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/verification_email.txt']
assert email_context['verification_url'] in message.get_payload(decode=True)
@@ -156,27 +122,20 @@ def test_register_views(test_app):
assert path == u'/auth/verify_email/'
parsed_get_params = urlparse.parse_qs(get_params)
- ### user should have these same parameters
- assert parsed_get_params['userid'] == [
- unicode(new_user.id)]
- assert parsed_get_params['token'] == [
- new_user.verification_key]
-
## Try verifying with bs verification key, shouldn't work
template.clear_test_template_context()
response = test_app.get(
- "/auth/verify_email/?userid=%s&token=total_bs" % unicode(
- new_user.id))
+ "/auth/verify_email/?token=total_bs")
response.follow()
- context = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'angrygirl').first()
assert new_user
- assert new_user.status == u'needs_email_verification'
- assert new_user.email_verified == False
## Verify the email activation works
template.clear_test_template_context()
@@ -186,11 +145,9 @@ def test_register_views(test_app):
'mediagoblin/user_pages/user.html']
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'angrygirl').first()
assert new_user
- assert new_user.status == u'active'
- assert new_user.email_verified == True
# Uniqueness checks
# -----------------
@@ -198,9 +155,9 @@ def test_register_views(test_app):
template.clear_test_template_context()
response = test_app.post(
'/auth/register/', {
- 'username': u'happygirl',
- 'password': 'iamsohappy2',
- 'email': 'happygrrl2@example.org'})
+ 'username': u'angrygirl',
+ 'password': 'iamsoangry2',
+ 'email': 'angrygrrl2@example.org'})
context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/register.html']
@@ -215,7 +172,7 @@ def test_register_views(test_app):
template.clear_test_template_context()
response = test_app.post(
'/auth/forgot_password/',
- {'username': u'happygirl'})
+ {'username': u'angrygirl'})
response.follow()
## Did we redirect to the proper page? Use the right template?
@@ -225,55 +182,37 @@ def test_register_views(test_app):
## Make sure link to change password is sent by email
assert len(mail.EMAIL_TEST_INBOX) == 1
message = mail.EMAIL_TEST_INBOX.pop()
- assert message['To'] == 'happygrrl@example.org'
+ assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/auth/fp_verification_email.txt']
+ 'mediagoblin/plugins/basic_auth/fp_verification_email.txt']
#TODO - change the name of verification_url to something forgot-password-ish
assert email_context['verification_url'] in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
- assert path == u'/auth/forgot_password/verify/'
parsed_get_params = urlparse.parse_qs(get_params)
-
- # user should have matching parameters
- new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
- assert parsed_get_params['userid'] == [unicode(new_user.id)]
- assert parsed_get_params['token'] == [new_user.fp_verification_key]
-
- ### The forgotten password token should be set to expire in ~ 10 days
- # A few ticks have expired so there are only 9 full days left...
- assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9
+ assert path == u'/auth/forgot_password/verify/'
## Try using a bs password-changing verification key, shouldn't work
template.clear_test_template_context()
response = test_app.get(
- "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode(
- new_user.id), status=404)
- assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
+ "/auth/forgot_password/verify/?token=total_bs")
+ response.follow()
- ## Try using an expired token to change password, shouldn't work
- template.clear_test_template_context()
- new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
- real_token_expiration = new_user.fp_token_expire
- new_user.fp_token_expire = datetime.datetime.now()
- new_user.save()
- response = test_app.get("%s?%s" % (path, get_params), status=404)
- assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
- new_user.fp_token_expire = real_token_expiration
- new_user.save()
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
## Verify step 1 of password-change works -- can see form to change password
template.clear_test_template_context()
response = test_app.get("%s?%s" % (path, get_params))
- assert 'mediagoblin/auth/change_fp.html' in template.TEMPLATE_TEST_CONTEXT
+ assert 'mediagoblin/plugins/basic_auth/change_fp.html' in \
+ template.TEMPLATE_TEST_CONTEXT
## Verify step 2.1 of password-change works -- report success to user
template.clear_test_template_context()
response = test_app.post(
'/auth/forgot_password/verify/', {
- 'userid': parsed_get_params['userid'],
- 'password': 'iamveryveryhappy',
+ 'password': 'iamveryveryangry',
'token': parsed_get_params['token']})
response.follow()
assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
@@ -282,8 +221,8 @@ def test_register_views(test_app):
template.clear_test_template_context()
response = test_app.post(
'/auth/login/', {
- 'username': u'happygirl',
- 'password': 'iamveryveryhappy'})
+ 'username': u'angrygirl',
+ 'password': 'iamveryveryangry'})
# User should be redirected
response.follow()
@@ -296,7 +235,8 @@ def test_authentication_views(test_app):
Test logging in and logging out
"""
# Make a new user
- test_user = fixture_add_user(active_user=False)
+ test_user = fixture_add_user()
+
# Get login
# ---------
@@ -310,7 +250,6 @@ def test_authentication_views(test_app):
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
form = context['login_form']
assert form.username.errors == [u'This field is required.']
- assert form.password.errors == [u'This field is required.']
# Failed login - blank user
# -------------------------
@@ -328,9 +267,7 @@ def test_authentication_views(test_app):
response = test_app.post(
'/auth/login/', {
'username': u'chris'})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
- form = context['login_form']
- assert form.password.errors == [u'This field is required.']
+ assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
# Failed login - bad user
# -----------------------
@@ -394,3 +331,47 @@ def test_authentication_views(test_app):
'password': 'toast',
'next' : '/u/chris/'})
assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
+
+@pytest.fixture()
+def authentication_disabled_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'authentication_disabled_appconfig.ini'))
+
+
+def test_authentication_disabled_app(authentication_disabled_app):
+ # app.auth should = false
+ assert mg_globals
+ assert mg_globals.app.auth is False
+
+ # Try to visit register page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/register/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to vist login page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/login/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ## Test check_login_simple should return None
+ assert auth_tools.check_login_simple('test', 'simple') is None
+
+ # Try to visit the forgot password page
+ template.clear_test_template_context()
+ response = authentication_disabled_app.get('/auth/register/')
+ response.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py
new file mode 100644
index 00000000..828f0515
--- /dev/null
+++ b/mediagoblin/tests/test_basic_auth.py
@@ -0,0 +1,103 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import urlparse
+
+from mediagoblin.db.models import User
+from mediagoblin.plugins.basic_auth import tools as auth_tools
+from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tools import template
+from mediagoblin.tools.testing import _activate_testing
+
+_activate_testing()
+
+
+########################
+# Test bcrypt auth funcs
+########################
+
+
+def test_bcrypt_check_password():
+ # Check known 'lollerskates' password against check function
+ assert auth_tools.bcrypt_check_password(
+ 'lollerskates',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$PXU03zfrVCujBhVeICTwtOaHTUs5FFwsscvSSTJkqx/2RQ0Lhy/nO')
+
+ # Same thing, but with extra fake salt.
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword',
+ '$2a$12$ELVlnw3z1FMu6CEGs/L8XO8vl0BuWSlUHgh0rUrry9DUXGMUNWwl6',
+ '3><7R45417')
+
+
+def test_bcrypt_gen_password_hash():
+ pw = 'youwillneverguessthis'
+
+ # Normal password hash generation, and check on that hash
+ hashed_pw = auth_tools.bcrypt_gen_password_hash(pw)
+ assert auth_tools.bcrypt_check_password(
+ pw, hashed_pw)
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword', hashed_pw)
+
+ # Same thing, extra salt.
+ hashed_pw = auth_tools.bcrypt_gen_password_hash(pw, '3><7R45417')
+ assert auth_tools.bcrypt_check_password(
+ pw, hashed_pw, '3><7R45417')
+ assert not auth_tools.bcrypt_check_password(
+ 'notthepassword', hashed_pw, '3><7R45417')
+
+
+def test_change_password(test_app):
+ """Test changing password correctly and incorrectly"""
+ test_user = fixture_add_user(
+ password=u'toast',
+ privileges=[u'active'])
+
+ test_app.post(
+ '/auth/login/', {
+ 'username': u'chris',
+ 'password': u'toast'})
+
+ # test that the password can be changed
+ res = test_app.post(
+ '/edit/password/', {
+ 'old_password': 'toast',
+ 'new_password': '123456',
+ })
+ res.follow()
+
+ # Did we redirect to the correct page?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+
+ # test_user has to be fetched again in order to have the current values
+ test_user = User.query.filter_by(username=u'chris').first()
+ assert auth_tools.bcrypt_check_password('123456', test_user.pw_hash)
+
+ # test that the password cannot be changed if the given
+ # old_password is wrong
+ template.clear_test_template_context()
+ test_app.post(
+ '/edit/password/', {
+ 'old_password': 'toast',
+ 'new_password': '098765',
+ })
+
+ test_user = User.query.filter_by(username=u'chris').first()
+ assert not auth_tools.bcrypt_check_password('098765', test_user.pw_hash)
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
index 5530c6f2..d60293f9 100644
--- a/mediagoblin/tests/test_celery_setup.py
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -48,13 +48,13 @@ def test_setup_celery_from_config():
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
assert fake_celery_module.CELERY_IMPORTS == [
- 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task']
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
assert fake_celery_module.CELERY_RESULT_DBURI == (
'sqlite:///' +
pkg_resources.resource_filename('mediagoblin.tests', 'celery.db'))
assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy'
- assert fake_celery_module.BROKER_HOST == (
+ assert fake_celery_module.BROKER_URL == (
'sqlite:///' +
pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db'))
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
index cda2607f..4f44e0b9 100644
--- a/mediagoblin/tests/test_edit.py
+++ b/mediagoblin/tests/test_edit.py
@@ -14,19 +14,21 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import pytest
+import urlparse
from mediagoblin import mg_globals
from mediagoblin.db.models import User
from mediagoblin.tests.tools import fixture_add_user
-from mediagoblin.tools import template
-from mediagoblin.auth.lib import bcrypt_check_password
+from mediagoblin import auth
+from mediagoblin.tools import template, mail
+
class TestUserEdit(object):
def setup(self):
# set up new user
self.user_password = u'toast'
- self.user = fixture_add_user(password = self.user_password)
+ self.user = fixture_add_user(password = self.user_password,
+ privileges=[u'active'])
def login(self, test_app):
test_app.post(
@@ -51,42 +53,10 @@ class TestUserEdit(object):
# deleted too. Perhaps in submission test?
#Restore user at end of test
- self.user = fixture_add_user(password = self.user_password)
- self.login(test_app)
-
-
- def test_change_password(self, test_app):
- """Test changing password correctly and incorrectly"""
+ self.user = fixture_add_user(password = self.user_password,
+ privileges=[u'active'])
self.login(test_app)
- # test that the password can be changed
- # template.clear_test_template_context()
- res = test_app.post(
- '/edit/account/', {
- 'old_password': 'toast',
- 'new_password': '123456',
- 'wants_comment_notification': 'y'
- })
-
- # Check for redirect on success
- assert res.status_int == 302
- # test_user has to be fetched again in order to have the current values
- test_user = User.query.filter_by(username=u'chris').first()
- assert bcrypt_check_password('123456', test_user.pw_hash)
- # Update current user passwd
- self.user_password = '123456'
-
- # test that the password cannot be changed if the given
- # old_password is wrong template.clear_test_template_context()
- test_app.post(
- '/edit/account/', {
- 'old_password': 'toast',
- 'new_password': '098765',
- })
-
- test_user = User.query.filter_by(username=u'chris').first()
- assert not bcrypt_check_password('098765', test_user.pw_hash)
-
def test_change_bio_url(self, test_app):
"""Test changing bio and URL"""
@@ -112,7 +82,8 @@ class TestUserEdit(object):
assert test_user.url == u'http://dustycloud.org/'
# change a different user than the logged in (should fail with 403)
- fixture_add_user(username=u"foo")
+ fixture_add_user(username=u"foo",
+ privileges=[u'active'])
res = test_app.post(
'/u/foo/edit/', {
'bio': u'I love toast!',
@@ -138,4 +109,68 @@ class TestUserEdit(object):
assert form.url.errors == [
u'This address contains errors']
+ def test_email_change(self, test_app):
+ self.login(test_app)
+
+ # Test email already in db
+ template.clear_test_template_context()
+ test_app.post(
+ '/edit/email/', {
+ 'new_email': 'chris@example.com',
+ 'password': 'toast'})
+
+ # Check form errors
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/change_email.html']
+ assert context['form'].new_email.errors == [
+ u'Sorry, a user with that email address already exists.']
+
+ # Test successful email change
+ template.clear_test_template_context()
+ res = test_app.post(
+ '/edit/email/', {
+ 'new_email': 'new@example.com',
+ 'password': 'toast'})
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+
+ # Make sure we get email verification and try verifying
+ assert len(mail.EMAIL_TEST_INBOX) == 1
+ message = mail.EMAIL_TEST_INBOX.pop()
+ assert message['To'] == 'new@example.com'
+ email_context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/verification.txt']
+ assert email_context['verification_url'] in \
+ message.get_payload(decode=True)
+
+ path = urlparse.urlsplit(email_context['verification_url'])[2]
+ assert path == u'/edit/verify_email/'
+
+ ## Try verifying with bs verification key, shouldn't work
+ template.clear_test_template_context()
+ res = test_app.get(
+ "/edit/verify_email/?token=total_bs")
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/'
+
+ # Email shouldn't be saved
+ email_in_db = mg_globals.database.User.query.filter_by(
+ email='new@example.com').first()
+ email = User.query.filter_by(username='chris').first().email
+ assert email_in_db is None
+ assert email == 'chris@example.com'
+
+ # Verify email activation works
+ template.clear_test_template_context()
+ get_params = urlparse.urlsplit(email_context['verification_url'])[3]
+ res = test_app.get('%s?%s' % (path, get_params))
+ res.follow()
+
+ # New email saved?
+ email = User.query.filter_by(username='chris').first().email
+ assert email == 'new@example.com'
# test changing the url inproperly
diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py
index 824de3c2..c07e24ae 100644
--- a/mediagoblin/tests/test_exif.py
+++ b/mediagoblin/tests/test_exif.py
@@ -48,63 +48,324 @@ def test_exif_extraction():
assert gps == {}
# Do we have the "useful" tags?
- assert useful == {
- 'EXIF Flash': {
- 'field_type': 3,
- 'printable': u'Flash did not fire',
- 'field_offset': 380,
- 'tag': 37385,
- 'values': [0],
- 'field_length': 2},
- 'EXIF ExposureTime': {
- 'field_type': 5,
- 'printable': '1/125',
- 'field_offset': 700,
- 'tag': 33434,
- 'values': [[1, 125]],
- 'field_length': 8},
- 'EXIF FocalLength': {
- 'field_type': 5,
- 'printable': '18',
- 'field_offset': 780,
- 'tag': 37386,
- 'values': [[18, 1]],
- 'field_length': 8},
- 'Image Model': {
- 'field_type': 2,
- 'printable': 'NIKON D80',
- 'field_offset': 152,
- 'tag': 272,
- 'values': 'NIKON D80',
- 'field_length': 10},
- 'Image Make': {
- 'field_type': 2,
- 'printable': 'NIKON CORPORATION',
- 'field_offset': 134,
- 'tag': 271,
- 'values': 'NIKON CORPORATION',
- 'field_length': 18},
- 'EXIF ExposureMode': {
- 'field_type': 3,
- 'printable': 'Manual Exposure',
- 'field_offset': 584,
- 'tag': 41986,
- 'values': [1],
- 'field_length': 2},
- 'EXIF ISOSpeedRatings': {
- 'field_type': 3,
- 'printable': '100',
- 'field_offset': 260,
- 'tag': 34855,
- 'values': [100],
- 'field_length': 2},
- 'EXIF FNumber': {
- 'field_type': 5,
- 'printable': '10',
- 'field_offset': 708,
- 'tag': 33437,
- 'values': [[10, 1]],
- 'field_length': 8}}
+ assert useful == {'EXIF CVAPattern': {'field_length': 8,
+ 'field_offset': 26224,
+ 'field_type': 7,
+ 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]',
+ 'tag': 41730,
+ 'values': [0, 2, 0, 2, 1, 2, 0, 1]},
+ 'EXIF ColorSpace': {'field_length': 2,
+ 'field_offset': 476,
+ 'field_type': 3,
+ 'printable': u'sRGB',
+ 'tag': 40961,
+ 'values': [1]},
+ 'EXIF ComponentsConfiguration': {'field_length': 4,
+ 'field_offset': 308,
+ 'field_type': 7,
+ 'printable': u'YCbCr',
+ 'tag': 37121,
+ 'values': [1, 2, 3, 0]},
+ 'EXIF CompressedBitsPerPixel': {'field_length': 8,
+ 'field_offset': 756,
+ 'field_type': 5,
+ 'printable': u'4',
+ 'tag': 37122,
+ 'values': [[4, 1]]},
+ 'EXIF Contrast': {'field_length': 2,
+ 'field_offset': 656,
+ 'field_type': 3,
+ 'printable': u'Soft',
+ 'tag': 41992,
+ 'values': [1]},
+ 'EXIF CustomRendered': {'field_length': 2,
+ 'field_offset': 572,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41985,
+ 'values': [0]},
+ 'EXIF DateTimeDigitized': {'field_length': 20,
+ 'field_offset': 736,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 36868,
+ 'values': u'2011:06:22 12:20:33'},
+ 'EXIF DateTimeOriginal': {'field_length': 20,
+ 'field_offset': 716,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 36867,
+ 'values': u'2011:06:22 12:20:33'},
+ 'EXIF DigitalZoomRatio': {'field_length': 8,
+ 'field_offset': 26232,
+ 'field_type': 5,
+ 'printable': u'1',
+ 'tag': 41988,
+ 'values': [[1, 1]]},
+ 'EXIF ExifImageLength': {'field_length': 2,
+ 'field_offset': 500,
+ 'field_type': 3,
+ 'printable': u'2592',
+ 'tag': 40963,
+ 'values': [2592]},
+ 'EXIF ExifImageWidth': {'field_length': 2,
+ 'field_offset': 488,
+ 'field_type': 3,
+ 'printable': u'3872',
+ 'tag': 40962,
+ 'values': [3872]},
+ 'EXIF ExifVersion': {'field_length': 4,
+ 'field_offset': 272,
+ 'field_type': 7,
+ 'printable': u'0221',
+ 'tag': 36864,
+ 'values': [48, 50, 50, 49]},
+ 'EXIF ExposureBiasValue': {'field_length': 8,
+ 'field_offset': 764,
+ 'field_type': 10,
+ 'printable': u'0',
+ 'tag': 37380,
+ 'values': [[0, 1]]},
+ 'EXIF ExposureMode': {'field_length': 2,
+ 'field_offset': 584,
+ 'field_type': 3,
+ 'printable': u'Manual Exposure',
+ 'tag': 41986,
+ 'values': [1]},
+ 'EXIF ExposureProgram': {'field_length': 2,
+ 'field_offset': 248,
+ 'field_type': 3,
+ 'printable': u'Manual',
+ 'tag': 34850,
+ 'values': [1]},
+ 'EXIF ExposureTime': {'field_length': 8,
+ 'field_offset': 700,
+ 'field_type': 5,
+ 'printable': u'1/125',
+ 'tag': 33434,
+ 'values': [[1, 125]]},
+ 'EXIF FNumber': {'field_length': 8,
+ 'field_offset': 708,
+ 'field_type': 5,
+ 'printable': u'10',
+ 'tag': 33437,
+ 'values': [[10, 1]]},
+ 'EXIF FileSource': {'field_length': 1,
+ 'field_offset': 536,
+ 'field_type': 7,
+ 'printable': u'Digital Camera',
+ 'tag': 41728,
+ 'values': [3]},
+ 'EXIF Flash': {'field_length': 2,
+ 'field_offset': 380,
+ 'field_type': 3,
+ 'printable': u'Flash did not fire',
+ 'tag': 37385,
+ 'values': [0]},
+ 'EXIF FlashPixVersion': {'field_length': 4,
+ 'field_offset': 464,
+ 'field_type': 7,
+ 'printable': u'0100',
+ 'tag': 40960,
+ 'values': [48, 49, 48, 48]},
+ 'EXIF FocalLength': {'field_length': 8,
+ 'field_offset': 780,
+ 'field_type': 5,
+ 'printable': u'18',
+ 'tag': 37386,
+ 'values': [[18, 1]]},
+ 'EXIF FocalLengthIn35mmFilm': {'field_length': 2,
+ 'field_offset': 620,
+ 'field_type': 3,
+ 'printable': u'27',
+ 'tag': 41989,
+ 'values': [27]},
+ 'EXIF GainControl': {'field_length': 2,
+ 'field_offset': 644,
+ 'field_type': 3,
+ 'printable': u'None',
+ 'tag': 41991,
+ 'values': [0]},
+ 'EXIF ISOSpeedRatings': {'field_length': 2,
+ 'field_offset': 260,
+ 'field_type': 3,
+ 'printable': u'100',
+ 'tag': 34855,
+ 'values': [100]},
+ 'EXIF InteroperabilityOffset': {'field_length': 4,
+ 'field_offset': 512,
+ 'field_type': 4,
+ 'printable': u'26240',
+ 'tag': 40965,
+ 'values': [26240]},
+ 'EXIF LightSource': {'field_length': 2,
+ 'field_offset': 368,
+ 'field_type': 3,
+ 'printable': u'Unknown',
+ 'tag': 37384,
+ 'values': [0]},
+ 'EXIF MaxApertureValue': {'field_length': 8,
+ 'field_offset': 772,
+ 'field_type': 5,
+ 'printable': u'18/5',
+ 'tag': 37381,
+ 'values': [[18, 5]]},
+ 'EXIF MeteringMode': {'field_length': 2,
+ 'field_offset': 356,
+ 'field_type': 3,
+ 'printable': u'Pattern',
+ 'tag': 37383,
+ 'values': [5]},
+ 'EXIF Saturation': {'field_length': 2,
+ 'field_offset': 668,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41993,
+ 'values': [0]},
+ 'EXIF SceneCaptureType': {'field_length': 2,
+ 'field_offset': 632,
+ 'field_type': 3,
+ 'printable': u'Standard',
+ 'tag': 41990,
+ 'values': [0]},
+ 'EXIF SceneType': {'field_length': 1,
+ 'field_offset': 548,
+ 'field_type': 7,
+ 'printable': u'Directly Photographed',
+ 'tag': 41729,
+ 'values': [1]},
+ 'EXIF SensingMethod': {'field_length': 2,
+ 'field_offset': 524,
+ 'field_type': 3,
+ 'printable': u'One-chip color area',
+ 'tag': 41495,
+ 'values': [2]},
+ 'EXIF Sharpness': {'field_length': 2,
+ 'field_offset': 680,
+ 'field_type': 3,
+ 'printable': u'Normal',
+ 'tag': 41994,
+ 'values': [0]},
+ 'EXIF SubSecTime': {'field_length': 3,
+ 'field_offset': 428,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37520,
+ 'values': u'10'},
+ 'EXIF SubSecTimeDigitized': {'field_length': 3,
+ 'field_offset': 452,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37522,
+ 'values': u'10'},
+ 'EXIF SubSecTimeOriginal': {'field_length': 3,
+ 'field_offset': 440,
+ 'field_type': 2,
+ 'printable': u'10',
+ 'tag': 37521,
+ 'values': u'10'},
+ 'EXIF SubjectDistanceRange': {'field_length': 2,
+ 'field_offset': 692,
+ 'field_type': 3,
+ 'printable': u'0',
+ 'tag': 41996,
+ 'values': [0]},
+ 'EXIF WhiteBalance': {'field_length': 2,
+ 'field_offset': 596,
+ 'field_type': 3,
+ 'printable': u'Auto',
+ 'tag': 41987,
+ 'values': [0]},
+ 'Image DateTime': {'field_length': 20,
+ 'field_offset': 194,
+ 'field_type': 2,
+ 'printable': u'2011:06:22 12:20:33',
+ 'tag': 306,
+ 'values': u'2011:06:22 12:20:33'},
+ 'Image ExifOffset': {'field_length': 4,
+ 'field_offset': 126,
+ 'field_type': 4,
+ 'printable': u'214',
+ 'tag': 34665,
+ 'values': [214]},
+ 'Image Make': {'field_length': 18,
+ 'field_offset': 134,
+ 'field_type': 2,
+ 'printable': u'NIKON CORPORATION',
+ 'tag': 271,
+ 'values': u'NIKON CORPORATION'},
+ 'Image Model': {'field_length': 10,
+ 'field_offset': 152,
+ 'field_type': 2,
+ 'printable': u'NIKON D80',
+ 'tag': 272,
+ 'values': u'NIKON D80'},
+ 'Image Orientation': {'field_length': 2,
+ 'field_offset': 42,
+ 'field_type': 3,
+ 'printable': u'Rotated 90 CCW',
+ 'tag': 274,
+ 'values': [6]},
+ 'Image ResolutionUnit': {'field_length': 2,
+ 'field_offset': 78,
+ 'field_type': 3,
+ 'printable': u'Pixels/Inch',
+ 'tag': 296,
+ 'values': [2]},
+ 'Image Software': {'field_length': 15,
+ 'field_offset': 178,
+ 'field_type': 2,
+ 'printable': u'Shotwell 0.9.3',
+ 'tag': 305,
+ 'values': u'Shotwell 0.9.3'},
+ 'Image XResolution': {'field_length': 8,
+ 'field_offset': 162,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 282,
+ 'values': [[300, 1]]},
+ 'Image YCbCrPositioning': {'field_length': 2,
+ 'field_offset': 114,
+ 'field_type': 3,
+ 'printable': u'Co-sited',
+ 'tag': 531,
+ 'values': [2]},
+ 'Image YResolution': {'field_length': 8,
+ 'field_offset': 170,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 283,
+ 'values': [[300, 1]]},
+ 'Thumbnail Compression': {'field_length': 2,
+ 'field_offset': 26280,
+ 'field_type': 3,
+ 'printable': u'JPEG (old-style)',
+ 'tag': 259,
+ 'values': [6]},
+ 'Thumbnail ResolutionUnit': {'field_length': 2,
+ 'field_offset': 26316,
+ 'field_type': 3,
+ 'printable': u'Pixels/Inch',
+ 'tag': 296,
+ 'values': [2]},
+ 'Thumbnail XResolution': {'field_length': 8,
+ 'field_offset': 26360,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 282,
+ 'values': [[300, 1]]},
+ 'Thumbnail YCbCrPositioning': {'field_length': 2,
+ 'field_offset': 26352,
+ 'field_type': 3,
+ 'printable': u'Co-sited',
+ 'tag': 531,
+ 'values': [2]},
+ 'Thumbnail YResolution': {'field_length': 8,
+ 'field_offset': 26368,
+ 'field_type': 5,
+ 'printable': u'300',
+ 'tag': 283,
+ 'values': [[300, 1]]}}
def test_exif_image_orientation():
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
index a0511af7..64b7ee8f 100644
--- a/mediagoblin/tests/test_http_callback.py
+++ b/mediagoblin/tests/test_http_callback.py
@@ -23,7 +23,7 @@ from mediagoblin import mg_globals
from mediagoblin.tools import processing
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tests.test_submission import GOOD_PNG
-from mediagoblin.tests import test_oauth as oauth
+from mediagoblin.tests import test_oauth2 as oauth
class TestHTTPCallback(object):
@@ -44,7 +44,7 @@ class TestHTTPCallback(object):
'password': self.user_password})
def get_access_token(self, client_id, client_secret, code):
- response = self.test_app.get('/oauth/access_token', {
+ response = self.test_app.get('/oauth-2/access_token', {
'code': code,
'client_id': client_id,
'client_secret': client_secret})
diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py
new file mode 100644
index 00000000..48efb4b6
--- /dev/null
+++ b/mediagoblin/tests/test_ldap.py
@@ -0,0 +1,125 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import urlparse
+import pkg_resources
+import pytest
+import mock
+
+from mediagoblin import mg_globals
+from mediagoblin.db.base import Session
+from mediagoblin.tests.tools import get_app
+from mediagoblin.tools import template
+
+pytest.importorskip("ldap")
+
+
+@pytest.fixture()
+def ldap_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'ldap_appconfig.ini'))
+
+
+def return_value():
+ return u'chris', u'chris@example.com'
+
+
+def test_ldap_plugin(ldap_plugin_app):
+ res = ldap_plugin_app.get('/auth/login/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/'
+
+ res = ldap_plugin_app.get('/auth/register/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/register/'
+
+ res = ldap_plugin_app.get('/auth/ldap/register/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/auth/ldap/login/'
+
+ template.clear_test_template_context()
+ res = ldap_plugin_app.post(
+ '/auth/ldap/login/', {})
+
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+ assert form.username.errors == [u'This field is required.']
+ assert form.password.errors == [u'This field is required.']
+
+ @mock.patch('mediagoblin.plugins.ldap.tools.LDAP.login', mock.Mock(return_value=return_value()))
+ def _test_authentication():
+ template.clear_test_template_context()
+ res = ldap_plugin_app.post(
+ '/auth/ldap/login/',
+ {'username': u'chris',
+ 'password': u'toast'})
+
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.username.data == u'chris'
+ assert register_form.email.data == u'chris@example.com'
+
+ template.clear_test_template_context()
+ res = ldap_plugin_app.post(
+ '/auth/ldap/register/',
+ {'username': u'chris',
+ 'email': u'chris@example.com'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
+ assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to register with same email and username
+ template.clear_test_template_context()
+ res = ldap_plugin_app.post(
+ '/auth/ldap/register/',
+ {'username': u'chris',
+ 'email': u'chris@example.com'})
+
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
+ assert register_form.username.errors == [u'Sorry, a user with that name already exists.']
+
+ # Log out
+ ldap_plugin_app.get('/auth/logout/')
+
+ # Get user and detach from session
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
+ Session.expunge(test_user)
+
+ # Log back in
+ template.clear_test_template_context()
+ res = ldap_plugin_app.post(
+ '/auth/ldap/login/',
+ {'username': u'chris',
+ 'password': u'toast'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == unicode(test_user.id)
+
+ _test_authentication()
diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py
index 3ac917b0..22f9e800 100644
--- a/mediagoblin/tests/test_messages.py
+++ b/mediagoblin/tests/test_messages.py
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin.messages import fetch_messages, add_message
+from mediagoblin import messages
from mediagoblin.tools import template
@@ -32,11 +32,19 @@ def test_messages(test_app):
# The message queue should be empty
assert request.session.get('messages', []) == []
+ # First of all, we should clear the messages queue
+ messages.clear_add_message()
# Adding a message should modify the session accordingly
- add_message(request, 'herp_derp', 'First!')
+ messages.add_message(request, 'herp_derp', 'First!')
test_msg_queue = [{'text': 'First!', 'level': 'herp_derp'}]
- assert request.session['messages'] == test_msg_queue
+
+ # Alternative tests to the following, test divided in two steps:
+ # assert request.session['messages'] == test_msg_queue
+ # 1. Tests if add_message worked
+ assert messages.ADD_MESSAGE_TEST[-1] == test_msg_queue
+ # 2. Tests if add_message updated session information
+ assert messages.ADD_MESSAGE_TEST[-1] == request.session['messages']
# fetch_messages should return and empty the queue
- assert fetch_messages(request) == test_msg_queue
+ assert messages.fetch_messages(request) == test_msg_queue
assert request.session.get('messages') == []
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 9f95a398..4cd3d9b6 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -3,8 +3,9 @@ direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
-# TODO: Switch to using an in-memory database
-sql_engine = "sqlite:///%(here)s/test_user_dev/mediagoblin.db"
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
# tag parsing
tags_max_length = 50
@@ -12,26 +13,28 @@ tags_max_length = 50
# So we can start to test attachments:
allow_attachments = True
-# Celery shouldn't be set up by the application as it's setup via
-# mediagoblin.init.celery.from_celery
-celery_setup_elsewhere = true
+upload_limit = 500
-media_types = mediagoblin.media_types.image, mediagoblin.media_types.pdf
+max_file_size = 2
[storage:publicstore]
-base_dir = %(here)s/test_user_dev/media/public
+base_dir = %(here)s/user_dev/media/public
base_url = /mgoblin_media/
[storage:queuestore]
-base_dir = %(here)s/test_user_dev/media/queue
+base_dir = %(here)s/user_dev/media/queue
[celery]
CELERY_ALWAYS_EAGER = true
-CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db"
-BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db"
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
[plugins]
[[mediagoblin.plugins.api]]
[[mediagoblin.plugins.oauth]]
[[mediagoblin.plugins.httpapiauth]]
-
+[[mediagoblin.plugins.piwigo]]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.plugins.openid]]
+[[mediagoblin.media_types.image]]
+[[mediagoblin.media_types.pdf]]
diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py
index 755d863f..43ad0b6d 100644
--- a/mediagoblin/tests/test_misc.py
+++ b/mediagoblin/tests/test_misc.py
@@ -28,8 +28,10 @@ def test_user_deletes_other_comments(test_app):
user_a = fixture_add_user(u"chris_a")
user_b = fixture_add_user(u"chris_b")
- media_a = fixture_media_entry(uploader=user_a.id, save=False)
- media_b = fixture_media_entry(uploader=user_b.id, save=False)
+ media_a = fixture_media_entry(uploader=user_a.id, save=False,
+ expunge=False, fake_upload=False)
+ media_b = fixture_media_entry(uploader=user_b.id, save=False,
+ expunge=False, fake_upload=False)
Session.add(media_a)
Session.add(media_b)
Session.flush()
@@ -79,7 +81,7 @@ def test_user_deletes_other_comments(test_app):
def test_media_deletes_broken_attachment(test_app):
user_a = fixture_add_user(u"chris_a")
- media = fixture_media_entry(uploader=user_a.id, save=False)
+ media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False)
media.attachment_files.append(dict(
name=u"some name",
filepath=[u"does", u"not", u"exist"],
diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py
index 427aa47c..86513c76 100644
--- a/mediagoblin/tests/test_modelmethods.py
+++ b/mediagoblin/tests/test_modelmethods.py
@@ -18,7 +18,7 @@
# methods, and so it makes sense to test them here.
from mediagoblin.db.base import Session
-from mediagoblin.db.models import MediaEntry
+from mediagoblin.db.models import MediaEntry, User, Privilege
from mediagoblin.tests.tools import fixture_add_user
@@ -47,7 +47,7 @@ class TestMediaEntrySlugs(object):
entry.id = this_id
entry.uploader = uploader or self.chris_user.id
entry.media_type = u'image'
-
+
if save:
entry.save()
@@ -99,7 +99,7 @@ class TestMediaEntrySlugs(object):
u"Beware, I exist!!", this_id=9000, save=False)
entry.generate_slug()
assert entry.slug == u"beware-i-exist-test"
-
+
_real_test()
def test_existing_slug_cant_use_id_extra_junk(self, test_app):
@@ -151,6 +151,44 @@ class TestMediaEntrySlugs(object):
qbert_entry.generate_slug()
assert qbert_entry.slug is None
+class TestUserHasPrivilege:
+ def _setup(self):
+ fixture_add_user(u'natalie',
+ privileges=[u'admin',u'moderator',u'active'])
+ fixture_add_user(u'aeva',
+ privileges=[u'moderator',u'active'])
+ self.natalie_user = User.query.filter(
+ User.username==u'natalie').first()
+ self.aeva_user = User.query.filter(
+ User.username==u'aeva').first()
+
+ def test_privilege_added_correctly(self, test_app):
+ self._setup()
+ admin = Privilege.query.filter(
+ Privilege.privilege_name == u'admin').one()
+ # first make sure the privileges were added successfully
+
+ assert admin in self.natalie_user.all_privileges
+ assert admin not in self.aeva_user.all_privileges
+
+ def test_user_has_privilege_one(self, test_app):
+ self._setup()
+
+ # then test out the user.has_privilege method for one privilege
+ assert not self.natalie_user.has_privilege(u'commenter')
+ assert self.aeva_user.has_privilege(u'active')
+
+
+ def test_user_has_privileges_multiple(self, test_app):
+ self._setup()
+
+ # when multiple args are passed to has_privilege, the method returns
+ # True if the user has ANY of the privileges
+ assert self.natalie_user.has_privilege(u'admin',u'commenter')
+ assert self.aeva_user.has_privilege(u'moderator',u'active')
+ assert not self.natalie_user.has_privilege(u'commenter',u'uploader')
+
+
def test_media_data_init(test_app):
Session.rollback()
@@ -165,3 +203,4 @@ def test_media_data_init(test_app):
obj_in_session += 1
print repr(obj)
assert obj_in_session == 0
+
diff --git a/mediagoblin/tests/test_moderation.py b/mediagoblin/tests/test_moderation.py
new file mode 100644
index 00000000..e7a0ebef
--- /dev/null
+++ b/mediagoblin/tests/test_moderation.py
@@ -0,0 +1,242 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+from mediagoblin.tests.tools import (fixture_add_user,
+ fixture_add_comment_report, fixture_add_comment)
+from mediagoblin.db.models import User, CommentReport, MediaComment, UserBan
+from mediagoblin.tools import template, mail
+from webtest import AppError
+
+class TestModerationViews:
+ @pytest.fixture(autouse=True)
+ def _setup(self, test_app):
+ self.test_app = test_app
+
+ fixture_add_user(u'admin',
+ privileges=[u'admin',u'active'])
+ fixture_add_user(u'moderator',
+ privileges=[u'moderator',u'active'])
+ fixture_add_user(u'regular',
+ privileges=[u'active',u'commenter'])
+ self.query_for_users()
+
+ def login(self, username):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': username,
+ 'password': 'toast'})
+ self.query_for_users()
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+ self.query_for_users()
+
+ def query_for_users(self):
+ self.admin_user = User.query.filter(User.username==u'admin').first()
+ self.mod_user = User.query.filter(User.username==u'moderator').first()
+ self.user = User.query.filter(User.username==u'regular').first()
+
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def testGiveOrTakeAwayPrivileges(self):
+ self.login(u'admin')
+ # First, test an admin taking away a privilege from a user
+ #----------------------------------------------------------------------
+ response, context = self.do_post({'privilege_name':u'commenter'},
+ url='/mod/users/{0}/privilege/'.format(self.user.username))
+ assert response.status == '302 FOUND'
+ self.query_for_users()
+ assert not self.user.has_privilege(u'commenter')
+
+ # Then, test an admin giving a privilege to a user
+ #----------------------------------------------------------------------
+ response, context = self.do_post({'privilege_name':u'commenter'},
+ url='/mod/users/{0}/privilege/'.format(self.user.username))
+ assert response.status == '302 FOUND'
+ self.query_for_users()
+ assert self.user.has_privilege(u'commenter')
+
+ # Then, test a mod trying to take away a privilege from a user
+ # they are not allowed to do this, so this will raise an error
+ #----------------------------------------------------------------------
+ self.logout()
+ self.login(u'moderator')
+
+ with pytest.raises(AppError) as excinfo:
+ response, context = self.do_post({'privilege_name':u'commenter'},
+ url='/mod/users/{0}/privilege/'.format(self.user.username))
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ self.query_for_users()
+
+ assert self.user.has_privilege(u'commenter')
+
+ def testReportResolution(self):
+ self.login(u'moderator')
+
+ # First, test a moderators taking away a user's privilege in response
+ # to a reported comment
+ #----------------------------------------------------------------------
+ fixture_add_comment_report(reported_user=self.user)
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.user).first()
+
+ response = self.test_app.get('/mod/reports/{0}/'.format(
+ comment_report.id))
+ assert response.status == '200 OK'
+ self.query_for_users()
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.user).first()
+
+ response, context = self.do_post({'action_to_resolve':[u'takeaway'],
+ 'take_away_privileges':[u'commenter'],
+ 'targeted_user':self.user.id},
+ url='/mod/reports/{0}/'.format(comment_report.id))
+
+ self.query_for_users()
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.user).first()
+ assert response.status == '302 FOUND'
+ assert not self.user.has_privilege(u'commenter')
+ assert comment_report.is_archived_report() is True
+
+ fixture_add_comment_report(reported_user=self.user)
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.user).first()
+
+ # Then, test a moderator sending an email to a user in response to a
+ # reported comment
+ #----------------------------------------------------------------------
+ self.query_for_users()
+
+ response, context = self.do_post({'action_to_resolve':[u'sendmessage'],
+ 'message_to_user':'This is your last warning, regular....',
+ 'targeted_user':self.user.id},
+ url='/mod/reports/{0}/'.format(comment_report.id))
+
+ self.query_for_users()
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.user).first()
+ assert response.status == '302 FOUND'
+ assert mail.EMAIL_TEST_MBOX_INBOX == [{'to': [u'regular@example.com'],
+ 'message': 'Content-Type: text/plain; charset="utf-8"\n\
+MIME-Version: 1.0\nContent-Transfer-Encoding: base64\nSubject: Warning from- \
+moderator \nFrom: notice@mediagoblin.example.org\nTo: regular@example.com\n\n\
+VGhpcyBpcyB5b3VyIGxhc3Qgd2FybmluZywgcmVndWxhci4uLi4=\n',
+ 'from': 'notice@mediagoblin.example.org'}]
+ assert comment_report.is_archived_report() is True
+
+ # Then test a moderator banning a user AND a moderator deleting the
+ # offending comment. This also serves as a test for taking multiple
+ # actions to resolve a report
+ #----------------------------------------------------------------------
+ self.query_for_users()
+ fixture_add_comment(author=self.user.id,
+ comment=u'Comment will be removed')
+ test_comment = MediaComment.query.filter(
+ MediaComment.author==self.user.id).first()
+ fixture_add_comment_report(comment=test_comment,
+ reported_user=self.user)
+ comment_report = CommentReport.query.filter(
+ CommentReport.comment==test_comment).filter(
+ CommentReport.resolved==None).first()
+
+ response, context = self.do_post(
+ {'action_to_resolve':[u'userban', u'delete'],
+ 'targeted_user':self.user.id,
+ 'why_user_was_banned':u'',
+ 'user_banned_until':u''},
+ url='/mod/reports/{0}/'.format(comment_report.id))
+ assert response.status == '302 FOUND'
+ self.query_for_users()
+ test_user_ban = UserBan.query.filter(
+ UserBan.user_id == self.user.id).first()
+ assert test_user_ban is not None
+ test_comment = MediaComment.query.filter(
+ MediaComment.author==self.user.id).first()
+ assert test_comment is None
+
+ # Then, test what happens when a moderator attempts to punish an admin
+ # from a reported comment on an admin.
+ #----------------------------------------------------------------------
+ fixture_add_comment_report(reported_user=self.admin_user)
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==self.admin_user).filter(
+ CommentReport.resolved==None).first()
+
+ response, context = self.do_post({'action_to_resolve':[u'takeaway'],
+ 'take_away_privileges':[u'active'],
+ 'targeted_user':self.admin_user.id},
+ url='/mod/reports/{0}/'.format(comment_report.id))
+ self.query_for_users()
+
+ assert response.status == '200 OK'
+ assert self.admin_user.has_privilege(u'active')
+
+ def testAllModerationViews(self):
+ self.login(u'moderator')
+ username = self.user.username
+ self.query_for_users()
+ fixture_add_comment_report(reported_user=self.admin_user)
+ response = self.test_app.get('/mod/reports/')
+ assert response.status == "200 OK"
+
+ response = self.test_app.get('/mod/reports/1/')
+ assert response.status == "200 OK"
+
+ response = self.test_app.get('/mod/users/')
+ assert response.status == "200 OK"
+
+ user_page_url = '/mod/users/{0}/'.format(username)
+ response = self.test_app.get(user_page_url)
+ assert response.status == "200 OK"
+
+ self.test_app.get('/mod/media/')
+ assert response.status == "200 OK"
+
+ def testBanUnBanUser(self):
+ self.login(u'admin')
+ username = self.user.username
+ user_id = self.user.id
+ ban_url = '/mod/users/{0}/ban/'.format(username)
+ response, context = self.do_post({
+ 'user_banned_until':u'',
+ 'why_user_was_banned':u'Because I said so'},
+ url=ban_url)
+
+ assert response.status == "302 FOUND"
+ user_banned = UserBan.query.filter(UserBan.user_id==user_id).first()
+ assert user_banned is not None
+ assert user_banned.expiration_date is None
+ assert user_banned.reason == u'Because I said so'
+
+ response, context = self.do_post({},
+ url=ban_url)
+
+ assert response.status == "302 FOUND"
+ user_banned = UserBan.query.filter(UserBan.user_id==user_id).first()
+ assert user_banned is None
diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py
new file mode 100644
index 00000000..3bf36f5f
--- /dev/null
+++ b/mediagoblin/tests/test_notifications.py
@@ -0,0 +1,209 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+import urlparse
+
+from mediagoblin.tools import template, mail
+
+from mediagoblin.db.models import Notification, CommentNotification, \
+ CommentSubscription
+from mediagoblin.db.base import Session
+
+from mediagoblin.notifications import mark_comment_notification_seen
+
+from mediagoblin.tests.tools import fixture_add_comment, \
+ fixture_media_entry, fixture_add_user, \
+ fixture_comment_subscription
+
+
+class TestNotifications:
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ self.test_user = fixture_add_user(privileges=[u'active',u'commenter'])
+
+ self.current_user = None
+
+ self.login()
+
+ def login(self, username=u'chris', password=u'toast'):
+ response = self.test_app.post(
+ '/auth/login/', {
+ 'username': username,
+ 'password': password})
+
+ response.follow()
+
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ ctx = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+
+ assert Session.merge(ctx['request'].user).username == username
+
+ self.current_user = ctx['request'].user
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+ self.current_user = None
+
+ @pytest.mark.parametrize('wants_email', [True, False])
+ def test_comment_notification(self, wants_email):
+ '''
+ Test
+ - if a notification is created when posting a comment on
+ another users media entry.
+ - that the comment data is consistent and exists.
+
+ '''
+ user = fixture_add_user('otherperson', password='nosreprehto',
+ wants_comment_notification=wants_email,
+ privileges=[u'active',u'commenter'])
+
+ assert user.wants_comment_notification == wants_email
+
+ user_id = user.id
+
+ media_entry = fixture_media_entry(uploader=user.id, state=u'processed')
+
+ media_entry_id = media_entry.id
+
+ subscription = fixture_comment_subscription(media_entry)
+
+ subscription_id = subscription.id
+
+ media_uri_id = '/u/{0}/m/{1}/'.format(user.username,
+ media_entry.id)
+ media_uri_slug = '/u/{0}/m/{1}/'.format(user.username,
+ media_entry.slug)
+
+ self.test_app.post(
+ media_uri_id + 'comment/add/',
+ {
+ 'comment_content': u'Test comment #42'
+ }
+ )
+
+ notifications = Notification.query.filter_by(
+ user_id=user.id).all()
+
+ assert len(notifications) == 1
+
+ notification = notifications[0]
+
+ assert type(notification) == CommentNotification
+ assert notification.seen == False
+ assert notification.user_id == user.id
+ assert notification.subject.get_author.id == self.test_user.id
+ assert notification.subject.content == u'Test comment #42'
+
+ if wants_email == True:
+ assert mail.EMAIL_TEST_MBOX_INBOX == [
+ {'from': 'notice@mediagoblin.example.org',
+ 'message': 'Content-Type: text/plain; \
+charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \
+base64\nSubject: GNU MediaGoblin - chris commented on your \
+post\nFrom: notice@mediagoblin.example.org\nTo: \
+otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n',
+ 'to': [u'otherperson@example.com']}]
+ else:
+ assert mail.EMAIL_TEST_MBOX_INBOX == []
+
+
+ # Save the ids temporarily because of DetachedInstanceError
+ notification_id = notification.id
+ comment_id = notification.subject.id
+
+ self.logout()
+ self.login('otherperson', 'nosreprehto')
+
+ self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id))
+
+ notification = Notification.query.filter_by(id=notification_id).first()
+
+ assert notification.seen == True
+
+ self.test_app.get(media_uri_slug + '/notifications/silence/')
+
+ subscription = CommentSubscription.query.filter_by(id=subscription_id)\
+ .first()
+
+ assert subscription.notify == False
+
+ notifications = Notification.query.filter_by(
+ user_id=user_id).all()
+
+ # User should not have been notified
+ assert len(notifications) == 1
+
+ def test_mark_all_comment_notifications_seen(self):
+ """ Test that mark_all_comments_seen works"""
+
+ user = fixture_add_user('otherperson', password='nosreprehto',
+ privileges=[u'active'])
+
+ media_entry = fixture_media_entry(uploader=user.id, state=u'processed')
+
+ fixture_comment_subscription(media_entry)
+
+ media_uri_id = '/u/{0}/m/{1}/'.format(user.username,
+ media_entry.id)
+
+ # add 2 comments
+ self.test_app.post(
+ media_uri_id + 'comment/add/',
+ {
+ 'comment_content': u'Test comment #43'
+ }
+ )
+
+ self.test_app.post(
+ media_uri_id + 'comment/add/',
+ {
+ 'comment_content': u'Test comment #44'
+ }
+ )
+
+ notifications = Notification.query.filter_by(
+ user_id=user.id).all()
+
+ assert len(notifications) == 2
+
+ # both comments should not be marked seen
+ assert notifications[0].seen == False
+ assert notifications[1].seen == False
+
+ # login with other user to mark notifications seen
+ self.logout()
+ self.login('otherperson', 'nosreprehto')
+
+ # mark all comment notifications seen
+ res = self.test_app.get('/notifications/comments/mark_all_seen/')
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+
+ notifications = Notification.query.filter_by(
+ user_id=user.id).all()
+
+ # both notifications should be marked seen
+ assert notifications[0].seen == True
+ assert notifications[1].seen == True
diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py
new file mode 100644
index 00000000..073c2884
--- /dev/null
+++ b/mediagoblin/tests/test_oauth1.py
@@ -0,0 +1,166 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import cgi
+
+import pytest
+from urlparse import parse_qs, urlparse
+
+from oauthlib.oauth1 import Client
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import template, pluginapi
+from mediagoblin.tests.tools import fixture_add_user
+
+
+class TestOAuth(object):
+
+ MIME_FORM = "application/x-www-form-urlencoded"
+ MIME_JSON = "application/json"
+
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ self.db = mg_globals.database
+
+ self.pman = pluginapi.PluginManager()
+
+ self.user_password = "AUserPassword123"
+ self.user = fixture_add_user("OAuthy", self.user_password)
+
+ self.login()
+
+ def login(self):
+ self.test_app.post(
+ "/auth/login/", {
+ "username": self.user.username,
+ "password": self.user_password})
+
+ def register_client(self, **kwargs):
+ """ Regiters a client with the API """
+
+ kwargs["type"] = "client_associate"
+ kwargs["application_type"] = kwargs.get("application_type", "native")
+ return self.test_app.post("/api/client/register", kwargs)
+
+ def test_client_client_register_limited_info(self):
+ """ Tests that a client can be registered with limited information """
+ response = self.register_client()
+ client_info = response.json
+
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
+
+ assert response.status_int == 200
+ assert client is not None
+
+ def test_client_register_full_info(self):
+ """ Provides every piece of information possible to register client """
+ query = {
+ "application_name": "Testificate MD",
+ "application_type": "web",
+ "contacts": "someone@someplace.com tuteo@tsengeo.lu",
+ "logo_url": "http://ayrel.com/utral.png",
+ "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
+ }
+
+ response = self.register_client(**query)
+ client_info = response.json
+
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
+
+ assert client is not None
+ assert client.secret == client_info["client_secret"]
+ assert client.application_type == query["application_type"]
+ assert client.redirect_uri == query["redirect_uris"].split()
+ assert client.logo_url == query["logo_url"]
+ assert client.contacts == query["contacts"].split()
+
+
+ def test_client_update(self):
+ """ Tests that you can update a client """
+ # first we need to register a client
+ response = self.register_client()
+
+ client_info = response.json
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
+
+ # Now update
+ update_query = {
+ "type": "client_update",
+ "application_name": "neytiri",
+ "contacts": "someone@someplace.com abc@cba.com",
+ "logo_url": "http://place.com/picture.png",
+ "application_type": "web",
+ "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
+ }
+
+ update_response = self.register_client(**update_query)
+
+ assert update_response.status_int == 200
+ client_info = update_response.json
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
+
+ assert client.secret == client_info["client_secret"]
+ assert client.application_type == update_query["application_type"]
+ assert client.application_name == update_query["application_name"]
+ assert client.contacts == update_query["contacts"].split()
+ assert client.logo_url == update_query["logo_url"]
+ assert client.redirect_uri == update_query["redirect_uris"].split()
+
+ def to_authorize_headers(self, data):
+ headers = ""
+ for key, value in data.items():
+ headers += '{0}="{1}",'.format(key, value)
+ return {"Authorization": "OAuth " + headers[:-1]}
+
+ def test_request_token(self):
+ """ Test a request for a request token """
+ response = self.register_client()
+
+ client_id = response.json["client_id"]
+
+ endpoint = "/oauth/request_token"
+ request_query = {
+ "oauth_consumer_key": client_id,
+ "oauth_nonce": "abcdefghij",
+ "oauth_timestamp": 123456789.0,
+ "oauth_callback": "https://some.url/callback",
+ }
+
+ headers = self.to_authorize_headers(request_query)
+
+ headers["Content-Type"] = self.MIME_FORM
+
+ response = self.test_app.post(endpoint, headers=headers)
+ response = cgi.parse_qs(response.body)
+
+ # each element is a list, reduce it to a string
+ for key, value in response.items():
+ response[key] = value[0]
+
+ request_token = self.db.RequestToken.query.filter_by(
+ token=response["oauth_token"]
+ ).first()
+
+ client = self.db.Client.query.filter_by(id=client_id).first()
+
+ assert request_token is not None
+ assert request_token.secret == response["oauth_token_secret"]
+ assert request_token.client == client.id
+ assert request_token.used == False
+ assert request_token.callback == request_query["oauth_callback"]
+
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth2.py
index ea3bd798..957f4e65 100644
--- a/mediagoblin/tests/test_oauth.py
+++ b/mediagoblin/tests/test_oauth2.py
@@ -38,7 +38,8 @@ class TestOAuth(object):
self.pman = pluginapi.PluginManager()
self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joauth', self.user_password)
+ self.user = fixture_add_user(u'joauth', self.user_password,
+ privileges=[u'active'])
self.login()
@@ -51,7 +52,7 @@ class TestOAuth(object):
def register_client(self, name, client_type, description=None,
redirect_uri=''):
return self.test_app.post(
- '/oauth/client/register', {
+ '/oauth-2/client/register', {
'name': name,
'description': description,
'type': client_type,
@@ -115,7 +116,7 @@ class TestOAuth(object):
client_identifier = client.identifier
redirect_uri = 'https://foo.example'
- response = self.test_app.get('/oauth/authorize', {
+ response = self.test_app.get('/oauth-2/authorize', {
'client_id': client.identifier,
'scope': 'all',
'redirect_uri': redirect_uri})
@@ -129,7 +130,7 @@ class TestOAuth(object):
# Short for client authorization post reponse
capr = self.test_app.post(
- '/oauth/client/authorize', {
+ '/oauth-2/client/authorize', {
'client_id': form.client_id.data,
'allow': 'Allow',
'next': form.next.data})
@@ -155,7 +156,7 @@ class TestOAuth(object):
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = self.test_app.get('/oauth/access_token?client_id={0}&\
+ token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\
code={1}&client_secret={2}'.format(client_id, code, client.secret))
assert token_res.status_int == 200
@@ -183,7 +184,7 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = self.test_app.get('/oauth/access_token?\
+ token_res = self.test_app.get('/oauth-2/access_token?\
code={0}&client_secret={1}'.format(code, client.secret))
assert token_res.status_int == 200
@@ -204,7 +205,7 @@ code={0}&client_secret={1}'.format(code, client.secret))
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == client_id).first()
- token_res = self.test_app.get('/oauth/access_token',
+ token_res = self.test_app.get('/oauth-2/access_token',
{'refresh_token': token_data['refresh_token'],
'client_id': client_id,
'client_secret': client.secret
diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py
new file mode 100644
index 00000000..0424fdda
--- /dev/null
+++ b/mediagoblin/tests/test_openid.py
@@ -0,0 +1,374 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import urlparse
+import pkg_resources
+import pytest
+import mock
+
+openid_consumer = pytest.importorskip(
+ "openid.consumer.consumer")
+
+from mediagoblin import mg_globals
+from mediagoblin.db.base import Session
+from mediagoblin.db.models import User
+from mediagoblin.plugins.openid.models import OpenIDUserURL
+from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tools import template
+
+
+# App with plugin enabled
+@pytest.fixture()
+def openid_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'openid_appconfig.ini'))
+
+
+class TestOpenIDPlugin(object):
+ def _setup(self, openid_plugin_app, value=True, edit=False, delete=False):
+ if value:
+ response = openid_consumer.SuccessResponse(mock.Mock(), mock.Mock())
+ if edit or delete:
+ response.identity_url = u'http://add.myopenid.com'
+ else:
+ response.identity_url = u'http://real.myopenid.com'
+ self._finish_verification = mock.Mock(return_value=response)
+ else:
+ self._finish_verification = mock.Mock(return_value=False)
+
+ @mock.patch('mediagoblin.plugins.openid.views._response_email', mock.Mock(return_value=None))
+ @mock.patch('mediagoblin.plugins.openid.views._response_nickname', mock.Mock(return_value=None))
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ def _setup_start(self, openid_plugin_app, edit, delete):
+ if edit:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/edit/openid/finish/'))
+ elif delete:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/edit/openid/delete/finish/'))
+ else:
+ self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
+ '/auth/openid/login/finish/'))
+ _setup_start(self, openid_plugin_app, edit, delete)
+
+ def test_bad_login(self, openid_plugin_app):
+ """ Test that attempts to login with invalid paramaters"""
+
+ # Test GET request for auth/register page
+ res = openid_plugin_app.get('/auth/register/').follow()
+
+ # Make sure it redirected to the correct place
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/login page
+ res = openid_plugin_app.get('/auth/login/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/register page
+ res = openid_plugin_app.get('/auth/openid/register/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/login/finish page
+ res = openid_plugin_app.get('/auth/openid/login/finish/')
+ res.follow()
+
+ # Correct redirect?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+
+ # Test GET request for auth/openid/login page
+ res = openid_plugin_app.get('/auth/openid/login/')
+
+ # Correct place?
+ assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to login with an empty form
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'This field is required.']
+
+ # Try to login with wrong form values
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'not_a_url.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'Please enter a valid url.']
+
+ # Should be no users in the db
+ assert User.query.count() == 0
+
+ # Phony OpenID URl
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'http://phoney.myopenid.com/'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
+ form = context['login_form']
+ assert form.openid.errors == [u'Sorry, the OpenID server could not be found']
+
+ def test_login(self, openid_plugin_app):
+ """Tests that test login and registion with openid"""
+ # Test finish_login redirects correctly when response = False
+ self._setup(openid_plugin_app, False)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_non_response():
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': 'http://phoney.myopenid.com/'})
+ res.follow()
+
+ # Correct Place?
+ assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
+ assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
+ _test_non_response()
+
+ # Test login with new openid
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_new_user():
+ openid_plugin_app.post(
+ '/auth/openid/login/', {
+ 'openid': u'http://real.myopenid.com'})
+
+ # Right place?
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ # Register User
+ res = openid_plugin_app.post(
+ '/auth/openid/register/', {
+ 'openid': register_form.openid.data,
+ 'username': u'chris',
+ 'email': u'chris@example.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
+ assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # No need to test if user is in logged in and verification email
+ # awaits, since openid uses the register_user function which is
+ # tested in test_auth
+
+ # Logout User
+ openid_plugin_app.get('/auth/logout')
+
+ # Get user and detach from session
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
+ Session.expunge(test_user)
+
+ # Log back in
+ # Could not get it to work by 'POST'ing to /auth/openid/login/
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/auth/openid/login/finish/', {
+ 'openid': u'http://real.myopenid.com'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == unicode(test_user.id)
+
+ _test_new_user()
+
+ # Test register with empty form
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/register/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.openid.errors == [u'This field is required.']
+ assert register_form.email.errors == [u'This field is required.']
+ assert register_form.username.errors == [u'This field is required.']
+
+ # Try to register with existing username and email
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/auth/openid/register/', {
+ 'openid': 'http://real.myopenid.com',
+ 'email': 'chris@example.com',
+ 'username': 'chris'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.username.errors == [u'Sorry, a user with that name already exists.']
+ assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
+ assert register_form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
+
+ def test_add_delete(self, openid_plugin_app):
+ """Test adding and deleting openids"""
+ # Add user
+ test_user = fixture_add_user(password='', privileges=[u'active'])
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://real.myopenid.com'
+ openid.user_id = test_user.id
+ openid.save()
+
+ # Log user in
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app)
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _login_user():
+ openid_plugin_app.post(
+ '/auth/openid/login/finish/', {
+ 'openid': u'http://real.myopenid.com'})
+
+ _login_user()
+
+ # Try and delete only OpenID url
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/', {
+ 'openid': 'http://real.myopenid.com'})
+ assert 'mediagoblin/plugins/openid/delete.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Add OpenID to user
+ # Empty form
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/', {})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'This field is required.']
+
+ # Try with a bad url
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': u'not_a_url.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'Please enter a valid url.']
+
+ # Try with a url that's already registered
+ template.clear_test_template_context()
+ openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': 'http://real.myopenid.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
+ form = context['form']
+ assert form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
+
+ # Test adding openid to account
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app, edit=True)
+
+ # Need to remove openid_url from db because it was added at setup
+ openid = OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com')
+ openid.delete()
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_add():
+ # Successful add
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/', {
+ 'openid': u'http://add.myopenid.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+ assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # OpenID Added?
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
+ assert new_openid
+
+ _test_add()
+
+ # Test deleting openid from account
+ # Need to clear_test_template_context before calling _setup
+ template.clear_test_template_context()
+ self._setup(openid_plugin_app, delete=True)
+
+ # Need to add OpenID back to user because it was deleted during
+ # patch
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://add.myopenid.com'
+ openid.user_id = test_user.id
+ openid.save()
+
+ @mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
+ @mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
+ def _test_delete(self, test_user):
+ # Delete openid from user
+ # Create another user to test deleting OpenID that doesn't belong to them
+ new_user = fixture_add_user(username='newman')
+ openid = OpenIDUserURL()
+ openid.openid_url = 'http://realfake.myopenid.com/'
+ openid.user_id = new_user.id
+ openid.save()
+
+ # Try and delete OpenID url that isn't the users
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/', {
+ 'openid': 'http://realfake.myopenid.com/'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/delete.html']
+ form = context['form']
+ assert form.openid.errors == [u'That OpenID is not registered to this account.']
+
+ # Delete OpenID
+ # Kind of weird to POST to delete/finish
+ template.clear_test_template_context()
+ res = openid_plugin_app.post(
+ '/edit/openid/delete/finish/', {
+ 'openid': u'http://add.myopenid.com'})
+ res.follow()
+
+ # Correct place?
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+ assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # OpenID deleted?
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
+ assert not new_openid
+
+ _test_delete(self, test_user)
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
index 91ecbb84..a9595432 100644
--- a/mediagoblin/tests/test_paste.ini
+++ b/mediagoblin/tests/test_paste.ini
@@ -6,6 +6,8 @@ use = egg:Paste#urlmap
/ = mediagoblin
/mgoblin_media/ = publicstore_serve
/test_static/ = mediagoblin_static
+/theme_static/ = theme_static
+/plugin_static/ = plugin_static
[app:mediagoblin]
use = egg:mediagoblin#app
@@ -13,12 +15,22 @@ config = %(here)s/mediagoblin.ini
[app:publicstore_serve]
use = egg:Paste#static
-document_root = %(here)s/test_user_dev/media/public
+document_root = %(here)s/user_dev/media/public
[app:mediagoblin_static]
use = egg:Paste#static
document_root = %(here)s/mediagoblin/static/
+[app:theme_static]
+use = egg:Paste#static
+document_root = %(here)s/user_dev/theme_static/
+cache_max_age = 86400
+
+[app:plugin_static]
+use = egg:Paste#static
+document_root = %(here)s/user_dev/plugin_static/
+cache_max_age = 86400
+
[celery]
CELERY_ALWAYS_EAGER = true
diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py
new file mode 100644
index 00000000..a1cd30eb
--- /dev/null
+++ b/mediagoblin/tests/test_persona.py
@@ -0,0 +1,214 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import urlparse
+import pkg_resources
+import pytest
+import mock
+
+pytest.importorskip("requests")
+
+from mediagoblin import mg_globals
+from mediagoblin.db.base import Session
+from mediagoblin.db.models import Privilege
+from mediagoblin.tests.tools import get_app
+from mediagoblin.tools import template
+
+
+# App with plugin enabled
+@pytest.fixture()
+def persona_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests.auth_configs',
+ 'persona_appconfig.ini'))
+
+
+class TestPersonaPlugin(object):
+ def test_authentication_views(self, persona_plugin_app):
+ res = persona_plugin_app.get('/auth/login/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+
+ res = persona_plugin_app.get('/auth/register/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+
+ res = persona_plugin_app.get('/auth/persona/login/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/auth/login/'
+
+ res = persona_plugin_app.get('/auth/persona/register/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/auth/login/'
+
+ @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'test@example.com'))
+ def _test_registration():
+ # No register users
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/auth/persona/login/', {})
+
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.email.data == u'test@example.com'
+ assert register_form.persona_email.data == u'test@example.com'
+
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/auth/persona/register/', {})
+
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.username.errors == [u'This field is required.']
+ assert register_form.email.errors == [u'This field is required.']
+ assert register_form.persona_email.errors == [u'This field is required.']
+
+ # Successful register
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/auth/persona/register/',
+ {'username': 'chris',
+ 'email': 'chris@example.com',
+ 'persona_email': 'test@example.com'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
+ assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Try to register same Persona email address
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/auth/persona/register/',
+ {'username': 'chris1',
+ 'email': 'chris1@example.com',
+ 'persona_email': 'test@example.com'})
+
+ assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ register_form = context['register_form']
+
+ assert register_form.persona_email.errors == [u'Sorry, an account is already registered to that Persona email.']
+
+ # Logout
+ persona_plugin_app.get('/auth/logout/')
+
+ # Get user and detach from session
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
+ active_privilege = Privilege.query.filter(
+ Privilege.privilege_name==u'active').first()
+ test_user.all_privileges.append(active_privilege)
+ test_user.save()
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
+ Session.expunge(test_user)
+
+ # Add another user for _test_edit_persona
+ persona_plugin_app.post(
+ '/auth/persona/register/',
+ {'username': 'chris1',
+ 'email': 'chris1@example.com',
+ 'persona_email': 'test1@example.com'})
+
+ # Log back in
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/auth/persona/login/')
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == unicode(test_user.id)
+
+ _test_registration()
+
+ @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'new@example.com'))
+ def _test_edit_persona():
+ # Try and delete only Persona email address
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/edit/persona/',
+ {'email': 'test@example.com'})
+
+ assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html']
+ form = context['form']
+
+ assert form.email.errors == [u"You can't delete your only Persona email address unless you have a password set."]
+
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/edit/persona/', {})
+
+ assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html']
+ form = context['form']
+
+ assert form.email.errors == [u'This field is required.']
+
+ # Try and delete Persona not owned by the user
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/edit/persona/',
+ {'email': 'test1@example.com'})
+
+ assert 'mediagoblin/plugins/persona/edit.html' in template.TEMPLATE_TEST_CONTEXT
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/persona/edit.html']
+ form = context['form']
+
+ assert form.email.errors == [u'That Persona email address is not registered to this account.']
+
+ res = persona_plugin_app.get('/edit/persona/add/')
+
+ assert urlparse.urlsplit(res.location)[2] == '/edit/persona/'
+
+ # Add Persona email address
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/edit/persona/add/')
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+
+ # Delete a Persona
+ res = persona_plugin_app.post(
+ '/edit/persona/',
+ {'email': 'test@example.com'})
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
+
+ _test_edit_persona()
+
+ @mock.patch('mediagoblin.plugins.persona.views._get_response', mock.Mock(return_value=u'test1@example.com'))
+ def _test_add_existing():
+ template.clear_test_template_context()
+ res = persona_plugin_app.post(
+ '/edit/persona/add/')
+ res.follow()
+
+ assert urlparse.urlsplit(res.location)[2] == '/edit/persona/'
+
+ _test_add_existing()
diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py
new file mode 100644
index 00000000..16ad0111
--- /dev/null
+++ b/mediagoblin/tests/test_piwigo.py
@@ -0,0 +1,71 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+from .tools import fixture_add_user
+
+
+XML_PREFIX = "<?xml version='1.0' encoding='utf-8'?>\n"
+
+
+class Test_PWG(object):
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
+ fixture_add_user()
+
+ self.username = u"chris"
+ self.password = "toast"
+
+ def do_post(self, method, params):
+ params["method"] = method
+ return self.test_app.post("/api/piwigo/ws.php", params)
+
+ def do_get(self, method, params=None):
+ if params is None:
+ params = {}
+ params["method"] = method
+ return self.test_app.get("/api/piwigo/ws.php", params)
+
+ def test_session(self):
+ resp = self.do_post("pwg.session.login",
+ {"username": u"nouser", "password": "wrong"})
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+
+ resp = self.do_post("pwg.session.login",
+ {"username": self.username, "password": "wrong"})
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>guest</username></rsp>'
+
+ resp = self.do_post("pwg.session.login",
+ {"username": self.username, "password": self.password})
+ assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>'
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>chris</username></rsp>'
+
+ self.do_get("pwg.session.logout")
+
+ resp = self.do_get("pwg.session.getStatus")
+ assert resp.body == XML_PREFIX \
+ + '<rsp stat="ok"><username>guest</username></rsp>'
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
index f03e868f..eae0ce15 100644
--- a/mediagoblin/tests/test_pluginapi.py
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -14,14 +14,22 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import json
import sys
from configobj import ConfigObj
import pytest
+import pkg_resources
+from validate import VdtTypeError
from mediagoblin import mg_globals
from mediagoblin.init.plugins import setup_plugins
+from mediagoblin.init.config import read_mediagoblin_config
+from mediagoblin.gmg_commands.assetlink import link_plugin_assets
from mediagoblin.tools import pluginapi
+from mediagoblin.tests.tools import get_app
+from mediagoblin.tools.common import CollectingPrinter
def with_cleanup(*modules_to_delete):
@@ -294,3 +302,165 @@ def test_hook_transform():
assert pluginapi.hook_transform(
"expand_tuple", (-1, 0)) == (-1, 0, 1, 2, 3)
+
+
+def test_plugin_config():
+ """
+ Make sure plugins can set up their own config
+ """
+ config, validation_result = read_mediagoblin_config(
+ pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_plugin_specs.ini'))
+
+ pluginspec_section = config['plugins'][
+ 'mediagoblin.tests.testplugins.pluginspec']
+ assert pluginspec_section['some_string'] == 'not blork'
+ assert pluginspec_section['dont_change_me'] == 'still the default'
+
+ # Make sure validation works... this should be an error
+ assert isinstance(
+ validation_result[
+ 'plugins'][
+ 'mediagoblin.tests.testplugins.pluginspec'][
+ 'some_int'],
+ VdtTypeError)
+
+ # the callables thing shouldn't really have anything though.
+ assert len(config['plugins'][
+ 'mediagoblin.tests.testplugins.callables1']) == 0
+
+
+@pytest.fixture()
+def context_modified_app(request):
+ """
+ Get a MediaGoblin app fixture using appconfig_context_modified.ini
+ """
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_context_modified.ini'))
+
+
+def test_modify_context(context_modified_app):
+ """
+ Test that we can modify both the view/template specific and
+ global contexts for templates.
+ """
+ # Specific thing passed into a page
+ result = context_modified_app.get("/modify_context/specific/")
+ assert result.body.strip() == """Specific page!
+
+specific thing: in yer specificpage
+global thing: globally appended!
+something: orother
+doubleme: happyhappy"""
+
+ # General test, should have global context variable only
+ result = context_modified_app.get("/modify_context/")
+ assert result.body.strip() == """General page!
+
+global thing: globally appended!
+lol: cats
+doubleme: joyjoy"""
+
+
+@pytest.fixture()
+def static_plugin_app(request):
+ """
+ Get a MediaGoblin app fixture using appconfig_static_plugin.ini
+ """
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests', 'appconfig_static_plugin.ini'))
+
+
+def test_plugin_assetlink(static_plugin_app):
+ """
+ Test that the assetlink command works correctly
+ """
+ linked_assets_dir = mg_globals.app_config['plugin_linked_assets_dir']
+ plugin_link_dir = os.path.join(
+ linked_assets_dir.rstrip(os.path.sep),
+ 'staticstuff')
+
+ plugin_statics = pluginapi.hook_runall("static_setup")
+ assert len(plugin_statics) == 1
+ plugin_static = plugin_statics[0]
+
+ def run_assetlink():
+ printer = CollectingPrinter()
+
+ link_plugin_assets(
+ plugin_static, linked_assets_dir, printer)
+
+ return printer
+
+ # it shouldn't exist yet
+ assert not os.path.lexists(plugin_link_dir)
+
+ # link dir doesn't exist, link it
+ result = run_assetlink().collection[0]
+ assert result == \
+ 'Linked asset directory for plugin "staticstuff":\n %s\nto:\n %s\n' % (
+ plugin_static.file_path.rstrip(os.path.sep),
+ plugin_link_dir)
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, leave it alone
+ # (and it should exist still since we just ran it..)
+ result = run_assetlink().collection[0]
+ assert result == 'Skipping "staticstuff"; already set up.\n'
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, is a symlink to somewhere else (re-link)
+ junk_file_path = os.path.join(
+ linked_assets_dir.rstrip(os.path.sep),
+ 'junk.txt')
+ with file(junk_file_path, 'w') as junk_file:
+ junk_file.write('barf')
+
+ os.unlink(plugin_link_dir)
+ os.symlink(junk_file_path, plugin_link_dir)
+
+ result = run_assetlink().combined_string
+ assert result == """Old link found for "staticstuff"; removing.
+Linked asset directory for plugin "staticstuff":
+ %s
+to:
+ %s
+""" % (plugin_static.file_path.rstrip(os.path.sep), plugin_link_dir)
+ assert os.path.lexists(plugin_link_dir)
+ assert os.path.islink(plugin_link_dir)
+ assert os.path.realpath(plugin_link_dir) == plugin_static.file_path
+
+ # link dir exists, but is a non-symlink
+ os.unlink(plugin_link_dir)
+ with file(plugin_link_dir, 'w') as clobber_file:
+ clobber_file.write('clobbered!')
+
+ result = run_assetlink().collection[0]
+ assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % (
+ plugin_link_dir)
+
+ with file(plugin_link_dir, 'r') as clobber_file:
+ assert clobber_file.read() == 'clobbered!'
+
+
+def test_plugin_staticdirect(static_plugin_app):
+ """
+ Test that the staticdirect utilities pull up the right things
+ """
+ result = json.loads(
+ static_plugin_app.get('/staticstuff/').body)
+
+ assert len(result) == 2
+
+ assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png'
+ assert result['plugin_bunny_css'] == \
+ '/plugin_static/staticstuff/css/bunnify.css'
+
diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py
new file mode 100644
index 00000000..05829b34
--- /dev/null
+++ b/mediagoblin/tests/test_privileges.py
@@ -0,0 +1,205 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+from datetime import date, timedelta
+from webtest import AppError
+
+from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
+
+from mediagoblin.db.models import User, UserBan
+from mediagoblin.tools import template
+
+from .resources import GOOD_JPG
+
+class TestPrivilegeFunctionality:
+
+ @pytest.fixture(autouse=True)
+ def _setup(self, test_app):
+ self.test_app = test_app
+
+ fixture_add_user(u'alex',
+ privileges=[u'admin',u'active'])
+ fixture_add_user(u'meow',
+ privileges=[u'moderator',u'active',u'reporter'])
+ fixture_add_user(u'natalie',
+ privileges=[u'active'])
+ self.query_for_users()
+
+ def login(self, username):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': username,
+ 'password': 'toast'})
+ self.query_for_users()
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+ self.query_for_users()
+
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def query_for_users(self):
+ self.admin_user = User.query.filter(User.username==u'alex').first()
+ self.mod_user = User.query.filter(User.username==u'meow').first()
+ self.user = User.query.filter(User.username==u'natalie').first()
+
+ def testUserBanned(self):
+ self.login(u'natalie')
+ uid = self.user.id
+ # First, test what happens when a user is banned indefinitely
+ #----------------------------------------------------------------------
+ user_ban = UserBan(user_id=uid,
+ reason=u'Testing whether user is banned',
+ expiration_date=None)
+ user_ban.save()
+
+ response = self.test_app.get('/')
+ assert response.status == "200 OK"
+ assert "You are Banned" in response.body
+ # Then test what happens when that ban has an expiration date which
+ # hasn't happened yet
+ #----------------------------------------------------------------------
+ user_ban = UserBan.query.get(uid)
+ user_ban.delete()
+ user_ban = UserBan(user_id=uid,
+ reason=u'Testing whether user is banned',
+ expiration_date= date.today() + timedelta(days=20))
+ user_ban.save()
+
+ response = self.test_app.get('/')
+ assert response.status == "200 OK"
+ assert "You are Banned" in response.body
+
+ # Then test what happens when that ban has an expiration date which
+ # has already happened
+ #----------------------------------------------------------------------
+ user_ban = UserBan.query.get(uid)
+ user_ban.delete()
+ exp_date = date.today() - timedelta(days=20)
+ user_ban = UserBan(user_id=uid,
+ reason=u'Testing whether user is banned',
+ expiration_date= exp_date)
+ user_ban.save()
+
+ response = self.test_app.get('/')
+ assert response.status == "302 FOUND"
+ assert not "You are Banned" in response.body
+
+ def testVariousPrivileges(self):
+ # The various actions that require privileges (ex. reporting,
+ # commenting, moderating...) are tested in other tests. This method
+ # will be used to ensure that those actions are impossible for someone
+ # without the proper privileges.
+ # For other tests that show what happens when a user has the proper
+ # privileges, check out:
+ # tests/test_moderation.py moderator
+ # tests/test_notifications.py commenter
+ # tests/test_reporting.py reporter
+ # tests/test_submission.py uploader
+ #----------------------------------------------------------------------
+ self.login(u'natalie')
+
+ # First test the get and post requests of submission/uploading
+ #----------------------------------------------------------------------
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get('/submit/')
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.do_post({'upload_files':[('file',GOOD_JPG)],
+ 'title':u'Normal Upload 1'},
+ url='/submit/')
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ # Test that a user cannot comment without the commenter privilege
+ #----------------------------------------------------------------------
+ self.query_for_users()
+
+ media_entry = fixture_media_entry(uploader=self.admin_user.id,
+ state=u'processed')
+
+ media_entry_id = media_entry.id
+ media_uri_id = '/u/{0}/m/{1}/'.format(self.admin_user.username,
+ media_entry.id)
+ media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username,
+ media_entry.slug)
+ response = self.test_app.get(media_uri_slug)
+ assert not "Add a comment" in response.body
+
+ self.query_for_users()
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.post(
+ media_uri_id + 'comment/add/',
+ {'comment_content': u'Test comment #42'})
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ # Test that a user cannot report without the reporter privilege
+ #----------------------------------------------------------------------
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get(media_uri_slug+"report/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.do_post(
+ {'report_reason':u'Testing Reports #1',
+ 'reporter_id':u'3'},
+ url=(media_uri_slug+"report/"))
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ # Test that a user cannot access the moderation pages w/o moderator
+ # or admin privileges
+ #----------------------------------------------------------------------
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get("/mod/users/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get("/mod/reports/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get("/mod/media/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get("/mod/users/1/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ with pytest.raises(AppError) as excinfo:
+ response = self.test_app.get("/mod/reports/1/")
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+
+ self.query_for_users()
+
+ with pytest.raises(AppError) as excinfo:
+ response, context = self.do_post({'action_to_resolve':[u'takeaway'],
+ 'take_away_privileges':[u'active'],
+ 'targeted_user':self.admin_user.id},
+ url='/mod/reports/1/')
+ self.query_for_users()
+ assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py
index fe8489aa..591add96 100644
--- a/mediagoblin/tests/test_processing.py
+++ b/mediagoblin/tests/test_processing.py
@@ -1,7 +1,5 @@
#!/usr/bin/env python
-from nose.tools import assert_equal
-
from mediagoblin import processing
class TestProcessing(object):
@@ -10,7 +8,7 @@ class TestProcessing(object):
result = builder.fill(format)
if output is None:
return result
- assert_equal(output, result)
+ assert output == result
def test_easy_filename_fill(self):
self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt')
diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py
new file mode 100644
index 00000000..a154a061
--- /dev/null
+++ b/mediagoblin/tests/test_reporting.py
@@ -0,0 +1,167 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import pytest
+
+from mediagoblin.tools import template
+from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry,
+ fixture_add_comment, fixture_add_comment_report)
+from mediagoblin.db.models import (MediaReport, CommentReport, User,
+ MediaComment)
+
+
+class TestReportFiling:
+ @pytest.fixture(autouse=True)
+ def _setup(self, test_app):
+ self.test_app = test_app
+
+ fixture_add_user(u'allie',
+ privileges=[u'reporter',u'active'])
+ fixture_add_user(u'natalie',
+ privileges=[u'active', u'moderator'])
+
+ def login(self, username):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': username,
+ 'password': 'toast'})
+
+ def logout(self):
+ self.test_app.get('/auth/logout/')
+
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def query_for_users(self):
+ return (User.query.filter(User.username==u'allie').first(),
+ User.query.filter(User.username==u'natalie').first())
+
+ def testMediaReports(self):
+ self.login(u'allie')
+ allie_user, natalie_user = self.query_for_users()
+ allie_id = allie_user.id
+
+ media_entry = fixture_media_entry(uploader=natalie_user.id,
+ state=u'processed')
+
+ mid = media_entry.id
+ media_uri_slug = '/u/{0}/m/{1}/'.format(natalie_user.username,
+ media_entry.slug)
+
+ response = self.test_app.get(media_uri_slug + "report/")
+ assert response.status == "200 OK"
+
+ response, context = self.do_post(
+ {'report_reason':u'Testing Media Report',
+ 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/")
+
+ assert response.status == "302 FOUND"
+
+ media_report = MediaReport.query.first()
+
+ allie_user, natalie_user = self.query_for_users()
+ assert media_report is not None
+ assert media_report.report_content == u'Testing Media Report'
+ assert media_report.reporter_id == allie_id
+ assert media_report.reported_user_id == natalie_user.id
+ assert media_report.created is not None
+ assert media_report.discriminator == 'media_report'
+
+ def testCommentReports(self):
+ self.login(u'allie')
+ allie_user, natalie_user = self.query_for_users()
+ allie_id = allie_user.id
+
+ media_entry = fixture_media_entry(uploader=natalie_user.id,
+ state=u'processed')
+ mid = media_entry.id
+ fixture_add_comment(media_entry=mid,
+ author=natalie_user.id)
+ comment = MediaComment.query.first()
+
+ comment_uri_slug = '/u/{0}/m/{1}/c/{2}/'.format(natalie_user.username,
+ media_entry.slug,
+ comment.id)
+
+ response = self.test_app.get(comment_uri_slug + "report/")
+ assert response.status == "200 OK"
+
+ response, context = self.do_post({
+ 'report_reason':u'Testing Comment Report',
+ 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/")
+
+ assert response.status == "302 FOUND"
+
+ comment_report = CommentReport.query.first()
+
+ allie_user, natalie_user = self.query_for_users()
+ assert comment_report is not None
+ assert comment_report.report_content == u'Testing Comment Report'
+ assert comment_report.reporter_id == allie_id
+ assert comment_report.reported_user_id == natalie_user.id
+ assert comment_report.created is not None
+ assert comment_report.discriminator == 'comment_report'
+
+ def testArchivingReports(self):
+ self.login(u'natalie')
+ allie_user, natalie_user = self.query_for_users()
+ allie_id, natalie_id = allie_user.id, natalie_user.id
+
+ fixture_add_comment(author=allie_user.id,
+ comment=u'Comment will be removed')
+ test_comment = MediaComment.query.filter(
+ MediaComment.author==allie_user.id).first()
+ fixture_add_comment_report(comment=test_comment,
+ reported_user=allie_user,
+ report_content=u'Testing Archived Reports #1',
+ reporter=natalie_user)
+ comment_report = CommentReport.query.filter(
+ CommentReport.reported_user==allie_user).first()
+
+ assert comment_report.report_content == u'Testing Archived Reports #1'
+ response, context = self.do_post(
+ {'action_to_resolve':[u'userban', u'delete'],
+ 'targeted_user':allie_user.id,
+ 'resolution_content':u'This is a test of archiving reports.'},
+ url='/mod/reports/{0}/'.format(comment_report.id))
+
+ assert response.status == "302 FOUND"
+ allie_user, natalie_user = self.query_for_users()
+
+ archived_report = CommentReport.query.filter(
+ CommentReport.reported_user==allie_user).first()
+
+ assert CommentReport.query.count() != 0
+ assert archived_report is not None
+ assert archived_report.report_content == u'Testing Archived Reports #1'
+ assert archived_report.reporter_id == natalie_id
+ assert archived_report.reported_user_id == allie_id
+ assert archived_report.created is not None
+ assert archived_report.resolved is not None
+ assert archived_report.result == u'''This is a test of archiving reports.
+natalie banned user allie indefinitely.
+natalie deleted the comment.'''
+ assert archived_report.discriminator == 'comment_report'
+
diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py
index 2fc4c043..3d67fdf6 100644
--- a/mediagoblin/tests/test_sql_migrations.py
+++ b/mediagoblin/tests/test_sql_migrations.py
@@ -58,6 +58,10 @@ class Level1(Base1):
SET1_MODELS = [Creature1, Level1]
+FOUNDATIONS = {Creature1:[{'name':u'goblin','num_legs':2,'is_demon':False},
+ {'name':u'cerberus','num_legs':4,'is_demon':True}]
+ }
+
SET1_MIGRATIONS = {}
#######################################################
@@ -542,7 +546,6 @@ def _insert_migration3_objects(session):
session.commit()
-
def create_test_engine():
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=False)
@@ -572,7 +575,7 @@ def test_set1_to_set3():
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS, Session(),
printer)
# Check latest migration and database current migration
@@ -585,11 +588,13 @@ def test_set1_to_set3():
assert result == u'inited'
# Check output
assert printer.combined_string == (
- "-> Initializing main mediagoblin tables... done.\n")
+ "-> Initializing main mediagoblin tables... done.\n" + \
+ " + Laying foundations for Creature1 table\n" )
# Check version in database
assert migration_manager.latest_migration == 0
assert migration_manager.database_current_migration == 0
+
# Install the initial set
# -----------------------
@@ -597,8 +602,8 @@ def test_set1_to_set3():
# Try to "re-migrate" with same manager settings... nothing should happen
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
- printer)
+ u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS,
+ Session(), printer)
assert migration_manager.init_or_migrate() == None
# Check version in database
@@ -639,6 +644,20 @@ def test_set1_to_set3():
# Now check to see if stuff seems to be in there.
session = Session()
+ # Check the creation of the foundation rows on the creature table
+ creature = session.query(Creature1).filter_by(
+ name=u'goblin').one()
+ assert creature.num_legs == 2
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'cerberus').one()
+ assert creature.num_legs == 4
+ assert creature.is_demon == True
+
+
+ # Check the creation of the inserted rows on the creature and levels tables
+
creature = session.query(Creature1).filter_by(
name=u'centipede').one()
assert creature.num_legs == 100
@@ -679,7 +698,7 @@ def test_set1_to_set3():
# isn't said to be updated yet
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
@@ -706,7 +725,7 @@ def test_set1_to_set3():
# Make sure version matches expected
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
assert migration_manager.database_current_migration == 8
@@ -772,6 +791,15 @@ def test_set1_to_set3():
# Now check to see if stuff seems to be in there.
session = Session()
+
+
+ # Start with making sure that the foundations did not run again
+ assert session.query(Creature3).filter_by(
+ name=u'goblin').count() == 1
+ assert session.query(Creature3).filter_by(
+ name=u'cerberus').count() == 1
+
+ # Then make sure the models have been migrated correctly
creature = session.query(Creature3).filter_by(
name=u'centipede').one()
assert creature.num_limbs == 100.0
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index 162b2d19..b5b13ed3 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -24,13 +24,14 @@ import pytest
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin import mg_globals
-from mediagoblin.db.models import MediaEntry
+from mediagoblin.db.models import MediaEntry, User
+from mediagoblin.db.base import Session
from mediagoblin.tools import template
-from mediagoblin.media_types.image import MEDIA_MANAGER as img_MEDIA_MANAGER
+from mediagoblin.media_types.image import ImageMediaManager
from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
- BIG_BLUE, GOOD_PDF, GPS_JPG
+ BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG
GOOD_TAG_STRING = u'yin,yang'
BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26)
@@ -46,12 +47,22 @@ class TestSubmission:
# TODO: Possibly abstract into a decorator like:
# @as_authenticated_user('chris')
- test_user = fixture_add_user()
-
- self.test_user = test_user
+ fixture_add_user(privileges=[u'active',u'uploader', u'commenter'])
self.login()
+ def our_user(self):
+ """
+ Fetch the user we're submitting with. Every .get() or .post()
+ invalidates the session; this is a hacky workaround.
+ """
+ #### FIXME: Pytest collects this as a test and runs this.
+ #### ... it shouldn't. At least it passes, but that's
+ #### totally stupid.
+ #### Also if we found a way to make this run it should be a
+ #### property.
+ return User.query.filter(User.username==u'chris').first()
+
def login(self):
self.test_app.post(
'/auth/login/', {
@@ -77,7 +88,7 @@ class TestSubmission:
return {'upload_files': [('file', filename)]}
def check_comments(self, request, media_id, count):
- comments = request.db.MediaComment.find({'media_entry': media_id})
+ comments = request.db.MediaComment.query.filter_by(media_entry=media_id)
assert count == len(list(comments))
def test_missing_fields(self):
@@ -97,19 +108,40 @@ class TestSubmission:
def check_normal_upload(self, title, filename):
response, context = self.do_post({'title': title}, do_follow=True,
**self.upload_data(filename))
- self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
assert 'mediagoblin/user_pages/user.html' in context
# Make sure the media view is at least reachable, logged in...
- url = '/u/{0}/m/{1}/'.format(self.test_user.username,
+ url = '/u/{0}/m/{1}/'.format(self.our_user().username,
title.lower().replace(' ', '-'))
self.test_app.get(url)
# ... and logged out too.
self.logout()
self.test_app.get(url)
+ def user_upload_limits(self, uploaded=None, upload_limit=None):
+ our_user = self.our_user()
+
+ if uploaded:
+ our_user.uploaded = uploaded
+ if upload_limit:
+ our_user.upload_limit = upload_limit
+
+ our_user.save()
+ Session.expunge(our_user)
+
def test_normal_jpg(self):
+ # User uploaded should be 0
+ assert self.our_user().uploaded == 0
+
self.check_normal_upload(u'Normal upload 1', GOOD_JPG)
+ # User uploaded should be the same as GOOD_JPG size in Mb
+ file_size = os.stat(GOOD_JPG).st_size / (1024.0 * 1024)
+ file_size = float('{0:.2f}'.format(file_size))
+
+ # Reload user
+ assert self.our_user().uploaded == file_size
+
def test_normal_png(self):
self.check_normal_upload(u'Normal upload 2', GOOD_PNG)
@@ -118,11 +150,65 @@ class TestSubmission:
response, context = self.do_post({'title': u'Normal upload 3 (pdf)'},
do_follow=True,
**self.upload_data(GOOD_PDF))
- self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
+ assert 'mediagoblin/user_pages/user.html' in context
+
+ def test_default_upload_limits(self):
+ self.user_upload_limits(uploaded=500)
+
+ # User uploaded should be 500
+ assert self.our_user().uploaded == 500
+
+ response, context = self.do_post({'title': u'Normal upload 4'},
+ do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
assert 'mediagoblin/user_pages/user.html' in context
+ # Shouldn't have uploaded
+ assert self.our_user().uploaded == 500
+
+ def test_user_upload_limit(self):
+ self.user_upload_limits(uploaded=25, upload_limit=25)
+
+ # User uploaded should be 25
+ assert self.our_user().uploaded == 25
+
+ response, context = self.do_post({'title': u'Normal upload 5'},
+ do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
+ assert 'mediagoblin/user_pages/user.html' in context
+
+ # Shouldn't have uploaded
+ assert self.our_user().uploaded == 25
+
+ def test_user_under_limit(self):
+ self.user_upload_limits(uploaded=499)
+
+ # User uploaded should be 499
+ assert self.our_user().uploaded == 499
+
+ response, context = self.do_post({'title': u'Normal upload 6'},
+ do_follow=False,
+ **self.upload_data(MED_PNG))
+ form = context['mediagoblin/submit/start.html']['submit_form']
+ assert form.file.errors == [u'Sorry, uploading this file will put you'
+ ' over your upload limit.']
+
+ # Shouldn't have uploaded
+ assert self.our_user().uploaded == 499
+
+ def test_big_file(self):
+ response, context = self.do_post({'title': u'Normal upload 7'},
+ do_follow=False,
+ **self.upload_data(BIG_PNG))
+
+ form = context['mediagoblin/submit/start.html']['submit_form']
+ assert form.file.errors == [u'Sorry, the file size is too big.']
+
def check_media(self, request, find_data, count=None):
- media = MediaEntry.find(find_data)
+ media = MediaEntry.query.filter_by(**find_data)
if count is not None:
assert media.count() == count
if count == 0:
@@ -155,6 +241,7 @@ class TestSubmission:
'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
def test_delete(self):
+ self.user_upload_limits(uploaded=50)
response, request = self.do_post({'title': u'Balanced Goblin'},
*REQUEST_CONTEXT, do_follow=True,
**self.upload_data(GOOD_JPG))
@@ -164,7 +251,7 @@ class TestSubmission:
# render and post to the edit page.
edit_url = request.urlgen(
'mediagoblin.edit.edit_media',
- user=self.test_user.username, media_id=media_id)
+ user=self.our_user().username, media_id=media_id)
self.test_app.get(edit_url)
self.test_app.post(edit_url,
{'title': u'Balanced Goblin',
@@ -177,7 +264,7 @@ class TestSubmission:
self.check_comments(request, media_id, 0)
comment_url = request.urlgen(
'mediagoblin.user_pages.media_post_comment',
- user=self.test_user.username, media_id=media_id)
+ user=self.our_user().username, media_id=media_id)
response = self.do_post({'comment_content': 'i love this test'},
url=comment_url, do_follow=True)[0]
self.check_comments(request, media_id, 1)
@@ -186,7 +273,7 @@ class TestSubmission:
# ---------------------------------------------------
delete_url = request.urlgen(
'mediagoblin.user_pages.media_confirm_delete',
- user=self.test_user.username, media_id=media_id)
+ user=self.our_user().username, media_id=media_id)
# Empty data means don't confirm
response = self.do_post({}, do_follow=True, url=delete_url)[0]
media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
@@ -199,6 +286,9 @@ class TestSubmission:
self.check_media(request, {'id': media_id}, 0)
self.check_comments(request, media_id, 0)
+ # Check that user.uploaded is the same as before the upload
+ assert self.our_user().uploaded == 50
+
def test_evil_file(self):
# Test non-suppoerted file with non-supported extension
# -----------------------------------------------------
@@ -219,7 +309,7 @@ class TestSubmission:
media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
assert media.media_type == u'mediagoblin.media_types.image'
- assert isinstance(media.media_manager, img_MEDIA_MANAGER)
+ assert isinstance(media.media_manager, ImageMediaManager)
assert media.media_manager.entry == media
@@ -240,8 +330,8 @@ class TestSubmission:
request = context['request']
- media = request.db.MediaEntry.find_one({
- u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
+ media = request.db.MediaEntry.query.filter_by(
+ title=u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE').first()
assert media.media_type == 'mediagoblin.media_types.image'
@@ -251,8 +341,8 @@ class TestSubmission:
# they'll be caught as failures during the processing step.
response, context = self.do_post({'title': title}, do_follow=True,
**self.upload_data(filename))
- self.check_url(response, '/u/{0}/'.format(self.test_user.username))
- entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
+ entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first()
assert entry.state == 'failed'
assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
diff --git a/mediagoblin/tests/test_submission/COPYING.txt b/mediagoblin/tests/test_submission/COPYING.txt
new file mode 100644
index 00000000..3818aae4
--- /dev/null
+++ b/mediagoblin/tests/test_submission/COPYING.txt
@@ -0,0 +1,5 @@
+Images located in this directory tree are released under a GPLv3 license
+and CC BY-SA 3.0 license. To the extent possible under law, the author(s)
+have dedicated all copyright and related and neighboring rights to these
+files to the public domain worldwide. These files are distributed without
+any warranty.
diff --git a/mediagoblin/tests/test_submission/big.png b/mediagoblin/tests/test_submission/big.png
new file mode 100644
index 00000000..a284cfda
--- /dev/null
+++ b/mediagoblin/tests/test_submission/big.png
Binary files differ
diff --git a/mediagoblin/tests/test_submission/medium.png b/mediagoblin/tests/test_submission/medium.png
new file mode 100644
index 00000000..e8b9ca00
--- /dev/null
+++ b/mediagoblin/tests/test_submission/medium.png
Binary files differ
diff --git a/mediagoblin/tests/testplugins/modify_context/__init__.py b/mediagoblin/tests/testplugins/modify_context/__init__.py
new file mode 100644
index 00000000..164e66c1
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/__init__.py
@@ -0,0 +1,55 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools import pluginapi
+import pkg_resources
+
+
+def append_to_specific_context(context):
+ context['specific_page_append'] = 'in yer specificpage'
+ return context
+
+def append_to_global_context(context):
+ context['global_append'] = 'globally appended!'
+ return context
+
+def double_doubleme(context):
+ if 'doubleme' in context:
+ context['doubleme'] = context['doubleme'] * 2
+ return context
+
+
+def setup_plugin():
+ routes = [
+ ('modify_context.specific_page',
+ '/modify_context/specific/',
+ 'mediagoblin.tests.testplugins.modify_context.views:specific'),
+ ('modify_context.general_page',
+ '/modify_context/',
+ 'mediagoblin.tests.testplugins.modify_context.views:general')]
+
+ pluginapi.register_routes(routes)
+ pluginapi.register_template_path(
+ pkg_resources.resource_filename(
+ 'mediagoblin.tests.testplugins.modify_context', 'templates'))
+
+
+hooks = {
+ 'setup': setup_plugin,
+ ('modify_context.specific_page',
+ 'contextplugin/specific.html'): append_to_specific_context,
+ 'template_global_context': append_to_global_context,
+ 'template_context_prerender': double_doubleme}
diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html
new file mode 100644
index 00000000..9cf96d3e
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/general.html
@@ -0,0 +1,5 @@
+General page!
+
+global thing: {{ global_append }}
+lol: {{ lol }}
+doubleme: {{ doubleme }}
diff --git a/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html
new file mode 100644
index 00000000..5b1b4c4a
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/templates/contextplugin/specific.html
@@ -0,0 +1,6 @@
+Specific page!
+
+specific thing: {{ specific_page_append }}
+global thing: {{ global_append }}
+something: {{ something }}
+doubleme: {{ doubleme }}
diff --git a/mediagoblin/tests/testplugins/modify_context/views.py b/mediagoblin/tests/testplugins/modify_context/views.py
new file mode 100644
index 00000000..701ec6f9
--- /dev/null
+++ b/mediagoblin/tests/testplugins/modify_context/views.py
@@ -0,0 +1,33 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools.response import render_to_response
+
+
+def specific(request):
+ return render_to_response(
+ request,
+ 'contextplugin/specific.html',
+ {"something": "orother",
+ "doubleme": "happy"})
+
+
+def general(request):
+ return render_to_response(
+ request,
+ 'contextplugin/general.html',
+ {"lol": "cats",
+ "doubleme": "joy"})
diff --git a/mediagoblin/tests/testplugins/pluginspec/__init__.py b/mediagoblin/tests/testplugins/pluginspec/__init__.py
new file mode 100644
index 00000000..76ca2b1f
--- /dev/null
+++ b/mediagoblin/tests/testplugins/pluginspec/__init__.py
@@ -0,0 +1,22 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+def setup_plugin():
+ pass
+
+hooks = {
+ 'setup': setup_plugin,
+}
diff --git a/mediagoblin/tests/testplugins/pluginspec/config_spec.ini b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini
new file mode 100644
index 00000000..5c9c3bd7
--- /dev/null
+++ b/mediagoblin/tests/testplugins/pluginspec/config_spec.ini
@@ -0,0 +1,4 @@
+[plugin_spec]
+some_string = string(default="blork")
+some_int = integer(default=50)
+dont_change_me = string(default="still the default") \ No newline at end of file
diff --git a/mediagoblin/tests/testplugins/staticstuff/__init__.py b/mediagoblin/tests/testplugins/staticstuff/__init__.py
new file mode 100644
index 00000000..a2591646
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/__init__.py
@@ -0,0 +1,36 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from mediagoblin.tools.staticdirect import PluginStatic
+from mediagoblin.tools import pluginapi
+from pkg_resources import resource_filename
+
+def setup_plugin():
+ routes = [
+ ('staticstuff.static_demo',
+ '/staticstuff/',
+ 'mediagoblin.tests.testplugins.staticstuff.views:static_demo')]
+
+ pluginapi.register_routes(routes)
+
+
+hooks = {
+ 'setup': setup_plugin,
+ 'static_setup': lambda: PluginStatic(
+ 'staticstuff',
+ resource_filename(
+ 'mediagoblin.tests.testplugins.staticstuff',
+ 'static'))}
diff --git a/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css
new file mode 100644
index 00000000..1294ab8a
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/static/css/bunnify.css
@@ -0,0 +1,4 @@
+body {
+ background-color: #5edcf1;
+ color: #eb8add;
+} \ No newline at end of file
diff --git a/mediagoblin/tests/testplugins/staticstuff/views.py b/mediagoblin/tests/testplugins/staticstuff/views.py
new file mode 100644
index 00000000..34a5e8cb
--- /dev/null
+++ b/mediagoblin/tests/testplugins/staticstuff/views.py
@@ -0,0 +1,28 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+
+from werkzeug import Response
+
+
+def static_demo(request):
+ return Response(json.dumps({
+ # this does not exist, but we'll pretend it does ;)
+ 'mgoblin_bunny_pic': request.staticdirect(
+ 'images/bunny_pic.png'),
+ 'plugin_bunny_css': request.staticdirect(
+ 'css/bunnify.css', 'staticstuff')}))
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 52635e18..060dfda9 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -15,25 +15,25 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import sys
import os
import pkg_resources
import shutil
-from functools import wraps
from paste.deploy import loadapp
from webtest import TestApp
from mediagoblin import mg_globals
-from mediagoblin.db.models import User, MediaEntry, Collection
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+ CommentSubscription, CommentNotification, Privilege, CommentReport
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.db.base import Session
from mediagoblin.meddleware import BaseMeddleware
-from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.auth import gen_password_hash
from mediagoblin.gmg_commands.dbupdate import run_dbupdate
-from mediagoblin.init.celery import setup_celery_app
+
+from datetime import datetime
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
@@ -41,22 +41,10 @@ TEST_SERVER_CONFIG = pkg_resources.resource_filename(
'mediagoblin.tests', 'test_paste.ini')
TEST_APP_CONFIG = pkg_resources.resource_filename(
'mediagoblin.tests', 'test_mgoblin_app.ini')
-TEST_USER_DEV = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_user_dev')
USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
-BAD_CELERY_MESSAGE = """\
-Sorry, you *absolutely* must run tests with the
-mediagoblin.init.celery.from_tests module. Like so:
-
-$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests {0}
-""".format(sys.argv[0])
-
-
-class BadCeleryEnviron(Exception): pass
-
class TestingMeddleware(BaseMeddleware):
"""
@@ -97,12 +85,6 @@ class TestingMeddleware(BaseMeddleware):
return
-def suicide_if_bad_celery_environ():
- if not os.environ.get('CELERY_CONFIG_MODULE') == \
- 'mediagoblin.init.celery.from_tests':
- raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-
-
def get_app(request, paste_config=None, mgoblin_config=None):
"""Create a MediaGoblin app for testing.
@@ -120,21 +102,13 @@ def get_app(request, paste_config=None, mgoblin_config=None):
# This is the directory we're copying the paste/mgoblin config stuff into
run_dir = request.config._tmpdirhandler.mktemp(
'mgoblin_app', numbered=True)
- user_dev_dir = run_dir.mkdir('test_user_dev').strpath
+ user_dev_dir = run_dir.mkdir('user_dev').strpath
new_paste_config = run_dir.join('paste.ini').strpath
new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
shutil.copyfile(paste_config, new_paste_config)
shutil.copyfile(mgoblin_config, new_mgoblin_config)
- suicide_if_bad_celery_environ()
-
- # Make sure we've turned on testing
- testing._activate_testing()
-
- # Leave this imported as it sets up celery.
- from mediagoblin.init.celery import from_tests
-
Session.rollback()
Session.remove()
@@ -154,9 +128,6 @@ def get_app(request, paste_config=None, mgoblin_config=None):
test_app = loadapp(
'config:' + new_paste_config)
- # Re-setup celery
- setup_celery_app(app_config, global_config)
-
# Insert the TestingMeddleware, which can do some
# sanity checks on every request/response.
# Doing it this way is probably not the cleanest way.
@@ -164,7 +135,6 @@ def get_app(request, paste_config=None, mgoblin_config=None):
mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
app = TestApp(test_app)
-
return app
@@ -195,13 +165,13 @@ def assert_db_meets_expected(db, expected):
for collection_name, collection_data in expected.iteritems():
collection = db[collection_name]
for expected_document in collection_data:
- document = collection.find_one({'id': expected_document['id']})
+ document = collection.query.filter_by(id=expected_document['id']).first()
assert document is not None # make sure it exists
assert document == expected_document # make sure it matches
def fixture_add_user(username=u'chris', password=u'toast',
- active_user=True):
+ privileges=[], wants_comment_notification=True):
# Reuse existing user or create a new one
test_user = User.query.filter_by(username=username).first()
if test_user is None:
@@ -209,13 +179,14 @@ def fixture_add_user(username=u'chris', password=u'toast',
test_user.username = username
test_user.email = username + u'@example.com'
if password is not None:
- test_user.pw_hash = bcrypt_gen_password_hash(password)
- if active_user:
- test_user.email_verified = True
- test_user.status = u'active'
+ test_user.pw_hash = gen_password_hash(password)
+ test_user.wants_comment_notification = wants_comment_notification
+ for privilege in privileges:
+ query = Privilege.query.filter(Privilege.privilege_name==privilege)
+ if query.count():
+ test_user.all_privileges.append(query.one())
test_user.save()
-
# Reload
test_user = User.query.filter_by(username=username).first()
@@ -225,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast',
return test_user
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+ if send_email is None:
+ uploader = User.query.filter_by(id=entry.uploader).first()
+ send_email = uploader.wants_comment_notification
+
+ cs = CommentSubscription(
+ media_entry_id=entry.id,
+ user_id=entry.uploader,
+ notify=notify,
+ send_email=send_email)
+
+ cs.save()
+
+ cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+ Session.expunge(cs)
+
+ return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+ seen=False):
+ cn = CommentNotification(user_id=user_id,
+ seen=seen,
+ subject_id=subject_id)
+ cn.save()
+
+ cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+ Session.expunge(cn)
+
+ return cn
+
+
def fixture_media_entry(title=u"Some title", slug=None,
- uploader=None, save=True, gen_slug=True):
+ uploader=None, save=True, gen_slug=True,
+ state=u'unprocessed', fake_upload=True,
+ expunge=True):
+ """
+ Add a media entry for testing purposes.
+
+ Caution: if you're adding multiple entries with fake_upload=True,
+ make sure you save between them... otherwise you'll hit an
+ IntegrityError from multiple newly-added-MediaEntries adding
+ FileKeynames at once. :)
+ """
+ if uploader is None:
+ uploader = fixture_add_user().id
+
entry = MediaEntry()
entry.title = title
entry.slug = slug
- entry.uploader = uploader or fixture_add_user().id
+ entry.uploader = uploader
entry.media_type = u'image'
+ entry.state = state
+
+ if fake_upload:
+ entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+ 'medium': ['d', 'e', 'f.png'],
+ 'original': ['g', 'h', 'i.png']}
+ entry.media_type = u'mediagoblin.media_types.image'
if gen_slug:
entry.generate_slug()
+
if save:
entry.save()
+ if expunge:
+ entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+ Session.expunge(entry)
+
return entry
@@ -260,3 +291,55 @@ def fixture_add_collection(name=u"My first Collection", user=None):
Session.expunge(coll)
return coll
+
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+ if author is None:
+ author = fixture_add_user().id
+
+ if media_entry is None:
+ media_entry = fixture_media_entry().id
+
+ if comment is None:
+ comment = \
+ 'Auto-generated test comment by user #{0} on media #{0}'.format(
+ author, media_entry)
+
+ comment = MediaComment(author=author,
+ media_entry=media_entry,
+ content=comment)
+
+ comment.save()
+
+ Session.expunge(comment)
+
+ return comment
+
+def fixture_add_comment_report(comment=None, reported_user=None,
+ reporter=None, created=None, report_content=None):
+ if comment is None:
+ comment = fixture_add_comment()
+
+ if reported_user is None:
+ reported_user = fixture_add_user()
+
+ if reporter is None:
+ reporter = fixture_add_user()
+
+ if created is None:
+ created=datetime.now()
+
+ if report_content is None:
+ report_content = \
+ 'Auto-generated test report'
+
+ comment_report = CommentReport(comment=comment,
+ reported_user = reported_user,
+ reporter = reporter,
+ created = created,
+ report_content=report_content)
+
+ comment_report.save()
+
+ Session.expunge(comment_report)
+
+ return comment_report