diff options
Diffstat (limited to 'python/itsdangerous/jws.py')
-rw-r--r-- | python/itsdangerous/jws.py | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/python/itsdangerous/jws.py b/python/itsdangerous/jws.py new file mode 100644 index 0000000..92e9ec8 --- /dev/null +++ b/python/itsdangerous/jws.py @@ -0,0 +1,218 @@ +import hashlib +import time +from datetime import datetime + +from ._compat import number_types +from ._json import _CompactJSON +from ._json import json +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadData +from .exc import BadHeader +from .exc import BadPayload +from .exc import BadSignature +from .exc import SignatureExpired +from .serializer import Serializer +from .signer import HMACAlgorithm +from .signer import NoneAlgorithm + + +class JSONWebSignatureSerializer(Serializer): + """This serializer implements JSON Web Signature (JWS) support. Only + supports the JWS Compact Serialization. + """ + + jws_algorithms = { + "HS256": HMACAlgorithm(hashlib.sha256), + "HS384": HMACAlgorithm(hashlib.sha384), + "HS512": HMACAlgorithm(hashlib.sha512), + "none": NoneAlgorithm(), + } + + #: The default algorithm to use for signature generation + default_algorithm = "HS512" + + default_serializer = _CompactJSON + + def __init__( + self, + secret_key, + salt=None, + serializer=None, + serializer_kwargs=None, + signer=None, + signer_kwargs=None, + algorithm_name=None, + ): + Serializer.__init__( + self, + secret_key=secret_key, + salt=salt, + serializer=serializer, + serializer_kwargs=serializer_kwargs, + signer=signer, + signer_kwargs=signer_kwargs, + ) + if algorithm_name is None: + algorithm_name = self.default_algorithm + self.algorithm_name = algorithm_name + self.algorithm = self.make_algorithm(algorithm_name) + + def load_payload(self, payload, serializer=None, return_header=False): + payload = want_bytes(payload) + if b"." not in payload: + raise BadPayload('No "." found in value') + base64d_header, base64d_payload = payload.split(b".", 1) + try: + json_header = base64_decode(base64d_header) + except Exception as e: + raise BadHeader( + "Could not base64 decode the header because of an exception", + original_error=e, + ) + try: + json_payload = base64_decode(base64d_payload) + except Exception as e: + raise BadPayload( + "Could not base64 decode the payload because of an exception", + original_error=e, + ) + try: + header = Serializer.load_payload(self, json_header, serializer=json) + except BadData as e: + raise BadHeader( + "Could not unserialize header because it was malformed", + original_error=e, + ) + if not isinstance(header, dict): + raise BadHeader("Header payload is not a JSON object", header=header) + payload = Serializer.load_payload(self, json_payload, serializer=serializer) + if return_header: + return payload, header + return payload + + def dump_payload(self, header, obj): + base64d_header = base64_encode( + self.serializer.dumps(header, **self.serializer_kwargs) + ) + base64d_payload = base64_encode( + self.serializer.dumps(obj, **self.serializer_kwargs) + ) + return base64d_header + b"." + base64d_payload + + def make_algorithm(self, algorithm_name): + try: + return self.jws_algorithms[algorithm_name] + except KeyError: + raise NotImplementedError("Algorithm not supported") + + def make_signer(self, salt=None, algorithm=None): + if salt is None: + salt = self.salt + key_derivation = "none" if salt is None else None + if algorithm is None: + algorithm = self.algorithm + return self.signer( + self.secret_key, + salt=salt, + sep=".", + key_derivation=key_derivation, + algorithm=algorithm, + ) + + def make_header(self, header_fields): + header = header_fields.copy() if header_fields else {} + header["alg"] = self.algorithm_name + return header + + def dumps(self, obj, salt=None, header_fields=None): + """Like :meth:`.Serializer.dumps` but creates a JSON Web + Signature. It also allows for specifying additional fields to be + included in the JWS header. + """ + header = self.make_header(header_fields) + signer = self.make_signer(salt, self.algorithm) + return signer.sign(self.dump_payload(header, obj)) + + def loads(self, s, salt=None, return_header=False): + """Reverse of :meth:`dumps`. If requested via ``return_header`` + it will return a tuple of payload and header. + """ + payload, header = self.load_payload( + self.make_signer(salt, self.algorithm).unsign(want_bytes(s)), + return_header=True, + ) + if header.get("alg") != self.algorithm_name: + raise BadHeader("Algorithm mismatch", header=header, payload=payload) + if return_header: + return payload, header + return payload + + def loads_unsafe(self, s, salt=None, return_header=False): + kwargs = {"return_header": return_header} + return self._loads_unsafe_impl(s, salt, kwargs, kwargs) + + +class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer): + """Works like the regular :class:`JSONWebSignatureSerializer` but + also records the time of the signing and can be used to expire + signatures. + + JWS currently does not specify this behavior but it mentions a + possible extension like this in the spec. Expiry date is encoded + into the header similar to what's specified in `draft-ietf-oauth + -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json + -web-token.html#expDef>`_. + """ + + DEFAULT_EXPIRES_IN = 3600 + + def __init__(self, secret_key, expires_in=None, **kwargs): + JSONWebSignatureSerializer.__init__(self, secret_key, **kwargs) + if expires_in is None: + expires_in = self.DEFAULT_EXPIRES_IN + self.expires_in = expires_in + + def make_header(self, header_fields): + header = JSONWebSignatureSerializer.make_header(self, header_fields) + iat = self.now() + exp = iat + self.expires_in + header["iat"] = iat + header["exp"] = exp + return header + + def loads(self, s, salt=None, return_header=False): + payload, header = JSONWebSignatureSerializer.loads( + self, s, salt, return_header=True + ) + + if "exp" not in header: + raise BadSignature("Missing expiry date", payload=payload) + + int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload) + try: + header["exp"] = int(header["exp"]) + except ValueError: + raise int_date_error + if header["exp"] < 0: + raise int_date_error + + if header["exp"] < self.now(): + raise SignatureExpired( + "Signature expired", + payload=payload, + date_signed=self.get_issue_date(header), + ) + + if return_header: + return payload, header + return payload + + def get_issue_date(self, header): + rv = header.get("iat") + if isinstance(rv, number_types): + return datetime.utcfromtimestamp(int(rv)) + + def now(self): + return int(time.time()) |