aboutsummaryrefslogtreecommitdiffstats
path: root/python/itsdangerous/jws.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/itsdangerous/jws.py')
-rw-r--r--python/itsdangerous/jws.py218
1 files changed, 218 insertions, 0 deletions
diff --git a/python/itsdangerous/jws.py b/python/itsdangerous/jws.py
new file mode 100644
index 0000000..92e9ec8
--- /dev/null
+++ b/python/itsdangerous/jws.py
@@ -0,0 +1,218 @@
+import hashlib
+import time
+from datetime import datetime
+
+from ._compat import number_types
+from ._json import _CompactJSON
+from ._json import json
+from .encoding import base64_decode
+from .encoding import base64_encode
+from .encoding import want_bytes
+from .exc import BadData
+from .exc import BadHeader
+from .exc import BadPayload
+from .exc import BadSignature
+from .exc import SignatureExpired
+from .serializer import Serializer
+from .signer import HMACAlgorithm
+from .signer import NoneAlgorithm
+
+
+class JSONWebSignatureSerializer(Serializer):
+ """This serializer implements JSON Web Signature (JWS) support. Only
+ supports the JWS Compact Serialization.
+ """
+
+ jws_algorithms = {
+ "HS256": HMACAlgorithm(hashlib.sha256),
+ "HS384": HMACAlgorithm(hashlib.sha384),
+ "HS512": HMACAlgorithm(hashlib.sha512),
+ "none": NoneAlgorithm(),
+ }
+
+ #: The default algorithm to use for signature generation
+ default_algorithm = "HS512"
+
+ default_serializer = _CompactJSON
+
+ def __init__(
+ self,
+ secret_key,
+ salt=None,
+ serializer=None,
+ serializer_kwargs=None,
+ signer=None,
+ signer_kwargs=None,
+ algorithm_name=None,
+ ):
+ Serializer.__init__(
+ self,
+ secret_key=secret_key,
+ salt=salt,
+ serializer=serializer,
+ serializer_kwargs=serializer_kwargs,
+ signer=signer,
+ signer_kwargs=signer_kwargs,
+ )
+ if algorithm_name is None:
+ algorithm_name = self.default_algorithm
+ self.algorithm_name = algorithm_name
+ self.algorithm = self.make_algorithm(algorithm_name)
+
+ def load_payload(self, payload, serializer=None, return_header=False):
+ payload = want_bytes(payload)
+ if b"." not in payload:
+ raise BadPayload('No "." found in value')
+ base64d_header, base64d_payload = payload.split(b".", 1)
+ try:
+ json_header = base64_decode(base64d_header)
+ except Exception as e:
+ raise BadHeader(
+ "Could not base64 decode the header because of an exception",
+ original_error=e,
+ )
+ try:
+ json_payload = base64_decode(base64d_payload)
+ except Exception as e:
+ raise BadPayload(
+ "Could not base64 decode the payload because of an exception",
+ original_error=e,
+ )
+ try:
+ header = Serializer.load_payload(self, json_header, serializer=json)
+ except BadData as e:
+ raise BadHeader(
+ "Could not unserialize header because it was malformed",
+ original_error=e,
+ )
+ if not isinstance(header, dict):
+ raise BadHeader("Header payload is not a JSON object", header=header)
+ payload = Serializer.load_payload(self, json_payload, serializer=serializer)
+ if return_header:
+ return payload, header
+ return payload
+
+ def dump_payload(self, header, obj):
+ base64d_header = base64_encode(
+ self.serializer.dumps(header, **self.serializer_kwargs)
+ )
+ base64d_payload = base64_encode(
+ self.serializer.dumps(obj, **self.serializer_kwargs)
+ )
+ return base64d_header + b"." + base64d_payload
+
+ def make_algorithm(self, algorithm_name):
+ try:
+ return self.jws_algorithms[algorithm_name]
+ except KeyError:
+ raise NotImplementedError("Algorithm not supported")
+
+ def make_signer(self, salt=None, algorithm=None):
+ if salt is None:
+ salt = self.salt
+ key_derivation = "none" if salt is None else None
+ if algorithm is None:
+ algorithm = self.algorithm
+ return self.signer(
+ self.secret_key,
+ salt=salt,
+ sep=".",
+ key_derivation=key_derivation,
+ algorithm=algorithm,
+ )
+
+ def make_header(self, header_fields):
+ header = header_fields.copy() if header_fields else {}
+ header["alg"] = self.algorithm_name
+ return header
+
+ def dumps(self, obj, salt=None, header_fields=None):
+ """Like :meth:`.Serializer.dumps` but creates a JSON Web
+ Signature. It also allows for specifying additional fields to be
+ included in the JWS header.
+ """
+ header = self.make_header(header_fields)
+ signer = self.make_signer(salt, self.algorithm)
+ return signer.sign(self.dump_payload(header, obj))
+
+ def loads(self, s, salt=None, return_header=False):
+ """Reverse of :meth:`dumps`. If requested via ``return_header``
+ it will return a tuple of payload and header.
+ """
+ payload, header = self.load_payload(
+ self.make_signer(salt, self.algorithm).unsign(want_bytes(s)),
+ return_header=True,
+ )
+ if header.get("alg") != self.algorithm_name:
+ raise BadHeader("Algorithm mismatch", header=header, payload=payload)
+ if return_header:
+ return payload, header
+ return payload
+
+ def loads_unsafe(self, s, salt=None, return_header=False):
+ kwargs = {"return_header": return_header}
+ return self._loads_unsafe_impl(s, salt, kwargs, kwargs)
+
+
+class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer):
+ """Works like the regular :class:`JSONWebSignatureSerializer` but
+ also records the time of the signing and can be used to expire
+ signatures.
+
+ JWS currently does not specify this behavior but it mentions a
+ possible extension like this in the spec. Expiry date is encoded
+ into the header similar to what's specified in `draft-ietf-oauth
+ -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json
+ -web-token.html#expDef>`_.
+ """
+
+ DEFAULT_EXPIRES_IN = 3600
+
+ def __init__(self, secret_key, expires_in=None, **kwargs):
+ JSONWebSignatureSerializer.__init__(self, secret_key, **kwargs)
+ if expires_in is None:
+ expires_in = self.DEFAULT_EXPIRES_IN
+ self.expires_in = expires_in
+
+ def make_header(self, header_fields):
+ header = JSONWebSignatureSerializer.make_header(self, header_fields)
+ iat = self.now()
+ exp = iat + self.expires_in
+ header["iat"] = iat
+ header["exp"] = exp
+ return header
+
+ def loads(self, s, salt=None, return_header=False):
+ payload, header = JSONWebSignatureSerializer.loads(
+ self, s, salt, return_header=True
+ )
+
+ if "exp" not in header:
+ raise BadSignature("Missing expiry date", payload=payload)
+
+ int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
+ try:
+ header["exp"] = int(header["exp"])
+ except ValueError:
+ raise int_date_error
+ if header["exp"] < 0:
+ raise int_date_error
+
+ if header["exp"] < self.now():
+ raise SignatureExpired(
+ "Signature expired",
+ payload=payload,
+ date_signed=self.get_issue_date(header),
+ )
+
+ if return_header:
+ return payload, header
+ return payload
+
+ def get_issue_date(self, header):
+ rv = header.get("iat")
+ if isinstance(rv, number_types):
+ return datetime.utcfromtimestamp(int(rv))
+
+ def now(self):
+ return int(time.time())