diff options
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r-- | mediagoblin/tests/test_http_callback.py | 81 | ||||
-rw-r--r-- | mediagoblin/tests/test_mgoblin_app.ini | 4 | ||||
-rw-r--r-- | mediagoblin/tests/test_oauth.py | 183 | ||||
-rw-r--r-- | mediagoblin/tests/tools.py | 8 |
4 files changed, 272 insertions, 4 deletions
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py new file mode 100644 index 00000000..d769af1e --- /dev/null +++ b/mediagoblin/tests/test_http_callback.py @@ -0,0 +1,81 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json + +from urlparse import urlparse, parse_qs + +from mediagoblin import mg_globals +from mediagoblin.tools import processing +from mediagoblin.tests.tools import get_test_app, fixture_add_user +from mediagoblin.tests.test_submission import GOOD_PNG +from mediagoblin.tests import test_oauth as oauth + + +class TestHTTPCallback(object): + def setUp(self): + self.app = get_test_app() + self.db = mg_globals.database + + self.user_password = 'secret' + self.user = fixture_add_user('call_back', self.user_password) + + self.login() + + def login(self): + self.app.post('/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + def get_access_token(self, client_id, client_secret, code): + response = self.app.get('/oauth/access_token', { + 'code': code, + 'client_id': client_id, + 'client_secret': client_secret}) + + response_data = json.loads(response.body) + + return response_data['access_token'] + + def test_callback(self): + ''' Test processing HTTP callback ''' + + self.oauth = oauth.TestOAuth() + self.oauth.setUp() + + redirect, client_id = self.oauth.test_4_authorize_confidential_client() + + code = parse_qs(urlparse(redirect.location).query)['code'][0] + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + client_secret = client.secret + + access_token = self.get_access_token(client_id, client_secret, code) + + callback_url = 'https://foo.example?secrettestmediagoblinparam' + + res = self.app.post('/api/submit?client_id={0}&access_token={1}\ +&client_secret={2}'.format( + client_id, + access_token, + client_secret), { + 'title': 'Test', + 'callback_url': callback_url}, + upload_files=[('file', GOOD_PNG)]) + + assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed' diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index 3b979ff7..cde61a70 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -31,3 +31,7 @@ lock_dir = %(here)s/test_user_dev/beaker/cache/lock CELERY_ALWAYS_EAGER = true CELERY_RESULT_DBURI = "sqlite:///%(here)s/test_user_dev/celery.db" BROKER_HOST = "sqlite:///%(here)s/test_user_dev/kombu.db" + +[plugins] +[[mediagoblin.plugins.api]] +[[mediagoblin.plugins.oauth]] diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py new file mode 100644 index 00000000..db4e226a --- /dev/null +++ b/mediagoblin/tests/test_oauth.py @@ -0,0 +1,183 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import logging + +from urlparse import parse_qs, urlparse + +from mediagoblin import mg_globals +from mediagoblin.tools import template, pluginapi +from mediagoblin.tests.tools import get_test_app, fixture_add_user + + +_log = logging.getLogger(__name__) + + +class TestOAuth(object): + def setUp(self): + self.app = get_test_app() + self.db = mg_globals.database + + self.pman = pluginapi.PluginManager() + + self.user_password = '4cc355_70k3N' + self.user = fixture_add_user('joauth', self.user_password) + + self.login() + + def login(self): + self.app.post( + '/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) + + def register_client(self, name, client_type, description=None, + redirect_uri=''): + return self.app.post( + '/oauth/client/register', { + 'name': name, + 'description': description, + 'type': client_type, + 'redirect_uri': redirect_uri}) + + def get_context(self, template_name): + return template.TEMPLATE_TEST_CONTEXT[template_name] + + def test_1_public_client_registration_without_redirect_uri(self): + ''' Test 'public' OAuth client registration without any redirect uri ''' + response = self.register_client('OMGOMGOMG', 'public', + 'OMGOMG Apache License v2') + + ctx = self.get_context('oauth/client/register.html') + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == 'OMGOMGOMG').first() + + assert response.status_int == 200 + + # Should display an error + assert ctx['form'].redirect_uri.errors + + # Should not pass through + assert not client + + def test_2_successful_public_client_registration(self): + ''' Successfully register a public client ''' + self.login() + self.register_client('OMGOMG', 'public', 'OMG!', + 'http://foo.example') + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == 'OMGOMG').first() + + # Client should have been registered + assert client + + def test_3_successful_confidential_client_reg(self): + ''' Register a confidential OAuth client ''' + response = self.register_client('GMOGMO', 'confidential', 'NO GMO!') + + assert response.status_int == 302 + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.name == 'GMOGMO').first() + + # Client should have been registered + assert client + + return client + + def test_4_authorize_confidential_client(self): + ''' Authorize a confidential client as a logged in user ''' + client = self.test_3_successful_confidential_client_reg() + + client_identifier = client.identifier + + redirect_uri = 'https://foo.example' + response = self.app.get('/oauth/authorize', { + 'client_id': client.identifier, + 'scope': 'admin', + 'redirect_uri': redirect_uri}) + + # User-agent should NOT be redirected + assert response.status_int == 200 + + ctx = self.get_context('oauth/authorize.html') + + form = ctx['form'] + + # Short for client authorization post reponse + capr = self.app.post( + '/oauth/client/authorize', { + 'client_id': form.client_id.data, + 'allow': 'Allow', + 'next': form.next.data}) + + assert capr.status_int == 302 + + authorization_response = capr.follow() + + assert authorization_response.location.startswith(redirect_uri) + + return authorization_response, client_identifier + + def get_code_from_redirect_uri(self, uri): + return parse_qs(urlparse(uri).query)['code'][0] + + def test_token_endpoint_successful_confidential_request(self): + ''' Successful request against token endpoint ''' + code_redirect, client_id = self.test_4_authorize_confidential_client() + + code = self.get_code_from_redirect_uri(code_redirect.location) + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + token_res = self.app.get('/oauth/access_token?client_id={0}&\ +code={1}&client_secret={2}'.format(client_id, code, client.secret)) + + assert token_res.status_int == 200 + + token_data = json.loads(token_res.body) + + assert not 'error' in token_data + assert 'access_token' in token_data + assert 'token_type' in token_data + assert 'expires_in' in token_data + assert type(token_data['expires_in']) == int + assert token_data['expires_in'] > 0 + + def test_token_endpont_missing_id_confidential_request(self): + ''' Unsuccessful request against token endpoint, missing client_id ''' + code_redirect, client_id = self.test_4_authorize_confidential_client() + + code = self.get_code_from_redirect_uri(code_redirect.location) + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == unicode(client_id)).first() + + token_res = self.app.get('/oauth/access_token?\ +code={0}&client_secret={1}'.format(code, client.secret)) + + assert token_res.status_int == 200 + + token_data = json.loads(token_res.body) + + assert 'error' in token_data + assert not 'access_token' in token_data + assert token_data['error'] == 'invalid_request' + assert token_data['error_description'] == 'Missing client_id in request' diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 6fd11fc2..d3369831 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -60,7 +60,7 @@ class BadCeleryEnviron(Exception): pass class TestingMeddleware(BaseMeddleware): """ Meddleware for the Unit tests - + It might make sense to perform some tests on all requests/responses. Or prepare them in a special manner. For example all html responses could be tested @@ -100,7 +100,7 @@ def suicide_if_bad_celery_environ(): if not os.environ.get('CELERY_CONFIG_MODULE') == \ 'mediagoblin.init.celery.from_tests': raise BadCeleryEnviron(BAD_CELERY_MESSAGE) - + def get_test_app(dump_old_app=True): suicide_if_bad_celery_environ() @@ -202,8 +202,8 @@ def assert_db_meets_expected(db, expected): assert document == expected_document # make sure it matches -def fixture_add_user(username = u'chris', password = 'toast', - active_user = True): +def fixture_add_user(username=u'chris', password='toast', + active_user=True): test_user = mg_globals.database.User() test_user.username = username test_user.email = username + u'@example.com' |