aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests/test_oauth.py
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests/test_oauth.py')
-rw-r--r--mediagoblin/tests/test_oauth.py115
1 files changed, 71 insertions, 44 deletions
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py
index 901556fe..ea3bd798 100644
--- a/mediagoblin/tests/test_oauth.py
+++ b/mediagoblin/tests/test_oauth.py
@@ -17,6 +17,7 @@
import json
import logging
+import pytest
from urlparse import parse_qs, urlparse
from mediagoblin import mg_globals
@@ -28,7 +29,10 @@ _log = logging.getLogger(__name__)
class TestOAuth(object):
- def _setup(self, test_app):
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+
self.db = mg_globals.database
self.pman = pluginapi.PluginManager()
@@ -36,17 +40,17 @@ class TestOAuth(object):
self.user_password = u'4cc355_70k3N'
self.user = fixture_add_user(u'joauth', self.user_password)
- self.login(test_app)
+ self.login()
- def login(self, test_app):
- test_app.post(
- '/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
+ def login(self):
+ self.test_app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
- def register_client(self, test_app, name, client_type, description=None,
- redirect_uri=''):
- return test_app.post(
+ def register_client(self, name, client_type, description=None,
+ redirect_uri=''):
+ return self.test_app.post(
'/oauth/client/register', {
'name': name,
'description': description,
@@ -56,12 +60,10 @@ class TestOAuth(object):
def get_context(self, template_name):
return template.TEMPLATE_TEST_CONTEXT[template_name]
- def test_1_public_client_registration_without_redirect_uri(self, test_app):
+ def test_1_public_client_registration_without_redirect_uri(self):
''' Test 'public' OAuth client registration without any redirect uri '''
- self._setup(test_app)
-
- response = self.register_client(test_app, u'OMGOMGOMG', 'public',
- 'OMGOMG Apache License v2')
+ response = self.register_client(
+ u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
ctx = self.get_context('oauth/client/register.html')
@@ -71,29 +73,30 @@ class TestOAuth(object):
assert response.status_int == 200
# Should display an error
- assert ctx['form'].redirect_uri.errors
+ assert len(ctx['form'].redirect_uri.errors)
# Should not pass through
assert not client
- def test_2_successful_public_client_registration(self, test_app):
+ def test_2_successful_public_client_registration(self):
''' Successfully register a public client '''
- self._setup(test_app)
- self.register_client(test_app, u'OMGOMG', 'public', 'OMG!',
- 'http://foo.example')
+ uri = 'http://foo.example'
+ self.register_client(
+ u'OMGOMG', 'public', 'OMG!', uri)
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.name == u'OMGOMG').first()
+ # redirect_uri should be set
+ assert client.redirect_uri == uri
+
# Client should have been registered
assert client
- def test_3_successful_confidential_client_reg(self, test_app):
+ def test_3_successful_confidential_client_reg(self):
''' Register a confidential OAuth client '''
- self._setup(test_app)
-
response = self.register_client(
- test_app, u'GMOGMO', 'confidential', 'NO GMO!')
+ u'GMOGMO', 'confidential', 'NO GMO!')
assert response.status_int == 302
@@ -105,18 +108,16 @@ class TestOAuth(object):
return client
- def test_4_authorize_confidential_client(self, test_app):
+ def test_4_authorize_confidential_client(self):
''' Authorize a confidential client as a logged in user '''
- self._setup(test_app)
-
- client = self.test_3_successful_confidential_client_reg(test_app)
+ client = self.test_3_successful_confidential_client_reg()
client_identifier = client.identifier
redirect_uri = 'https://foo.example'
- response = test_app.get('/oauth/authorize', {
+ response = self.test_app.get('/oauth/authorize', {
'client_id': client.identifier,
- 'scope': 'admin',
+ 'scope': 'all',
'redirect_uri': redirect_uri})
# User-agent should NOT be redirected
@@ -127,7 +128,7 @@ class TestOAuth(object):
form = ctx['form']
# Short for client authorization post reponse
- capr = test_app.post(
+ capr = self.test_app.post(
'/oauth/client/authorize', {
'client_id': form.client_id.data,
'allow': 'Allow',
@@ -142,21 +143,19 @@ class TestOAuth(object):
return authorization_response, client_identifier
def get_code_from_redirect_uri(self, uri):
+ ''' Get the value of ?code= from an URI '''
return parse_qs(urlparse(uri).query)['code'][0]
- def test_token_endpoint_successful_confidential_request(self, test_app):
+ def test_token_endpoint_successful_confidential_request(self):
''' Successful request against token endpoint '''
- self._setup(test_app)
-
- code_redirect, client_id = self.test_4_authorize_confidential_client(
- test_app)
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
code = self.get_code_from_redirect_uri(code_redirect.location)
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = test_app.get('/oauth/access_token?client_id={0}&\
+ token_res = self.test_app.get('/oauth/access_token?client_id={0}&\
code={1}&client_secret={2}'.format(client_id, code, client.secret))
assert token_res.status_int == 200
@@ -170,19 +169,21 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
assert type(token_data['expires_in']) == int
assert token_data['expires_in'] > 0
- def test_token_endpont_missing_id_confidential_request(self, test_app):
- ''' Unsuccessful request against token endpoint, missing client_id '''
- self._setup(test_app)
+ # There should be a refresh token provided in the token data
+ assert len(token_data['refresh_token'])
+
+ return client_id, token_data
- code_redirect, client_id = self.test_4_authorize_confidential_client(
- test_app)
+ def test_token_endpont_missing_id_confidential_request(self):
+ ''' Unsuccessful request against token endpoint, missing client_id '''
+ code_redirect, client_id = self.test_4_authorize_confidential_client()
code = self.get_code_from_redirect_uri(code_redirect.location)
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = test_app.get('/oauth/access_token?\
+ token_res = self.test_app.get('/oauth/access_token?\
code={0}&client_secret={1}'.format(code, client.secret))
assert token_res.status_int == 200
@@ -192,4 +193,30 @@ code={0}&client_secret={1}'.format(code, client.secret))
assert 'error' in token_data
assert not 'access_token' in token_data
assert token_data['error'] == 'invalid_request'
- assert token_data['error_description'] == 'Missing client_id in request'
+ assert len(token_data['error_description'])
+
+ def test_refresh_token(self):
+ ''' Try to get a new access token using the refresh token '''
+ # Get an access token and a refresh token
+ client_id, token_data =\
+ self.test_token_endpoint_successful_confidential_request()
+
+ client = self.db.OAuthClient.query.filter(
+ self.db.OAuthClient.identifier == client_id).first()
+
+ token_res = self.test_app.get('/oauth/access_token',
+ {'refresh_token': token_data['refresh_token'],
+ 'client_id': client_id,
+ 'client_secret': client.secret
+ })
+
+ assert token_res.status_int == 200
+
+ new_token_data = json.loads(token_res.body)
+
+ assert not 'error' in new_token_data
+ assert 'access_token' in new_token_data
+ assert 'token_type' in new_token_data
+ assert 'expires_in' in new_token_data
+ assert type(new_token_data['expires_in']) == int
+ assert new_token_data['expires_in'] > 0