aboutsummaryrefslogtreecommitdiffstats
path: root/python/itsdangerous/jws.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/itsdangerous/jws.py')
-rw-r--r--python/itsdangerous/jws.py218
1 files changed, 0 insertions, 218 deletions
diff --git a/python/itsdangerous/jws.py b/python/itsdangerous/jws.py
deleted file mode 100644
index 92e9ec8..0000000
--- a/python/itsdangerous/jws.py
+++ /dev/null
@@ -1,218 +0,0 @@
-import hashlib
-import time
-from datetime import datetime
-
-from ._compat import number_types
-from ._json import _CompactJSON
-from ._json import json
-from .encoding import base64_decode
-from .encoding import base64_encode
-from .encoding import want_bytes
-from .exc import BadData
-from .exc import BadHeader
-from .exc import BadPayload
-from .exc import BadSignature
-from .exc import SignatureExpired
-from .serializer import Serializer
-from .signer import HMACAlgorithm
-from .signer import NoneAlgorithm
-
-
-class JSONWebSignatureSerializer(Serializer):
- """This serializer implements JSON Web Signature (JWS) support. Only
- supports the JWS Compact Serialization.
- """
-
- jws_algorithms = {
- "HS256": HMACAlgorithm(hashlib.sha256),
- "HS384": HMACAlgorithm(hashlib.sha384),
- "HS512": HMACAlgorithm(hashlib.sha512),
- "none": NoneAlgorithm(),
- }
-
- #: The default algorithm to use for signature generation
- default_algorithm = "HS512"
-
- default_serializer = _CompactJSON
-
- def __init__(
- self,
- secret_key,
- salt=None,
- serializer=None,
- serializer_kwargs=None,
- signer=None,
- signer_kwargs=None,
- algorithm_name=None,
- ):
- Serializer.__init__(
- self,
- secret_key=secret_key,
- salt=salt,
- serializer=serializer,
- serializer_kwargs=serializer_kwargs,
- signer=signer,
- signer_kwargs=signer_kwargs,
- )
- if algorithm_name is None:
- algorithm_name = self.default_algorithm
- self.algorithm_name = algorithm_name
- self.algorithm = self.make_algorithm(algorithm_name)
-
- def load_payload(self, payload, serializer=None, return_header=False):
- payload = want_bytes(payload)
- if b"." not in payload:
- raise BadPayload('No "." found in value')
- base64d_header, base64d_payload = payload.split(b".", 1)
- try:
- json_header = base64_decode(base64d_header)
- except Exception as e:
- raise BadHeader(
- "Could not base64 decode the header because of an exception",
- original_error=e,
- )
- try:
- json_payload = base64_decode(base64d_payload)
- except Exception as e:
- raise BadPayload(
- "Could not base64 decode the payload because of an exception",
- original_error=e,
- )
- try:
- header = Serializer.load_payload(self, json_header, serializer=json)
- except BadData as e:
- raise BadHeader(
- "Could not unserialize header because it was malformed",
- original_error=e,
- )
- if not isinstance(header, dict):
- raise BadHeader("Header payload is not a JSON object", header=header)
- payload = Serializer.load_payload(self, json_payload, serializer=serializer)
- if return_header:
- return payload, header
- return payload
-
- def dump_payload(self, header, obj):
- base64d_header = base64_encode(
- self.serializer.dumps(header, **self.serializer_kwargs)
- )
- base64d_payload = base64_encode(
- self.serializer.dumps(obj, **self.serializer_kwargs)
- )
- return base64d_header + b"." + base64d_payload
-
- def make_algorithm(self, algorithm_name):
- try:
- return self.jws_algorithms[algorithm_name]
- except KeyError:
- raise NotImplementedError("Algorithm not supported")
-
- def make_signer(self, salt=None, algorithm=None):
- if salt is None:
- salt = self.salt
- key_derivation = "none" if salt is None else None
- if algorithm is None:
- algorithm = self.algorithm
- return self.signer(
- self.secret_key,
- salt=salt,
- sep=".",
- key_derivation=key_derivation,
- algorithm=algorithm,
- )
-
- def make_header(self, header_fields):
- header = header_fields.copy() if header_fields else {}
- header["alg"] = self.algorithm_name
- return header
-
- def dumps(self, obj, salt=None, header_fields=None):
- """Like :meth:`.Serializer.dumps` but creates a JSON Web
- Signature. It also allows for specifying additional fields to be
- included in the JWS header.
- """
- header = self.make_header(header_fields)
- signer = self.make_signer(salt, self.algorithm)
- return signer.sign(self.dump_payload(header, obj))
-
- def loads(self, s, salt=None, return_header=False):
- """Reverse of :meth:`dumps`. If requested via ``return_header``
- it will return a tuple of payload and header.
- """
- payload, header = self.load_payload(
- self.make_signer(salt, self.algorithm).unsign(want_bytes(s)),
- return_header=True,
- )
- if header.get("alg") != self.algorithm_name:
- raise BadHeader("Algorithm mismatch", header=header, payload=payload)
- if return_header:
- return payload, header
- return payload
-
- def loads_unsafe(self, s, salt=None, return_header=False):
- kwargs = {"return_header": return_header}
- return self._loads_unsafe_impl(s, salt, kwargs, kwargs)
-
-
-class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer):
- """Works like the regular :class:`JSONWebSignatureSerializer` but
- also records the time of the signing and can be used to expire
- signatures.
-
- JWS currently does not specify this behavior but it mentions a
- possible extension like this in the spec. Expiry date is encoded
- into the header similar to what's specified in `draft-ietf-oauth
- -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json
- -web-token.html#expDef>`_.
- """
-
- DEFAULT_EXPIRES_IN = 3600
-
- def __init__(self, secret_key, expires_in=None, **kwargs):
- JSONWebSignatureSerializer.__init__(self, secret_key, **kwargs)
- if expires_in is None:
- expires_in = self.DEFAULT_EXPIRES_IN
- self.expires_in = expires_in
-
- def make_header(self, header_fields):
- header = JSONWebSignatureSerializer.make_header(self, header_fields)
- iat = self.now()
- exp = iat + self.expires_in
- header["iat"] = iat
- header["exp"] = exp
- return header
-
- def loads(self, s, salt=None, return_header=False):
- payload, header = JSONWebSignatureSerializer.loads(
- self, s, salt, return_header=True
- )
-
- if "exp" not in header:
- raise BadSignature("Missing expiry date", payload=payload)
-
- int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
- try:
- header["exp"] = int(header["exp"])
- except ValueError:
- raise int_date_error
- if header["exp"] < 0:
- raise int_date_error
-
- if header["exp"] < self.now():
- raise SignatureExpired(
- "Signature expired",
- payload=payload,
- date_signed=self.get_issue_date(header),
- )
-
- if return_header:
- return payload, header
- return payload
-
- def get_issue_date(self, header):
- rv = header.get("iat")
- if isinstance(rv, number_types):
- return datetime.utcfromtimestamp(int(rv))
-
- def now(self):
- return int(time.time())