diff options
Diffstat (limited to 'mediagoblin/tests/test_oauth.py')
-rw-r--r-- | mediagoblin/tests/test_oauth.py | 115 |
1 files changed, 71 insertions, 44 deletions
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py index 901556fe..ea3bd798 100644 --- a/mediagoblin/tests/test_oauth.py +++ b/mediagoblin/tests/test_oauth.py @@ -17,6 +17,7 @@ import json import logging +import pytest from urlparse import parse_qs, urlparse from mediagoblin import mg_globals @@ -28,7 +29,10 @@ _log = logging.getLogger(__name__) class TestOAuth(object): - def _setup(self, test_app): + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.test_app = test_app + self.db = mg_globals.database self.pman = pluginapi.PluginManager() @@ -36,17 +40,17 @@ class TestOAuth(object): self.user_password = u'4cc355_70k3N' self.user = fixture_add_user(u'joauth', self.user_password) - self.login(test_app) + self.login() - def login(self, test_app): - test_app.post( - '/auth/login/', { - 'username': self.user.username, - 'password': self.user_password}) + def login(self): + self.test_app.post( + '/auth/login/', { + 'username': self.user.username, + 'password': self.user_password}) - def register_client(self, test_app, name, client_type, description=None, - redirect_uri=''): - return test_app.post( + def register_client(self, name, client_type, description=None, + redirect_uri=''): + return self.test_app.post( '/oauth/client/register', { 'name': name, 'description': description, @@ -56,12 +60,10 @@ class TestOAuth(object): def get_context(self, template_name): return template.TEMPLATE_TEST_CONTEXT[template_name] - def test_1_public_client_registration_without_redirect_uri(self, test_app): + def test_1_public_client_registration_without_redirect_uri(self): ''' Test 'public' OAuth client registration without any redirect uri ''' - self._setup(test_app) - - response = self.register_client(test_app, u'OMGOMGOMG', 'public', - 'OMGOMG Apache License v2') + response = self.register_client( + u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') ctx = self.get_context('oauth/client/register.html') @@ -71,29 +73,30 @@ class TestOAuth(object): assert response.status_int == 200 # Should display an error - assert ctx['form'].redirect_uri.errors + assert len(ctx['form'].redirect_uri.errors) # Should not pass through assert not client - def test_2_successful_public_client_registration(self, test_app): + def test_2_successful_public_client_registration(self): ''' Successfully register a public client ''' - self._setup(test_app) - self.register_client(test_app, u'OMGOMG', 'public', 'OMG!', - 'http://foo.example') + uri = 'http://foo.example' + self.register_client( + u'OMGOMG', 'public', 'OMG!', uri) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.name == u'OMGOMG').first() + # redirect_uri should be set + assert client.redirect_uri == uri + # Client should have been registered assert client - def test_3_successful_confidential_client_reg(self, test_app): + def test_3_successful_confidential_client_reg(self): ''' Register a confidential OAuth client ''' - self._setup(test_app) - response = self.register_client( - test_app, u'GMOGMO', 'confidential', 'NO GMO!') + u'GMOGMO', 'confidential', 'NO GMO!') assert response.status_int == 302 @@ -105,18 +108,16 @@ class TestOAuth(object): return client - def test_4_authorize_confidential_client(self, test_app): + def test_4_authorize_confidential_client(self): ''' Authorize a confidential client as a logged in user ''' - self._setup(test_app) - - client = self.test_3_successful_confidential_client_reg(test_app) + client = self.test_3_successful_confidential_client_reg() client_identifier = client.identifier redirect_uri = 'https://foo.example' - response = test_app.get('/oauth/authorize', { + response = self.test_app.get('/oauth/authorize', { 'client_id': client.identifier, - 'scope': 'admin', + 'scope': 'all', 'redirect_uri': redirect_uri}) # User-agent should NOT be redirected @@ -127,7 +128,7 @@ class TestOAuth(object): form = ctx['form'] # Short for client authorization post reponse - capr = test_app.post( + capr = self.test_app.post( '/oauth/client/authorize', { 'client_id': form.client_id.data, 'allow': 'Allow', @@ -142,21 +143,19 @@ class TestOAuth(object): return authorization_response, client_identifier def get_code_from_redirect_uri(self, uri): + ''' Get the value of ?code= from an URI ''' return parse_qs(urlparse(uri).query)['code'][0] - def test_token_endpoint_successful_confidential_request(self, test_app): + def test_token_endpoint_successful_confidential_request(self): ''' Successful request against token endpoint ''' - self._setup(test_app) - - code_redirect, client_id = self.test_4_authorize_confidential_client( - test_app) + code_redirect, client_id = self.test_4_authorize_confidential_client() code = self.get_code_from_redirect_uri(code_redirect.location) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = test_app.get('/oauth/access_token?client_id={0}&\ + token_res = self.test_app.get('/oauth/access_token?client_id={0}&\ code={1}&client_secret={2}'.format(client_id, code, client.secret)) assert token_res.status_int == 200 @@ -170,19 +169,21 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret)) assert type(token_data['expires_in']) == int assert token_data['expires_in'] > 0 - def test_token_endpont_missing_id_confidential_request(self, test_app): - ''' Unsuccessful request against token endpoint, missing client_id ''' - self._setup(test_app) + # There should be a refresh token provided in the token data + assert len(token_data['refresh_token']) + + return client_id, token_data - code_redirect, client_id = self.test_4_authorize_confidential_client( - test_app) + def test_token_endpont_missing_id_confidential_request(self): + ''' Unsuccessful request against token endpoint, missing client_id ''' + code_redirect, client_id = self.test_4_authorize_confidential_client() code = self.get_code_from_redirect_uri(code_redirect.location) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = test_app.get('/oauth/access_token?\ + token_res = self.test_app.get('/oauth/access_token?\ code={0}&client_secret={1}'.format(code, client.secret)) assert token_res.status_int == 200 @@ -192,4 +193,30 @@ code={0}&client_secret={1}'.format(code, client.secret)) assert 'error' in token_data assert not 'access_token' in token_data assert token_data['error'] == 'invalid_request' - assert token_data['error_description'] == 'Missing client_id in request' + assert len(token_data['error_description']) + + def test_refresh_token(self): + ''' Try to get a new access token using the refresh token ''' + # Get an access token and a refresh token + client_id, token_data =\ + self.test_token_endpoint_successful_confidential_request() + + client = self.db.OAuthClient.query.filter( + self.db.OAuthClient.identifier == client_id).first() + + token_res = self.test_app.get('/oauth/access_token', + {'refresh_token': token_data['refresh_token'], + 'client_id': client_id, + 'client_secret': client.secret + }) + + assert token_res.status_int == 200 + + new_token_data = json.loads(token_res.body) + + assert not 'error' in new_token_data + assert 'access_token' in new_token_data + assert 'token_type' in new_token_data + assert 'expires_in' in new_token_data + assert type(new_token_data['expires_in']) == int + assert new_token_data['expires_in'] > 0 |