aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/test_http_callback.py81
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini4
-rw-r--r--mediagoblin/tests/test_oauth.py183
-rw-r--r--mediagoblin/tests/tools.py8
4 files changed, 272 insertions, 4 deletions
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
new file mode 100644
index 00000000..d769af1e
--- /dev/null
+++ b/mediagoblin/tests/test_http_callback.py
@@ -0,0 +1,81 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+
+from urlparse import urlparse, parse_qs
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import processing
+from mediagoblin.tests.tools import get_test_app, fixture_add_user
+from mediagoblin.tests.test_submission import GOOD_PNG
+from mediagoblin.tests import test_oauth as oauth
+
+
+class TestHTTPCallback(object):
+ def setUp(self):
+ self.app = get_test_app()
+ self.db = mg_globals.database
+
+ self.user_password = 'secret'
+ self.user = fixture_add_user('call_back', self.user_password)
+
+ self.login()
+
+ def login(self):
+ self.app.post('/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def get_access_token(self, client_id, client_secret, code):
+ response = self.app.get('/oauth/access_token', {
+ 'code': code,
+ 'client_id': client_id,
+ 'client_secret': client_secret})
+
+ response_data = json.loads(response.body)
+
+ return response_data['access_token']
+
+ def test_callback(self):
+ ''' Test processing HTTP callback '''
+
+ self.oauth = oauth.TestOAuth()
+ self.oauth.setUp()
+
+ redirect, client_id = self.oauth.test_4_authorize_confidential_client()
+
+ code = parse_qs(urlparse(redirect.location).query)['code'][0]
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ client_secret = client.secret
+
+ access_token = self.get_access_token(client_id, client_secret, code)
+
+ callback_url = 'https://foo.example?secrettestmediagoblinparam'
+
+ res = self.app.post('/api/submit?client_id={0}&access_token={1}\
+&client_secret={2}'.format(
+ client_id,
+ access_token,
+ client_secret), {
+ 'title': 'Test',
+ 'callback_url': callback_url},
+ upload_files=[('file', GOOD_PNG)])
+
+ assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed'
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 3b979ff7..cde61a70 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -31,3 +31,7 @@ lock_dir = %(here)s/test_user_dev/beaker/cache/lock
CELERY_ALWAYS_EAGER = true
CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db"
BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.api]]
+[[mediagoblin.plugins.oauth]]
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py
new file mode 100644
index 00000000..db4e226a
--- /dev/null
+++ b/mediagoblin/tests/test_oauth.py
@@ -0,0 +1,183 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import logging
+
+from urlparse import parse_qs, urlparse
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import template, pluginapi
+from mediagoblin.tests.tools import get_test_app, fixture_add_user
+
+
+_log = logging.getLogger(__name__)
+
+
+class TestOAuth(object):
+ def setUp(self):
+ self.app = get_test_app()
+ self.db = mg_globals.database
+
+ self.pman = pluginapi.PluginManager()
+
+ self.user_password = '4cc355_70k3N'
+ self.user = fixture_add_user('joauth', self.user_password)
+
+ self.login()
+
+ def login(self):
+ self.app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def register_client(self, name, client_type, description=None,
+ redirect_uri=''):
+ return self.app.post(
+ '/oauth/client/register', {
+ 'name': name,
+ 'description': description,
+ 'type': client_type,
+ 'redirect_uri': redirect_uri})
+
+ def get_context(self, template_name):
+ return template.TEMPLATE_TEST_CONTEXT[template_name]
+
+ def test_1_public_client_registration_without_redirect_uri(self):
+ ''' Test 'public' OAuth client registration without any redirect uri '''
+ response = self.register_client('OMGOMGOMG', 'public',
+ 'OMGOMG Apache License v2')
+
+ ctx = self.get_context('oauth/client/register.html')
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == 'OMGOMGOMG').first()
+
+ assert response.status_int == 200
+
+ # Should display an error
+ assert ctx['form'].redirect_uri.errors
+
+ # Should not pass through
+ assert not client
+
+ def test_2_successful_public_client_registration(self):
+ ''' Successfully register a public client '''
+ self.login()
+ self.register_client('OMGOMG', 'public', 'OMG!',
+ 'http://foo.example')
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == 'OMGOMG').first()
+
+ # Client should have been registered
+ assert client
+
+ def test_3_successful_confidential_client_reg(self):
+ ''' Register a confidential OAuth client '''
+ response = self.register_client('GMOGMO', 'confidential', 'NO GMO!')
+
+ assert response.status_int == 302
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.name == 'GMOGMO').first()
+
+ # Client should have been registered
+ assert client
+
+ return client
+
+ def test_4_authorize_confidential_client(self):
+ ''' Authorize a confidential client as a logged in user '''
+ client = self.test_3_successful_confidential_client_reg()
+
+ client_identifier = client.identifier
+
+ redirect_uri = 'https://foo.example'
+ response = self.app.get('/oauth/authorize', {
+ 'client_id': client.identifier,
+ 'scope': 'admin',
+ 'redirect_uri': redirect_uri})
+
+ # User-agent should NOT be redirected
+ assert response.status_int == 200
+
+ ctx = self.get_context('oauth/authorize.html')
+
+ form = ctx['form']
+
+ # Short for client authorization post reponse
+ capr = self.app.post(
+ '/oauth/client/authorize', {
+ 'client_id': form.client_id.data,
+ 'allow': 'Allow',
+ 'next': form.next.data})
+
+ assert capr.status_int == 302
+
+ authorization_response = capr.follow()
+
+ assert authorization_response.location.startswith(redirect_uri)
+
+ return authorization_response, client_identifier
+
+ def get_code_from_redirect_uri(self, uri):
+ return parse_qs(urlparse(uri).query)['code'][0]
+
+ def test_token_endpoint_successful_confidential_request(self):
+ ''' Successful request against token endpoint '''
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
+
+ code = self.get_code_from_redirect_uri(code_redirect.location)
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ token_res = self.app.get('/oauth/access_token?client_id={0}&\
+code={1}&client_secret={2}'.format(client_id, code, client.secret))
+
+ assert token_res.status_int == 200
+
+ token_data = json.loads(token_res.body)
+
+ assert not 'error' in token_data
+ assert 'access_token' in token_data
+ assert 'token_type' in token_data
+ assert 'expires_in' in token_data
+ assert type(token_data['expires_in']) == int
+ assert token_data['expires_in'] > 0
+
+ def test_token_endpont_missing_id_confidential_request(self):
+ ''' Unsuccessful request against token endpoint, missing client_id '''
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
+
+ code = self.get_code_from_redirect_uri(code_redirect.location)
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == unicode(client_id)).first()
+
+ token_res = self.app.get('/oauth/access_token?\
+code={0}&client_secret={1}'.format(code, client.secret))
+
+ assert token_res.status_int == 200
+
+ token_data = json.loads(token_res.body)
+
+ assert 'error' in token_data
+ assert not 'access_token' in token_data
+ assert token_data['error'] == 'invalid_request'
+ assert token_data['error_description'] == 'Missing client_id in request'
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 6fd11fc2..d3369831 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -60,7 +60,7 @@ class BadCeleryEnviron(Exception): pass
class TestingMeddleware(BaseMeddleware):
"""
Meddleware for the Unit tests
-
+
It might make sense to perform some tests on all
requests/responses. Or prepare them in a special
manner. For example all html responses could be tested
@@ -100,7 +100,7 @@ def suicide_if_bad_celery_environ():
if not os.environ.get('CELERY_CONFIG_MODULE') == \
'mediagoblin.init.celery.from_tests':
raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-
+
def get_test_app(dump_old_app=True):
suicide_if_bad_celery_environ()
@@ -202,8 +202,8 @@ def assert_db_meets_expected(db, expected):
assert document == expected_document # make sure it matches
-def fixture_add_user(username = u'chris', password = 'toast',
- active_user = True):
+def fixture_add_user(username=u'chris', password='toast',
+ active_user=True):
test_user = mg_globals.database.User()
test_user.username = username
test_user.email = username + u'@example.com'