aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mediagoblin/federation/oauth.py6
-rw-r--r--mediagoblin/federation/tools/request.py19
-rw-r--r--mediagoblin/federation/views.py8
-rw-r--r--mediagoblin/tests/test_oauth1.py58
4 files changed, 71 insertions, 20 deletions
diff --git a/mediagoblin/federation/oauth.py b/mediagoblin/federation/oauth.py
index ea0fea2c..764b8535 100644
--- a/mediagoblin/federation/oauth.py
+++ b/mediagoblin/federation/oauth.py
@@ -26,8 +26,9 @@ class GMGRequestValidator(RequestValidator):
enforce_ssl = False
- def __init__(self, data=None):
+ def __init__(self, data=None, *args, **kwargs):
self.POST = data
+ super(GMGRequestValidator, self).__init__(*args, **kwargs)
def save_request_token(self, token, request):
""" Saves request token in db """
@@ -38,7 +39,8 @@ class GMGRequestValidator(RequestValidator):
secret=token["oauth_token_secret"],
)
request_token.client = client_id
- request_token.callback = token.get("oauth_callback", None)
+ if u"oauth_callback" in self.POST:
+ request_token.callback = self.POST[u"oauth_callback"]
request_token.save()
def save_verifier(self, token, verifier, request):
diff --git a/mediagoblin/federation/tools/request.py b/mediagoblin/federation/tools/request.py
index 4f5be277..6e484bb6 100644
--- a/mediagoblin/federation/tools/request.py
+++ b/mediagoblin/federation/tools/request.py
@@ -14,14 +14,19 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-
-# Regex for parsing Authorization string
-auth_header_re = re.compile('(\w+)[:=] ?"?(\w+)"?')
-
def decode_authorization_header(header):
""" Decodes a HTTP Authorization Header to python dictionary """
- authorization = header.get("Authorization", "")
- tokens = dict(auth_header_re.findall(authorization))
+ authorization = header.get("Authorization", "").lstrip(" ").lstrip("OAuth")
+ tokens = {}
+
+ for param in authorization.split(","):
+ key, value = param.split("=")
+
+ key = key.lstrip(" ")
+ value = value.lstrip(" ").lstrip('"')
+ value = value.rstrip(" ").rstrip('"')
+
+ tokens[key] = value
+
return tokens
diff --git a/mediagoblin/federation/views.py b/mediagoblin/federation/views.py
index 7eb9f148..633a19d4 100644
--- a/mediagoblin/federation/views.py
+++ b/mediagoblin/federation/views.py
@@ -198,7 +198,6 @@ def request_token(request):
authorization = decode_authorization_header(data)
-
if authorization == dict() or u"oauth_consumer_key" not in authorization:
error = "Missing required parameter."
return json_response({"error": error}, status=400)
@@ -206,12 +205,13 @@ def request_token(request):
# check the client_id
client_id = authorization[u"oauth_consumer_key"]
client = Client.query.filter_by(id=client_id).first()
- if client is None:
+
+ if client == None:
# client_id is invalid
error = "Invalid client_id"
return json_response({"error": error}, status=400)
- # make request token and return to client
+ # make request token and return to client
request_validator = GMGRequestValidator(authorization)
rv = RequestTokenEndpoint(request_validator)
tokens = rv.create_request_token(request, authorization)
@@ -219,7 +219,7 @@ def request_token(request):
# store the nonce & timestamp before we return back
nonce = authorization[u"oauth_nonce"]
timestamp = authorization[u"oauth_timestamp"]
- timestamp = datetime.datetime.fromtimestamp(int(timestamp))
+ timestamp = datetime.datetime.fromtimestamp(float(timestamp))
nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
nc.save()
diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py
index f3b44850..073c2884 100644
--- a/mediagoblin/tests/test_oauth1.py
+++ b/mediagoblin/tests/test_oauth1.py
@@ -14,17 +14,23 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
+import cgi
import pytest
from urlparse import parse_qs, urlparse
+from oauthlib.oauth1 import Client
+
from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
from mediagoblin.tests.tools import fixture_add_user
class TestOAuth(object):
+
+ MIME_FORM = "application/x-www-form-urlencoded"
+ MIME_JSON = "application/json"
+
@pytest.fixture(autouse=True)
def setup(self, test_app):
self.test_app = test_app
@@ -54,7 +60,7 @@ class TestOAuth(object):
def test_client_client_register_limited_info(self):
""" Tests that a client can be registered with limited information """
response = self.register_client()
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
@@ -72,7 +78,7 @@ class TestOAuth(object):
}
response = self.register_client(**query)
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
@@ -89,7 +95,7 @@ class TestOAuth(object):
# first we need to register a client
response = self.register_client()
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
# Now update
@@ -105,8 +111,8 @@ class TestOAuth(object):
update_response = self.register_client(**update_query)
assert update_response.status_int == 200
- client_info = json.loads(update_response.body)
- client = self.Client.query.filter_by(id=client_info["client_id"]).first()
+ client_info = update_response.json
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
assert client.secret == client_info["client_secret"]
assert client.application_type == update_query["application_type"]
@@ -115,8 +121,46 @@ class TestOAuth(object):
assert client.logo_url == update_query["logo_url"]
assert client.redirect_uri == update_query["redirect_uris"].split()
- def request_token(self):
+ def to_authorize_headers(self, data):
+ headers = ""
+ for key, value in data.items():
+ headers += '{0}="{1}",'.format(key, value)
+ return {"Authorization": "OAuth " + headers[:-1]}
+
+ def test_request_token(self):
""" Test a request for a request token """
response = self.register_client()
+ client_id = response.json["client_id"]
+
+ endpoint = "/oauth/request_token"
+ request_query = {
+ "oauth_consumer_key": client_id,
+ "oauth_nonce": "abcdefghij",
+ "oauth_timestamp": 123456789.0,
+ "oauth_callback": "https://some.url/callback",
+ }
+
+ headers = self.to_authorize_headers(request_query)
+
+ headers["Content-Type"] = self.MIME_FORM
+
+ response = self.test_app.post(endpoint, headers=headers)
+ response = cgi.parse_qs(response.body)
+
+ # each element is a list, reduce it to a string
+ for key, value in response.items():
+ response[key] = value[0]
+
+ request_token = self.db.RequestToken.query.filter_by(
+ token=response["oauth_token"]
+ ).first()
+
+ client = self.db.Client.query.filter_by(id=client_id).first()
+
+ assert request_token is not None
+ assert request_token.secret == response["oauth_token_secret"]
+ assert request_token.client == client.id
+ assert request_token.used == False
+ assert request_token.callback == request_query["oauth_callback"]