diff options
Diffstat (limited to 'python/itsdangerous')
| -rw-r--r-- | python/itsdangerous/__init__.py | 22 | ||||
| -rw-r--r-- | python/itsdangerous/_compat.py | 46 | ||||
| -rw-r--r-- | python/itsdangerous/_json.py | 18 | ||||
| -rw-r--r-- | python/itsdangerous/encoding.py | 49 | ||||
| -rw-r--r-- | python/itsdangerous/exc.py | 98 | ||||
| -rw-r--r-- | python/itsdangerous/jws.py | 218 | ||||
| -rw-r--r-- | python/itsdangerous/serializer.py | 233 | ||||
| -rw-r--r-- | python/itsdangerous/signer.py | 179 | ||||
| -rw-r--r-- | python/itsdangerous/timed.py | 147 | ||||
| -rw-r--r-- | python/itsdangerous/url_safe.py | 65 | 
10 files changed, 1075 insertions, 0 deletions
| diff --git a/python/itsdangerous/__init__.py b/python/itsdangerous/__init__.py new file mode 100644 index 0000000..0fcd8c1 --- /dev/null +++ b/python/itsdangerous/__init__.py @@ -0,0 +1,22 @@ +from ._json import json +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadData +from .exc import BadHeader +from .exc import BadPayload +from .exc import BadSignature +from .exc import BadTimeSignature +from .exc import SignatureExpired +from .jws import JSONWebSignatureSerializer +from .jws import TimedJSONWebSignatureSerializer +from .serializer import Serializer +from .signer import HMACAlgorithm +from .signer import NoneAlgorithm +from .signer import Signer +from .timed import TimedSerializer +from .timed import TimestampSigner +from .url_safe import URLSafeSerializer +from .url_safe import URLSafeTimedSerializer + +__version__ = "1.1.0" diff --git a/python/itsdangerous/_compat.py b/python/itsdangerous/_compat.py new file mode 100644 index 0000000..2291bce --- /dev/null +++ b/python/itsdangerous/_compat.py @@ -0,0 +1,46 @@ +import decimal +import hmac +import numbers +import sys + +PY2 = sys.version_info[0] == 2 + +if PY2: +    from itertools import izip + +    text_type = unicode  # noqa: 821 +else: +    izip = zip +    text_type = str + +number_types = (numbers.Real, decimal.Decimal) + + +def _constant_time_compare(val1, val2): +    """Return ``True`` if the two strings are equal, ``False`` +    otherwise. + +    The time taken is independent of the number of characters that +    match. Do not use this function for anything else than comparision +    with known length targets. + +    This is should be implemented in C in order to get it completely +    right. + +    This is an alias of :func:`hmac.compare_digest` on Python>=2.7,3.3. +    """ +    len_eq = len(val1) == len(val2) +    if len_eq: +        result = 0 +        left = val1 +    else: +        result = 1 +        left = val2 +    for x, y in izip(bytearray(left), bytearray(val2)): +        result |= x ^ y +    return result == 0 + + +# Starting with 2.7/3.3 the standard library has a c-implementation for +# constant time string compares. +constant_time_compare = getattr(hmac, "compare_digest", _constant_time_compare) diff --git a/python/itsdangerous/_json.py b/python/itsdangerous/_json.py new file mode 100644 index 0000000..426b36e --- /dev/null +++ b/python/itsdangerous/_json.py @@ -0,0 +1,18 @@ +try: +    import simplejson as json +except ImportError: +    import json + + +class _CompactJSON(object): +    """Wrapper around json module that strips whitespace.""" + +    @staticmethod +    def loads(payload): +        return json.loads(payload) + +    @staticmethod +    def dumps(obj, **kwargs): +        kwargs.setdefault("ensure_ascii", False) +        kwargs.setdefault("separators", (",", ":")) +        return json.dumps(obj, **kwargs) diff --git a/python/itsdangerous/encoding.py b/python/itsdangerous/encoding.py new file mode 100644 index 0000000..1e28969 --- /dev/null +++ b/python/itsdangerous/encoding.py @@ -0,0 +1,49 @@ +import base64 +import string +import struct + +from ._compat import text_type +from .exc import BadData + + +def want_bytes(s, encoding="utf-8", errors="strict"): +    if isinstance(s, text_type): +        s = s.encode(encoding, errors) +    return s + + +def base64_encode(string): +    """Base64 encode a string of bytes or text. The resulting bytes are +    safe to use in URLs. +    """ +    string = want_bytes(string) +    return base64.urlsafe_b64encode(string).rstrip(b"=") + + +def base64_decode(string): +    """Base64 decode a URL-safe string of bytes or text. The result is +    bytes. +    """ +    string = want_bytes(string, encoding="ascii", errors="ignore") +    string += b"=" * (-len(string) % 4) + +    try: +        return base64.urlsafe_b64decode(string) +    except (TypeError, ValueError): +        raise BadData("Invalid base64-encoded data") + + +# The alphabet used by base64.urlsafe_* +_base64_alphabet = (string.ascii_letters + string.digits + "-_=").encode("ascii") + +_int64_struct = struct.Struct(">Q") +_int_to_bytes = _int64_struct.pack +_bytes_to_int = _int64_struct.unpack + + +def int_to_bytes(num): +    return _int_to_bytes(num).lstrip(b"\x00") + + +def bytes_to_int(bytestr): +    return _bytes_to_int(bytestr.rjust(8, b"\x00"))[0] diff --git a/python/itsdangerous/exc.py b/python/itsdangerous/exc.py new file mode 100644 index 0000000..287d691 --- /dev/null +++ b/python/itsdangerous/exc.py @@ -0,0 +1,98 @@ +from ._compat import PY2 +from ._compat import text_type + + +class BadData(Exception): +    """Raised if bad data of any sort was encountered. This is the base +    for all exceptions that itsdangerous defines. + +    .. versionadded:: 0.15 +    """ + +    message = None + +    def __init__(self, message): +        super(BadData, self).__init__(self, message) +        self.message = message + +    def __str__(self): +        return text_type(self.message) + +    if PY2: +        __unicode__ = __str__ + +        def __str__(self): +            return self.__unicode__().encode("utf-8") + + +class BadSignature(BadData): +    """Raised if a signature does not match.""" + +    def __init__(self, message, payload=None): +        BadData.__init__(self, message) + +        #: The payload that failed the signature test. In some +        #: situations you might still want to inspect this, even if +        #: you know it was tampered with. +        #: +        #: .. versionadded:: 0.14 +        self.payload = payload + + +class BadTimeSignature(BadSignature): +    """Raised if a time-based signature is invalid. This is a subclass +    of :class:`BadSignature`. +    """ + +    def __init__(self, message, payload=None, date_signed=None): +        BadSignature.__init__(self, message, payload) + +        #: If the signature expired this exposes the date of when the +        #: signature was created. This can be helpful in order to +        #: tell the user how long a link has been gone stale. +        #: +        #: .. versionadded:: 0.14 +        self.date_signed = date_signed + + +class SignatureExpired(BadTimeSignature): +    """Raised if a signature timestamp is older than ``max_age``. This +    is a subclass of :exc:`BadTimeSignature`. +    """ + + +class BadHeader(BadSignature): +    """Raised if a signed header is invalid in some form. This only +    happens for serializers that have a header that goes with the +    signature. + +    .. versionadded:: 0.24 +    """ + +    def __init__(self, message, payload=None, header=None, original_error=None): +        BadSignature.__init__(self, message, payload) + +        #: If the header is actually available but just malformed it +        #: might be stored here. +        self.header = header + +        #: If available, the error that indicates why the payload was +        #: not valid. This might be ``None``. +        self.original_error = original_error + + +class BadPayload(BadData): +    """Raised if a payload is invalid. This could happen if the payload +    is loaded despite an invalid signature, or if there is a mismatch +    between the serializer and deserializer. The original exception +    that occurred during loading is stored on as :attr:`original_error`. + +    .. versionadded:: 0.15 +    """ + +    def __init__(self, message, original_error=None): +        BadData.__init__(self, message) + +        #: If available, the error that indicates why the payload was +        #: not valid. This might be ``None``. +        self.original_error = original_error diff --git a/python/itsdangerous/jws.py b/python/itsdangerous/jws.py new file mode 100644 index 0000000..92e9ec8 --- /dev/null +++ b/python/itsdangerous/jws.py @@ -0,0 +1,218 @@ +import hashlib +import time +from datetime import datetime + +from ._compat import number_types +from ._json import _CompactJSON +from ._json import json +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadData +from .exc import BadHeader +from .exc import BadPayload +from .exc import BadSignature +from .exc import SignatureExpired +from .serializer import Serializer +from .signer import HMACAlgorithm +from .signer import NoneAlgorithm + + +class JSONWebSignatureSerializer(Serializer): +    """This serializer implements JSON Web Signature (JWS) support. Only +    supports the JWS Compact Serialization. +    """ + +    jws_algorithms = { +        "HS256": HMACAlgorithm(hashlib.sha256), +        "HS384": HMACAlgorithm(hashlib.sha384), +        "HS512": HMACAlgorithm(hashlib.sha512), +        "none": NoneAlgorithm(), +    } + +    #: The default algorithm to use for signature generation +    default_algorithm = "HS512" + +    default_serializer = _CompactJSON + +    def __init__( +        self, +        secret_key, +        salt=None, +        serializer=None, +        serializer_kwargs=None, +        signer=None, +        signer_kwargs=None, +        algorithm_name=None, +    ): +        Serializer.__init__( +            self, +            secret_key=secret_key, +            salt=salt, +            serializer=serializer, +            serializer_kwargs=serializer_kwargs, +            signer=signer, +            signer_kwargs=signer_kwargs, +        ) +        if algorithm_name is None: +            algorithm_name = self.default_algorithm +        self.algorithm_name = algorithm_name +        self.algorithm = self.make_algorithm(algorithm_name) + +    def load_payload(self, payload, serializer=None, return_header=False): +        payload = want_bytes(payload) +        if b"." not in payload: +            raise BadPayload('No "." found in value') +        base64d_header, base64d_payload = payload.split(b".", 1) +        try: +            json_header = base64_decode(base64d_header) +        except Exception as e: +            raise BadHeader( +                "Could not base64 decode the header because of an exception", +                original_error=e, +            ) +        try: +            json_payload = base64_decode(base64d_payload) +        except Exception as e: +            raise BadPayload( +                "Could not base64 decode the payload because of an exception", +                original_error=e, +            ) +        try: +            header = Serializer.load_payload(self, json_header, serializer=json) +        except BadData as e: +            raise BadHeader( +                "Could not unserialize header because it was malformed", +                original_error=e, +            ) +        if not isinstance(header, dict): +            raise BadHeader("Header payload is not a JSON object", header=header) +        payload = Serializer.load_payload(self, json_payload, serializer=serializer) +        if return_header: +            return payload, header +        return payload + +    def dump_payload(self, header, obj): +        base64d_header = base64_encode( +            self.serializer.dumps(header, **self.serializer_kwargs) +        ) +        base64d_payload = base64_encode( +            self.serializer.dumps(obj, **self.serializer_kwargs) +        ) +        return base64d_header + b"." + base64d_payload + +    def make_algorithm(self, algorithm_name): +        try: +            return self.jws_algorithms[algorithm_name] +        except KeyError: +            raise NotImplementedError("Algorithm not supported") + +    def make_signer(self, salt=None, algorithm=None): +        if salt is None: +            salt = self.salt +        key_derivation = "none" if salt is None else None +        if algorithm is None: +            algorithm = self.algorithm +        return self.signer( +            self.secret_key, +            salt=salt, +            sep=".", +            key_derivation=key_derivation, +            algorithm=algorithm, +        ) + +    def make_header(self, header_fields): +        header = header_fields.copy() if header_fields else {} +        header["alg"] = self.algorithm_name +        return header + +    def dumps(self, obj, salt=None, header_fields=None): +        """Like :meth:`.Serializer.dumps` but creates a JSON Web +        Signature. It also allows for specifying additional fields to be +        included in the JWS header. +        """ +        header = self.make_header(header_fields) +        signer = self.make_signer(salt, self.algorithm) +        return signer.sign(self.dump_payload(header, obj)) + +    def loads(self, s, salt=None, return_header=False): +        """Reverse of :meth:`dumps`. If requested via ``return_header`` +        it will return a tuple of payload and header. +        """ +        payload, header = self.load_payload( +            self.make_signer(salt, self.algorithm).unsign(want_bytes(s)), +            return_header=True, +        ) +        if header.get("alg") != self.algorithm_name: +            raise BadHeader("Algorithm mismatch", header=header, payload=payload) +        if return_header: +            return payload, header +        return payload + +    def loads_unsafe(self, s, salt=None, return_header=False): +        kwargs = {"return_header": return_header} +        return self._loads_unsafe_impl(s, salt, kwargs, kwargs) + + +class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer): +    """Works like the regular :class:`JSONWebSignatureSerializer` but +    also records the time of the signing and can be used to expire +    signatures. + +    JWS currently does not specify this behavior but it mentions a +    possible extension like this in the spec. Expiry date is encoded +    into the header similar to what's specified in `draft-ietf-oauth +    -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json +    -web-token.html#expDef>`_. +    """ + +    DEFAULT_EXPIRES_IN = 3600 + +    def __init__(self, secret_key, expires_in=None, **kwargs): +        JSONWebSignatureSerializer.__init__(self, secret_key, **kwargs) +        if expires_in is None: +            expires_in = self.DEFAULT_EXPIRES_IN +        self.expires_in = expires_in + +    def make_header(self, header_fields): +        header = JSONWebSignatureSerializer.make_header(self, header_fields) +        iat = self.now() +        exp = iat + self.expires_in +        header["iat"] = iat +        header["exp"] = exp +        return header + +    def loads(self, s, salt=None, return_header=False): +        payload, header = JSONWebSignatureSerializer.loads( +            self, s, salt, return_header=True +        ) + +        if "exp" not in header: +            raise BadSignature("Missing expiry date", payload=payload) + +        int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload) +        try: +            header["exp"] = int(header["exp"]) +        except ValueError: +            raise int_date_error +        if header["exp"] < 0: +            raise int_date_error + +        if header["exp"] < self.now(): +            raise SignatureExpired( +                "Signature expired", +                payload=payload, +                date_signed=self.get_issue_date(header), +            ) + +        if return_header: +            return payload, header +        return payload + +    def get_issue_date(self, header): +        rv = header.get("iat") +        if isinstance(rv, number_types): +            return datetime.utcfromtimestamp(int(rv)) + +    def now(self): +        return int(time.time()) diff --git a/python/itsdangerous/serializer.py b/python/itsdangerous/serializer.py new file mode 100644 index 0000000..12c20f4 --- /dev/null +++ b/python/itsdangerous/serializer.py @@ -0,0 +1,233 @@ +import hashlib + +from ._compat import text_type +from ._json import json +from .encoding import want_bytes +from .exc import BadPayload +from .exc import BadSignature +from .signer import Signer + + +def is_text_serializer(serializer): +    """Checks whether a serializer generates text or binary.""" +    return isinstance(serializer.dumps({}), text_type) + + +class Serializer(object): +    """This class provides a serialization interface on top of the +    signer. It provides a similar API to json/pickle and other modules +    but is structured differently internally. If you want to change the +    underlying implementation for parsing and loading you have to +    override the :meth:`load_payload` and :meth:`dump_payload` +    functions. + +    This implementation uses simplejson if available for dumping and +    loading and will fall back to the standard library's json module if +    it's not available. + +    You do not need to subclass this class in order to switch out or +    customize the :class:`.Signer`. You can instead pass a different +    class to the constructor as well as keyword arguments as a dict that +    should be forwarded. + +    .. code-block:: python + +        s = Serializer(signer_kwargs={'key_derivation': 'hmac'}) + +    You may want to upgrade the signing parameters without invalidating +    existing signatures that are in use. Fallback signatures can be +    given that will be tried if unsigning with the current signer fails. + +    Fallback signers can be defined by providing a list of +    ``fallback_signers``. Each item can be one of the following: a +    signer class (which is instantiated with ``signer_kwargs``, +    ``salt``, and ``secret_key``), a tuple +    ``(signer_class, signer_kwargs)``, or a dict of ``signer_kwargs``. + +    For example, this is a serializer that signs using SHA-512, but will +    unsign using either SHA-512 or SHA1: + +    .. code-block:: python + +        s = Serializer( +            signer_kwargs={"digest_method": hashlib.sha512}, +            fallback_signers=[{"digest_method": hashlib.sha1}] +        ) + +    .. versionchanged:: 0.14: +        The ``signer`` and ``signer_kwargs`` parameters were added to +        the constructor. + +    .. versionchanged:: 1.1.0: +        Added support for ``fallback_signers`` and configured a default +        SHA-512 fallback. This fallback is for users who used the yanked +        1.0.0 release which defaulted to SHA-512. +    """ + +    #: If a serializer module or class is not passed to the constructor +    #: this one is picked up. This currently defaults to :mod:`json`. +    default_serializer = json + +    #: The default :class:`Signer` class that is being used by this +    #: serializer. +    #: +    #: .. versionadded:: 0.14 +    default_signer = Signer + +    #: The default fallback signers. +    default_fallback_signers = [{"digest_method": hashlib.sha512}] + +    def __init__( +        self, +        secret_key, +        salt=b"itsdangerous", +        serializer=None, +        serializer_kwargs=None, +        signer=None, +        signer_kwargs=None, +        fallback_signers=None, +    ): +        self.secret_key = want_bytes(secret_key) +        self.salt = want_bytes(salt) +        if serializer is None: +            serializer = self.default_serializer +        self.serializer = serializer +        self.is_text_serializer = is_text_serializer(serializer) +        if signer is None: +            signer = self.default_signer +        self.signer = signer +        self.signer_kwargs = signer_kwargs or {} +        if fallback_signers is None: +            fallback_signers = list(self.default_fallback_signers or ()) +        self.fallback_signers = fallback_signers +        self.serializer_kwargs = serializer_kwargs or {} + +    def load_payload(self, payload, serializer=None): +        """Loads the encoded object. This function raises +        :class:`.BadPayload` if the payload is not valid. The +        ``serializer`` parameter can be used to override the serializer +        stored on the class. The encoded ``payload`` should always be +        bytes. +        """ +        if serializer is None: +            serializer = self.serializer +            is_text = self.is_text_serializer +        else: +            is_text = is_text_serializer(serializer) +        try: +            if is_text: +                payload = payload.decode("utf-8") +            return serializer.loads(payload) +        except Exception as e: +            raise BadPayload( +                "Could not load the payload because an exception" +                " occurred on unserializing the data.", +                original_error=e, +            ) + +    def dump_payload(self, obj): +        """Dumps the encoded object. The return value is always bytes. +        If the internal serializer returns text, the value will be +        encoded as UTF-8. +        """ +        return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs)) + +    def make_signer(self, salt=None): +        """Creates a new instance of the signer to be used. The default +        implementation uses the :class:`.Signer` base class. +        """ +        if salt is None: +            salt = self.salt +        return self.signer(self.secret_key, salt=salt, **self.signer_kwargs) + +    def iter_unsigners(self, salt=None): +        """Iterates over all signers to be tried for unsigning. Starts +        with the configured signer, then constructs each signer +        specified in ``fallback_signers``. +        """ +        if salt is None: +            salt = self.salt +        yield self.make_signer(salt) +        for fallback in self.fallback_signers: +            if type(fallback) is dict: +                kwargs = fallback +                fallback = self.signer +            elif type(fallback) is tuple: +                fallback, kwargs = fallback +            else: +                kwargs = self.signer_kwargs +            yield fallback(self.secret_key, salt=salt, **kwargs) + +    def dumps(self, obj, salt=None): +        """Returns a signed string serialized with the internal +        serializer. The return value can be either a byte or unicode +        string depending on the format of the internal serializer. +        """ +        payload = want_bytes(self.dump_payload(obj)) +        rv = self.make_signer(salt).sign(payload) +        if self.is_text_serializer: +            rv = rv.decode("utf-8") +        return rv + +    def dump(self, obj, f, salt=None): +        """Like :meth:`dumps` but dumps into a file. The file handle has +        to be compatible with what the internal serializer expects. +        """ +        f.write(self.dumps(obj, salt)) + +    def loads(self, s, salt=None): +        """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the +        signature validation fails. +        """ +        s = want_bytes(s) +        last_exception = None +        for signer in self.iter_unsigners(salt): +            try: +                return self.load_payload(signer.unsign(s)) +            except BadSignature as err: +                last_exception = err +        raise last_exception + +    def load(self, f, salt=None): +        """Like :meth:`loads` but loads from a file.""" +        return self.loads(f.read(), salt) + +    def loads_unsafe(self, s, salt=None): +        """Like :meth:`loads` but without verifying the signature. This +        is potentially very dangerous to use depending on how your +        serializer works. The return value is ``(signature_valid, +        payload)`` instead of just the payload. The first item will be a +        boolean that indicates if the signature is valid. This function +        never fails. + +        Use it for debugging only and if you know that your serializer +        module is not exploitable (for example, do not use it with a +        pickle serializer). + +        .. versionadded:: 0.15 +        """ +        return self._loads_unsafe_impl(s, salt) + +    def _loads_unsafe_impl(self, s, salt, load_kwargs=None, load_payload_kwargs=None): +        """Low level helper function to implement :meth:`loads_unsafe` +        in serializer subclasses. +        """ +        try: +            return True, self.loads(s, salt=salt, **(load_kwargs or {})) +        except BadSignature as e: +            if e.payload is None: +                return False, None +            try: +                return ( +                    False, +                    self.load_payload(e.payload, **(load_payload_kwargs or {})), +                ) +            except BadPayload: +                return False, None + +    def load_unsafe(self, f, *args, **kwargs): +        """Like :meth:`loads_unsafe` but loads from a file. + +        .. versionadded:: 0.15 +        """ +        return self.loads_unsafe(f.read(), *args, **kwargs) diff --git a/python/itsdangerous/signer.py b/python/itsdangerous/signer.py new file mode 100644 index 0000000..6bddc03 --- /dev/null +++ b/python/itsdangerous/signer.py @@ -0,0 +1,179 @@ +import hashlib +import hmac + +from ._compat import constant_time_compare +from .encoding import _base64_alphabet +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadSignature + + +class SigningAlgorithm(object): +    """Subclasses must implement :meth:`get_signature` to provide +    signature generation functionality. +    """ + +    def get_signature(self, key, value): +        """Returns the signature for the given key and value.""" +        raise NotImplementedError() + +    def verify_signature(self, key, value, sig): +        """Verifies the given signature matches the expected +        signature. +        """ +        return constant_time_compare(sig, self.get_signature(key, value)) + + +class NoneAlgorithm(SigningAlgorithm): +    """Provides an algorithm that does not perform any signing and +    returns an empty signature. +    """ + +    def get_signature(self, key, value): +        return b"" + + +class HMACAlgorithm(SigningAlgorithm): +    """Provides signature generation using HMACs.""" + +    #: The digest method to use with the MAC algorithm. This defaults to +    #: SHA1, but can be changed to any other function in the hashlib +    #: module. +    default_digest_method = staticmethod(hashlib.sha1) + +    def __init__(self, digest_method=None): +        if digest_method is None: +            digest_method = self.default_digest_method +        self.digest_method = digest_method + +    def get_signature(self, key, value): +        mac = hmac.new(key, msg=value, digestmod=self.digest_method) +        return mac.digest() + + +class Signer(object): +    """This class can sign and unsign bytes, validating the signature +    provided. + +    Salt can be used to namespace the hash, so that a signed string is +    only valid for a given namespace. Leaving this at the default value +    or re-using a salt value across different parts of your application +    where the same signed value in one part can mean something different +    in another part is a security risk. + +    See :ref:`the-salt` for an example of what the salt is doing and how +    you can utilize it. + +    .. versionadded:: 0.14 +        ``key_derivation`` and ``digest_method`` were added as arguments +        to the class constructor. + +    .. versionadded:: 0.18 +        ``algorithm`` was added as an argument to the class constructor. +    """ + +    #: The digest method to use for the signer.  This defaults to +    #: SHA1 but can be changed to any other function in the hashlib +    #: module. +    #: +    #: .. versionadded:: 0.14 +    default_digest_method = staticmethod(hashlib.sha1) + +    #: Controls how the key is derived. The default is Django-style +    #: concatenation. Possible values are ``concat``, ``django-concat`` +    #: and ``hmac``. This is used for deriving a key from the secret key +    #: with an added salt. +    #: +    #: .. versionadded:: 0.14 +    default_key_derivation = "django-concat" + +    def __init__( +        self, +        secret_key, +        salt=None, +        sep=".", +        key_derivation=None, +        digest_method=None, +        algorithm=None, +    ): +        self.secret_key = want_bytes(secret_key) +        self.sep = want_bytes(sep) +        if self.sep in _base64_alphabet: +            raise ValueError( +                "The given separator cannot be used because it may be" +                " contained in the signature itself. Alphanumeric" +                " characters and `-_=` must not be used." +            ) +        self.salt = "itsdangerous.Signer" if salt is None else salt +        if key_derivation is None: +            key_derivation = self.default_key_derivation +        self.key_derivation = key_derivation +        if digest_method is None: +            digest_method = self.default_digest_method +        self.digest_method = digest_method +        if algorithm is None: +            algorithm = HMACAlgorithm(self.digest_method) +        self.algorithm = algorithm + +    def derive_key(self): +        """This method is called to derive the key. The default key +        derivation choices can be overridden here. Key derivation is not +        intended to be used as a security method to make a complex key +        out of a short password. Instead you should use large random +        secret keys. +        """ +        salt = want_bytes(self.salt) +        if self.key_derivation == "concat": +            return self.digest_method(salt + self.secret_key).digest() +        elif self.key_derivation == "django-concat": +            return self.digest_method(salt + b"signer" + self.secret_key).digest() +        elif self.key_derivation == "hmac": +            mac = hmac.new(self.secret_key, digestmod=self.digest_method) +            mac.update(salt) +            return mac.digest() +        elif self.key_derivation == "none": +            return self.secret_key +        else: +            raise TypeError("Unknown key derivation method") + +    def get_signature(self, value): +        """Returns the signature for the given value.""" +        value = want_bytes(value) +        key = self.derive_key() +        sig = self.algorithm.get_signature(key, value) +        return base64_encode(sig) + +    def sign(self, value): +        """Signs the given string.""" +        return want_bytes(value) + want_bytes(self.sep) + self.get_signature(value) + +    def verify_signature(self, value, sig): +        """Verifies the signature for the given value.""" +        key = self.derive_key() +        try: +            sig = base64_decode(sig) +        except Exception: +            return False +        return self.algorithm.verify_signature(key, value, sig) + +    def unsign(self, signed_value): +        """Unsigns the given string.""" +        signed_value = want_bytes(signed_value) +        sep = want_bytes(self.sep) +        if sep not in signed_value: +            raise BadSignature("No %r found in value" % self.sep) +        value, sig = signed_value.rsplit(sep, 1) +        if self.verify_signature(value, sig): +            return value +        raise BadSignature("Signature %r does not match" % sig, payload=value) + +    def validate(self, signed_value): +        """Only validates the given signed value. Returns ``True`` if +        the signature exists and is valid. +        """ +        try: +            self.unsign(signed_value) +            return True +        except BadSignature: +            return False diff --git a/python/itsdangerous/timed.py b/python/itsdangerous/timed.py new file mode 100644 index 0000000..4c117e4 --- /dev/null +++ b/python/itsdangerous/timed.py @@ -0,0 +1,147 @@ +import time +from datetime import datetime + +from ._compat import text_type +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import bytes_to_int +from .encoding import int_to_bytes +from .encoding import want_bytes +from .exc import BadSignature +from .exc import BadTimeSignature +from .exc import SignatureExpired +from .serializer import Serializer +from .signer import Signer + + +class TimestampSigner(Signer): +    """Works like the regular :class:`.Signer` but also records the time +    of the signing and can be used to expire signatures. The +    :meth:`unsign` method can raise :exc:`.SignatureExpired` if the +    unsigning failed because the signature is expired. +    """ + +    def get_timestamp(self): +        """Returns the current timestamp. The function must return an +        integer. +        """ +        return int(time.time()) + +    def timestamp_to_datetime(self, ts): +        """Used to convert the timestamp from :meth:`get_timestamp` into +        a datetime object. +        """ +        return datetime.utcfromtimestamp(ts) + +    def sign(self, value): +        """Signs the given string and also attaches time information.""" +        value = want_bytes(value) +        timestamp = base64_encode(int_to_bytes(self.get_timestamp())) +        sep = want_bytes(self.sep) +        value = value + sep + timestamp +        return value + sep + self.get_signature(value) + +    def unsign(self, value, max_age=None, return_timestamp=False): +        """Works like the regular :meth:`.Signer.unsign` but can also +        validate the time. See the base docstring of the class for +        the general behavior. If ``return_timestamp`` is ``True`` the +        timestamp of the signature will be returned as a naive +        :class:`datetime.datetime` object in UTC. +        """ +        try: +            result = Signer.unsign(self, value) +            sig_error = None +        except BadSignature as e: +            sig_error = e +            result = e.payload or b"" +        sep = want_bytes(self.sep) + +        # If there is no timestamp in the result there is something +        # seriously wrong. In case there was a signature error, we raise +        # that one directly, otherwise we have a weird situation in +        # which we shouldn't have come except someone uses a time-based +        # serializer on non-timestamp data, so catch that. +        if sep not in result: +            if sig_error: +                raise sig_error +            raise BadTimeSignature("timestamp missing", payload=result) + +        value, timestamp = result.rsplit(sep, 1) +        try: +            timestamp = bytes_to_int(base64_decode(timestamp)) +        except Exception: +            timestamp = None + +        # Signature is *not* okay. Raise a proper error now that we have +        # split the value and the timestamp. +        if sig_error is not None: +            raise BadTimeSignature( +                text_type(sig_error), payload=value, date_signed=timestamp +            ) + +        # Signature was okay but the timestamp is actually not there or +        # malformed. Should not happen, but we handle it anyway. +        if timestamp is None: +            raise BadTimeSignature("Malformed timestamp", payload=value) + +        # Check timestamp is not older than max_age +        if max_age is not None: +            age = self.get_timestamp() - timestamp +            if age > max_age: +                raise SignatureExpired( +                    "Signature age %s > %s seconds" % (age, max_age), +                    payload=value, +                    date_signed=self.timestamp_to_datetime(timestamp), +                ) + +        if return_timestamp: +            return value, self.timestamp_to_datetime(timestamp) +        return value + +    def validate(self, signed_value, max_age=None): +        """Only validates the given signed value. Returns ``True`` if +        the signature exists and is valid.""" +        try: +            self.unsign(signed_value, max_age=max_age) +            return True +        except BadSignature: +            return False + + +class TimedSerializer(Serializer): +    """Uses :class:`TimestampSigner` instead of the default +    :class:`.Signer`. +    """ + +    default_signer = TimestampSigner + +    def loads(self, s, max_age=None, return_timestamp=False, salt=None): +        """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the +        signature validation fails. If a ``max_age`` is provided it will +        ensure the signature is not older than that time in seconds. In +        case the signature is outdated, :exc:`.SignatureExpired` is +        raised. All arguments are forwarded to the signer's +        :meth:`~TimestampSigner.unsign` method. +        """ +        s = want_bytes(s) +        last_exception = None +        for signer in self.iter_unsigners(salt): +            try: +                base64d, timestamp = signer.unsign(s, max_age, return_timestamp=True) +                payload = self.load_payload(base64d) +                if return_timestamp: +                    return payload, timestamp +                return payload +            # If we get a signature expired it means we could read the +            # signature but it's invalid.  In that case we do not want to +            # try the next signer. +            except SignatureExpired: +                raise +            except BadSignature as err: +                last_exception = err +        raise last_exception + +    def loads_unsafe(self, s, max_age=None, salt=None): +        load_kwargs = {"max_age": max_age} +        load_payload_kwargs = {} +        return self._loads_unsafe_impl(s, salt, load_kwargs, load_payload_kwargs) diff --git a/python/itsdangerous/url_safe.py b/python/itsdangerous/url_safe.py new file mode 100644 index 0000000..fcaa011 --- /dev/null +++ b/python/itsdangerous/url_safe.py @@ -0,0 +1,65 @@ +import zlib + +from ._json import _CompactJSON +from .encoding import base64_decode +from .encoding import base64_encode +from .exc import BadPayload +from .serializer import Serializer +from .timed import TimedSerializer + + +class URLSafeSerializerMixin(object): +    """Mixed in with a regular serializer it will attempt to zlib +    compress the string to make it shorter if necessary. It will also +    base64 encode the string so that it can safely be placed in a URL. +    """ + +    default_serializer = _CompactJSON + +    def load_payload(self, payload, *args, **kwargs): +        decompress = False +        if payload.startswith(b"."): +            payload = payload[1:] +            decompress = True +        try: +            json = base64_decode(payload) +        except Exception as e: +            raise BadPayload( +                "Could not base64 decode the payload because of an exception", +                original_error=e, +            ) +        if decompress: +            try: +                json = zlib.decompress(json) +            except Exception as e: +                raise BadPayload( +                    "Could not zlib decompress the payload before decoding the payload", +                    original_error=e, +                ) +        return super(URLSafeSerializerMixin, self).load_payload(json, *args, **kwargs) + +    def dump_payload(self, obj): +        json = super(URLSafeSerializerMixin, self).dump_payload(obj) +        is_compressed = False +        compressed = zlib.compress(json) +        if len(compressed) < (len(json) - 1): +            json = compressed +            is_compressed = True +        base64d = base64_encode(json) +        if is_compressed: +            base64d = b"." + base64d +        return base64d + + +class URLSafeSerializer(URLSafeSerializerMixin, Serializer): +    """Works like :class:`.Serializer` but dumps and loads into a URL +    safe string consisting of the upper and lowercase character of the +    alphabet as well as ``'_'``, ``'-'`` and ``'.'``. +    """ + + +class URLSafeTimedSerializer(URLSafeSerializerMixin, TimedSerializer): +    """Works like :class:`.TimedSerializer` but dumps and loads into a +    URL safe string consisting of the upper and lowercase character of +    the alphabet as well as ``'_'``, ``'-'`` and ``'.'``. +    """ | 
