aboutsummaryrefslogtreecommitdiffstats
path: root/python/werkzeug/datastructures.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2019-09-06 16:31:13 -0700
committerJames Taylor <user234683@users.noreply.github.com>2019-09-06 16:31:13 -0700
commit3d57e14df7ba5f14a634295caf3b2e60da50bfe2 (patch)
tree4903bcb79a49ad714a1a9129765b9545405c9978 /python/werkzeug/datastructures.py
parentac32b24b2a011292b704a3f27e8fd08a7ae9424b (diff)
downloadyt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.lz
yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.xz
yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.zip
Remove windows python distribution from repo and add requirements.txt
Diffstat (limited to 'python/werkzeug/datastructures.py')
-rw-r--r--python/werkzeug/datastructures.py2852
1 files changed, 0 insertions, 2852 deletions
diff --git a/python/werkzeug/datastructures.py b/python/werkzeug/datastructures.py
deleted file mode 100644
index 9643db9..0000000
--- a/python/werkzeug/datastructures.py
+++ /dev/null
@@ -1,2852 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- werkzeug.datastructures
- ~~~~~~~~~~~~~~~~~~~~~~~
-
- This module provides mixins and classes with an immutable interface.
-
- :copyright: 2007 Pallets
- :license: BSD-3-Clause
-"""
-import codecs
-import mimetypes
-import re
-from copy import deepcopy
-from itertools import repeat
-
-from ._compat import BytesIO
-from ._compat import collections_abc
-from ._compat import integer_types
-from ._compat import iteritems
-from ._compat import iterkeys
-from ._compat import iterlists
-from ._compat import itervalues
-from ._compat import make_literal_wrapper
-from ._compat import PY2
-from ._compat import string_types
-from ._compat import text_type
-from ._compat import to_native
-from ._internal import _missing
-from .filesystem import get_filesystem_encoding
-
-_locale_delim_re = re.compile(r"[_-]")
-
-
-def is_immutable(self):
- raise TypeError("%r objects are immutable" % self.__class__.__name__)
-
-
-def iter_multi_items(mapping):
- """Iterates over the items of a mapping yielding keys and values
- without dropping any from more complex structures.
- """
- if isinstance(mapping, MultiDict):
- for item in iteritems(mapping, multi=True):
- yield item
- elif isinstance(mapping, dict):
- for key, value in iteritems(mapping):
- if isinstance(value, (tuple, list)):
- for value in value:
- yield key, value
- else:
- yield key, value
- else:
- for item in mapping:
- yield item
-
-
-def native_itermethods(names):
- if not PY2:
- return lambda x: x
-
- def setviewmethod(cls, name):
- viewmethod_name = "view%s" % name
- repr_name = "view_%s" % name
-
- def viewmethod(self, *a, **kw):
- return ViewItems(self, name, repr_name, *a, **kw)
-
- viewmethod.__name__ = viewmethod_name
- viewmethod.__doc__ = "`%s()` object providing a view on %s" % (
- viewmethod_name,
- name,
- )
- setattr(cls, viewmethod_name, viewmethod)
-
- def setitermethod(cls, name):
- itermethod = getattr(cls, name)
- setattr(cls, "iter%s" % name, itermethod)
-
- def listmethod(self, *a, **kw):
- return list(itermethod(self, *a, **kw))
-
- listmethod.__name__ = name
- listmethod.__doc__ = "Like :py:meth:`iter%s`, but returns a list." % name
- setattr(cls, name, listmethod)
-
- def wrap(cls):
- for name in names:
- setitermethod(cls, name)
- setviewmethod(cls, name)
- return cls
-
- return wrap
-
-
-class ImmutableListMixin(object):
- """Makes a :class:`list` immutable.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- _hash_cache = None
-
- def __hash__(self):
- if self._hash_cache is not None:
- return self._hash_cache
- rv = self._hash_cache = hash(tuple(self))
- return rv
-
- def __reduce_ex__(self, protocol):
- return type(self), (list(self),)
-
- def __delitem__(self, key):
- is_immutable(self)
-
- def __iadd__(self, other):
- is_immutable(self)
-
- __imul__ = __iadd__
-
- def __setitem__(self, key, value):
- is_immutable(self)
-
- def append(self, item):
- is_immutable(self)
-
- remove = append
-
- def extend(self, iterable):
- is_immutable(self)
-
- def insert(self, pos, value):
- is_immutable(self)
-
- def pop(self, index=-1):
- is_immutable(self)
-
- def reverse(self):
- is_immutable(self)
-
- def sort(self, cmp=None, key=None, reverse=None):
- is_immutable(self)
-
-
-class ImmutableList(ImmutableListMixin, list):
- """An immutable :class:`list`.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, list.__repr__(self))
-
-
-class ImmutableDictMixin(object):
- """Makes a :class:`dict` immutable.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- _hash_cache = None
-
- @classmethod
- def fromkeys(cls, keys, value=None):
- instance = super(cls, cls).__new__(cls)
- instance.__init__(zip(keys, repeat(value)))
- return instance
-
- def __reduce_ex__(self, protocol):
- return type(self), (dict(self),)
-
- def _iter_hashitems(self):
- return iteritems(self)
-
- def __hash__(self):
- if self._hash_cache is not None:
- return self._hash_cache
- rv = self._hash_cache = hash(frozenset(self._iter_hashitems()))
- return rv
-
- def setdefault(self, key, default=None):
- is_immutable(self)
-
- def update(self, *args, **kwargs):
- is_immutable(self)
-
- def pop(self, key, default=None):
- is_immutable(self)
-
- def popitem(self):
- is_immutable(self)
-
- def __setitem__(self, key, value):
- is_immutable(self)
-
- def __delitem__(self, key):
- is_immutable(self)
-
- def clear(self):
- is_immutable(self)
-
-
-class ImmutableMultiDictMixin(ImmutableDictMixin):
- """Makes a :class:`MultiDict` immutable.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- def __reduce_ex__(self, protocol):
- return type(self), (list(iteritems(self, multi=True)),)
-
- def _iter_hashitems(self):
- return iteritems(self, multi=True)
-
- def add(self, key, value):
- is_immutable(self)
-
- def popitemlist(self):
- is_immutable(self)
-
- def poplist(self, key):
- is_immutable(self)
-
- def setlist(self, key, new_list):
- is_immutable(self)
-
- def setlistdefault(self, key, default_list=None):
- is_immutable(self)
-
-
-class UpdateDictMixin(object):
- """Makes dicts call `self.on_update` on modifications.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- on_update = None
-
- def calls_update(name): # noqa: B902
- def oncall(self, *args, **kw):
- rv = getattr(super(UpdateDictMixin, self), name)(*args, **kw)
- if self.on_update is not None:
- self.on_update(self)
- return rv
-
- oncall.__name__ = name
- return oncall
-
- def setdefault(self, key, default=None):
- modified = key not in self
- rv = super(UpdateDictMixin, self).setdefault(key, default)
- if modified and self.on_update is not None:
- self.on_update(self)
- return rv
-
- def pop(self, key, default=_missing):
- modified = key in self
- if default is _missing:
- rv = super(UpdateDictMixin, self).pop(key)
- else:
- rv = super(UpdateDictMixin, self).pop(key, default)
- if modified and self.on_update is not None:
- self.on_update(self)
- return rv
-
- __setitem__ = calls_update("__setitem__")
- __delitem__ = calls_update("__delitem__")
- clear = calls_update("clear")
- popitem = calls_update("popitem")
- update = calls_update("update")
- del calls_update
-
-
-class TypeConversionDict(dict):
- """Works like a regular dict but the :meth:`get` method can perform
- type conversions. :class:`MultiDict` and :class:`CombinedMultiDict`
- are subclasses of this class and provide the same feature.
-
- .. versionadded:: 0.5
- """
-
- def get(self, key, default=None, type=None):
- """Return the default value if the requested data doesn't exist.
- If `type` is provided and is a callable it should convert the value,
- return it or raise a :exc:`ValueError` if that is not possible. In
- this case the function will return the default as if the value was not
- found:
-
- >>> d = TypeConversionDict(foo='42', bar='blub')
- >>> d.get('foo', type=int)
- 42
- >>> d.get('bar', -1, type=int)
- -1
-
- :param key: The key to be looked up.
- :param default: The default value to be returned if the key can't
- be looked up. If not further specified `None` is
- returned.
- :param type: A callable that is used to cast the value in the
- :class:`MultiDict`. If a :exc:`ValueError` is raised
- by this callable the default value is returned.
- """
- try:
- rv = self[key]
- except KeyError:
- return default
- if type is not None:
- try:
- rv = type(rv)
- except ValueError:
- rv = default
- return rv
-
-
-class ImmutableTypeConversionDict(ImmutableDictMixin, TypeConversionDict):
- """Works like a :class:`TypeConversionDict` but does not support
- modifications.
-
- .. versionadded:: 0.5
- """
-
- def copy(self):
- """Return a shallow mutable copy of this object. Keep in mind that
- the standard library's :func:`copy` function is a no-op for this class
- like for any other python immutable type (eg: :class:`tuple`).
- """
- return TypeConversionDict(self)
-
- def __copy__(self):
- return self
-
-
-class ViewItems(object):
- def __init__(self, multi_dict, method, repr_name, *a, **kw):
- self.__multi_dict = multi_dict
- self.__method = method
- self.__repr_name = repr_name
- self.__a = a
- self.__kw = kw
-
- def __get_items(self):
- return getattr(self.__multi_dict, self.__method)(*self.__a, **self.__kw)
-
- def __repr__(self):
- return "%s(%r)" % (self.__repr_name, list(self.__get_items()))
-
- def __iter__(self):
- return iter(self.__get_items())
-
-
-@native_itermethods(["keys", "values", "items", "lists", "listvalues"])
-class MultiDict(TypeConversionDict):
- """A :class:`MultiDict` is a dictionary subclass customized to deal with
- multiple values for the same key which is for example used by the parsing
- functions in the wrappers. This is necessary because some HTML form
- elements pass multiple values for the same key.
-
- :class:`MultiDict` implements all standard dictionary methods.
- Internally, it saves all values for a key as a list, but the standard dict
- access methods will only return the first value for a key. If you want to
- gain access to the other values, too, you have to use the `list` methods as
- explained below.
-
- Basic Usage:
-
- >>> d = MultiDict([('a', 'b'), ('a', 'c')])
- >>> d
- MultiDict([('a', 'b'), ('a', 'c')])
- >>> d['a']
- 'b'
- >>> d.getlist('a')
- ['b', 'c']
- >>> 'a' in d
- True
-
- It behaves like a normal dict thus all dict functions will only return the
- first value when multiple values for one key are found.
-
- From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
- subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
- render a page for a ``400 BAD REQUEST`` if caught in a catch-all for HTTP
- exceptions.
-
- A :class:`MultiDict` can be constructed from an iterable of
- ``(key, value)`` tuples, a dict, a :class:`MultiDict` or from Werkzeug 0.2
- onwards some keyword parameters.
-
- :param mapping: the initial value for the :class:`MultiDict`. Either a
- regular dict, an iterable of ``(key, value)`` tuples
- or `None`.
- """
-
- def __init__(self, mapping=None):
- if isinstance(mapping, MultiDict):
- dict.__init__(self, ((k, l[:]) for k, l in iterlists(mapping)))
- elif isinstance(mapping, dict):
- tmp = {}
- for key, value in iteritems(mapping):
- if isinstance(value, (tuple, list)):
- if len(value) == 0:
- continue
- value = list(value)
- else:
- value = [value]
- tmp[key] = value
- dict.__init__(self, tmp)
- else:
- tmp = {}
- for key, value in mapping or ():
- tmp.setdefault(key, []).append(value)
- dict.__init__(self, tmp)
-
- def __getstate__(self):
- return dict(self.lists())
-
- def __setstate__(self, value):
- dict.clear(self)
- dict.update(self, value)
-
- def __getitem__(self, key):
- """Return the first data value for this key;
- raises KeyError if not found.
-
- :param key: The key to be looked up.
- :raise KeyError: if the key does not exist.
- """
-
- if key in self:
- lst = dict.__getitem__(self, key)
- if len(lst) > 0:
- return lst[0]
- raise exceptions.BadRequestKeyError(key)
-
- def __setitem__(self, key, value):
- """Like :meth:`add` but removes an existing key first.
-
- :param key: the key for the value.
- :param value: the value to set.
- """
- dict.__setitem__(self, key, [value])
-
- def add(self, key, value):
- """Adds a new value for the key.
-
- .. versionadded:: 0.6
-
- :param key: the key for the value.
- :param value: the value to add.
- """
- dict.setdefault(self, key, []).append(value)
-
- def getlist(self, key, type=None):
- """Return the list of items for a given key. If that key is not in the
- `MultiDict`, the return value will be an empty list. Just as `get`
- `getlist` accepts a `type` parameter. All items will be converted
- with the callable defined there.
-
- :param key: The key to be looked up.
- :param type: A callable that is used to cast the value in the
- :class:`MultiDict`. If a :exc:`ValueError` is raised
- by this callable the value will be removed from the list.
- :return: a :class:`list` of all the values for the key.
- """
- try:
- rv = dict.__getitem__(self, key)
- except KeyError:
- return []
- if type is None:
- return list(rv)
- result = []
- for item in rv:
- try:
- result.append(type(item))
- except ValueError:
- pass
- return result
-
- def setlist(self, key, new_list):
- """Remove the old values for a key and add new ones. Note that the list
- you pass the values in will be shallow-copied before it is inserted in
- the dictionary.
-
- >>> d = MultiDict()
- >>> d.setlist('foo', ['1', '2'])
- >>> d['foo']
- '1'
- >>> d.getlist('foo')
- ['1', '2']
-
- :param key: The key for which the values are set.
- :param new_list: An iterable with the new values for the key. Old values
- are removed first.
- """
- dict.__setitem__(self, key, list(new_list))
-
- def setdefault(self, key, default=None):
- """Returns the value for the key if it is in the dict, otherwise it
- returns `default` and sets that value for `key`.
-
- :param key: The key to be looked up.
- :param default: The default value to be returned if the key is not
- in the dict. If not further specified it's `None`.
- """
- if key not in self:
- self[key] = default
- else:
- default = self[key]
- return default
-
- def setlistdefault(self, key, default_list=None):
- """Like `setdefault` but sets multiple values. The list returned
- is not a copy, but the list that is actually used internally. This
- means that you can put new values into the dict by appending items
- to the list:
-
- >>> d = MultiDict({"foo": 1})
- >>> d.setlistdefault("foo").extend([2, 3])
- >>> d.getlist("foo")
- [1, 2, 3]
-
- :param key: The key to be looked up.
- :param default_list: An iterable of default values. It is either copied
- (in case it was a list) or converted into a list
- before returned.
- :return: a :class:`list`
- """
- if key not in self:
- default_list = list(default_list or ())
- dict.__setitem__(self, key, default_list)
- else:
- default_list = dict.__getitem__(self, key)
- return default_list
-
- def items(self, multi=False):
- """Return an iterator of ``(key, value)`` pairs.
-
- :param multi: If set to `True` the iterator returned will have a pair
- for each value of each key. Otherwise it will only
- contain pairs for the first value of each key.
- """
-
- for key, values in iteritems(dict, self):
- if multi:
- for value in values:
- yield key, value
- else:
- yield key, values[0]
-
- def lists(self):
- """Return a iterator of ``(key, values)`` pairs, where values is the list
- of all values associated with the key."""
-
- for key, values in iteritems(dict, self):
- yield key, list(values)
-
- def keys(self):
- return iterkeys(dict, self)
-
- __iter__ = keys
-
- def values(self):
- """Returns an iterator of the first value on every key's value list."""
- for values in itervalues(dict, self):
- yield values[0]
-
- def listvalues(self):
- """Return an iterator of all values associated with a key. Zipping
- :meth:`keys` and this is the same as calling :meth:`lists`:
-
- >>> d = MultiDict({"foo": [1, 2, 3]})
- >>> zip(d.keys(), d.listvalues()) == d.lists()
- True
- """
-
- return itervalues(dict, self)
-
- def copy(self):
- """Return a shallow copy of this object."""
- return self.__class__(self)
-
- def deepcopy(self, memo=None):
- """Return a deep copy of this object."""
- return self.__class__(deepcopy(self.to_dict(flat=False), memo))
-
- def to_dict(self, flat=True):
- """Return the contents as regular dict. If `flat` is `True` the
- returned dict will only have the first item present, if `flat` is
- `False` all values will be returned as lists.
-
- :param flat: If set to `False` the dict returned will have lists
- with all the values in it. Otherwise it will only
- contain the first value for each key.
- :return: a :class:`dict`
- """
- if flat:
- return dict(iteritems(self))
- return dict(self.lists())
-
- def update(self, other_dict):
- """update() extends rather than replaces existing key lists:
-
- >>> a = MultiDict({'x': 1})
- >>> b = MultiDict({'x': 2, 'y': 3})
- >>> a.update(b)
- >>> a
- MultiDict([('y', 3), ('x', 1), ('x', 2)])
-
- If the value list for a key in ``other_dict`` is empty, no new values
- will be added to the dict and the key will not be created:
-
- >>> x = {'empty_list': []}
- >>> y = MultiDict()
- >>> y.update(x)
- >>> y
- MultiDict([])
- """
- for key, value in iter_multi_items(other_dict):
- MultiDict.add(self, key, value)
-
- def pop(self, key, default=_missing):
- """Pop the first item for a list on the dict. Afterwards the
- key is removed from the dict, so additional values are discarded:
-
- >>> d = MultiDict({"foo": [1, 2, 3]})
- >>> d.pop("foo")
- 1
- >>> "foo" in d
- False
-
- :param key: the key to pop.
- :param default: if provided the value to return if the key was
- not in the dictionary.
- """
- try:
- lst = dict.pop(self, key)
-
- if len(lst) == 0:
- raise exceptions.BadRequestKeyError(key)
-
- return lst[0]
- except KeyError:
- if default is not _missing:
- return default
- raise exceptions.BadRequestKeyError(key)
-
- def popitem(self):
- """Pop an item from the dict."""
- try:
- item = dict.popitem(self)
-
- if len(item[1]) == 0:
- raise exceptions.BadRequestKeyError(item)
-
- return (item[0], item[1][0])
- except KeyError as e:
- raise exceptions.BadRequestKeyError(e.args[0])
-
- def poplist(self, key):
- """Pop the list for a key from the dict. If the key is not in the dict
- an empty list is returned.
-
- .. versionchanged:: 0.5
- If the key does no longer exist a list is returned instead of
- raising an error.
- """
- return dict.pop(self, key, [])
-
- def popitemlist(self):
- """Pop a ``(key, list)`` tuple from the dict."""
- try:
- return dict.popitem(self)
- except KeyError as e:
- raise exceptions.BadRequestKeyError(e.args[0])
-
- def __copy__(self):
- return self.copy()
-
- def __deepcopy__(self, memo):
- return self.deepcopy(memo=memo)
-
- def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, list(iteritems(self, multi=True)))
-
-
-class _omd_bucket(object):
- """Wraps values in the :class:`OrderedMultiDict`. This makes it
- possible to keep an order over multiple different keys. It requires
- a lot of extra memory and slows down access a lot, but makes it
- possible to access elements in O(1) and iterate in O(n).
- """
-
- __slots__ = ("prev", "key", "value", "next")
-
- def __init__(self, omd, key, value):
- self.prev = omd._last_bucket
- self.key = key
- self.value = value
- self.next = None
-
- if omd._first_bucket is None:
- omd._first_bucket = self
- if omd._last_bucket is not None:
- omd._last_bucket.next = self
- omd._last_bucket = self
-
- def unlink(self, omd):
- if self.prev:
- self.prev.next = self.next
- if self.next:
- self.next.prev = self.prev
- if omd._first_bucket is self:
- omd._first_bucket = self.next
- if omd._last_bucket is self:
- omd._last_bucket = self.prev
-
-
-@native_itermethods(["keys", "values", "items", "lists", "listvalues"])
-class OrderedMultiDict(MultiDict):
- """Works like a regular :class:`MultiDict` but preserves the
- order of the fields. To convert the ordered multi dict into a
- list you can use the :meth:`items` method and pass it ``multi=True``.
-
- In general an :class:`OrderedMultiDict` is an order of magnitude
- slower than a :class:`MultiDict`.
-
- .. admonition:: note
-
- Due to a limitation in Python you cannot convert an ordered
- multi dict into a regular dict by using ``dict(multidict)``.
- Instead you have to use the :meth:`to_dict` method, otherwise
- the internal bucket objects are exposed.
- """
-
- def __init__(self, mapping=None):
- dict.__init__(self)
- self._first_bucket = self._last_bucket = None
- if mapping is not None:
- OrderedMultiDict.update(self, mapping)
-
- def __eq__(self, other):
- if not isinstance(other, MultiDict):
- return NotImplemented
- if isinstance(other, OrderedMultiDict):
- iter1 = iteritems(self, multi=True)
- iter2 = iteritems(other, multi=True)
- try:
- for k1, v1 in iter1:
- k2, v2 = next(iter2)
- if k1 != k2 or v1 != v2:
- return False
- except StopIteration:
- return False
- try:
- next(iter2)
- except StopIteration:
- return True
- return False
- if len(self) != len(other):
- return False
- for key, values in iterlists(self):
- if other.getlist(key) != values:
- return False
- return True
-
- __hash__ = None
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __reduce_ex__(self, protocol):
- return type(self), (list(iteritems(self, multi=True)),)
-
- def __getstate__(self):
- return list(iteritems(self, multi=True))
-
- def __setstate__(self, values):
- dict.clear(self)
- for key, value in values:
- self.add(key, value)
-
- def __getitem__(self, key):
- if key in self:
- return dict.__getitem__(self, key)[0].value
- raise exceptions.BadRequestKeyError(key)
-
- def __setitem__(self, key, value):
- self.poplist(key)
- self.add(key, value)
-
- def __delitem__(self, key):
- self.pop(key)
-
- def keys(self):
- return (key for key, value in iteritems(self))
-
- __iter__ = keys
-
- def values(self):
- return (value for key, value in iteritems(self))
-
- def items(self, multi=False):
- ptr = self._first_bucket
- if multi:
- while ptr is not None:
- yield ptr.key, ptr.value
- ptr = ptr.next
- else:
- returned_keys = set()
- while ptr is not None:
- if ptr.key not in returned_keys:
- returned_keys.add(ptr.key)
- yield ptr.key, ptr.value
- ptr = ptr.next
-
- def lists(self):
- returned_keys = set()
- ptr = self._first_bucket
- while ptr is not None:
- if ptr.key not in returned_keys:
- yield ptr.key, self.getlist(ptr.key)
- returned_keys.add(ptr.key)
- ptr = ptr.next
-
- def listvalues(self):
- for _key, values in iterlists(self):
- yield values
-
- def add(self, key, value):
- dict.setdefault(self, key, []).append(_omd_bucket(self, key, value))
-
- def getlist(self, key, type=None):
- try:
- rv = dict.__getitem__(self, key)
- except KeyError:
- return []
- if type is None:
- return [x.value for x in rv]
- result = []
- for item in rv:
- try:
- result.append(type(item.value))
- except ValueError:
- pass
- return result
-
- def setlist(self, key, new_list):
- self.poplist(key)
- for value in new_list:
- self.add(key, value)
-
- def setlistdefault(self, key, default_list=None):
- raise TypeError("setlistdefault is unsupported for ordered multi dicts")
-
- def update(self, mapping):
- for key, value in iter_multi_items(mapping):
- OrderedMultiDict.add(self, key, value)
-
- def poplist(self, key):
- buckets = dict.pop(self, key, ())
- for bucket in buckets:
- bucket.unlink(self)
- return [x.value for x in buckets]
-
- def pop(self, key, default=_missing):
- try:
- buckets = dict.pop(self, key)
- except KeyError:
- if default is not _missing:
- return default
- raise exceptions.BadRequestKeyError(key)
- for bucket in buckets:
- bucket.unlink(self)
- return buckets[0].value
-
- def popitem(self):
- try:
- key, buckets = dict.popitem(self)
- except KeyError as e:
- raise exceptions.BadRequestKeyError(e.args[0])
- for bucket in buckets:
- bucket.unlink(self)
- return key, buckets[0].value
-
- def popitemlist(self):
- try:
- key, buckets = dict.popitem(self)
- except KeyError as e:
- raise exceptions.BadRequestKeyError(e.args[0])
- for bucket in buckets:
- bucket.unlink(self)
- return key, [x.value for x in buckets]
-
-
-def _options_header_vkw(value, kw):
- return dump_options_header(
- value, dict((k.replace("_", "-"), v) for k, v in kw.items())
- )
-
-
-def _unicodify_header_value(value):
- if isinstance(value, bytes):
- value = value.decode("latin-1")
- if not isinstance(value, text_type):
- value = text_type(value)
- return value
-
-
-@native_itermethods(["keys", "values", "items"])
-class Headers(object):
- """An object that stores some headers. It has a dict-like interface
- but is ordered and can store the same keys multiple times.
-
- This data structure is useful if you want a nicer way to handle WSGI
- headers which are stored as tuples in a list.
-
- From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
- also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
- and will render a page for a ``400 BAD REQUEST`` if caught in a
- catch-all for HTTP exceptions.
-
- Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
- class, with the exception of `__getitem__`. :mod:`wsgiref` will return
- `None` for ``headers['missing']``, whereas :class:`Headers` will raise
- a :class:`KeyError`.
-
- To create a new :class:`Headers` object pass it a list or dict of headers
- which are used as default values. This does not reuse the list passed
- to the constructor for internal usage.
-
- :param defaults: The list of default values for the :class:`Headers`.
-
- .. versionchanged:: 0.9
- This data structure now stores unicode values similar to how the
- multi dicts do it. The main difference is that bytes can be set as
- well which will automatically be latin1 decoded.
-
- .. versionchanged:: 0.9
- The :meth:`linked` function was removed without replacement as it
- was an API that does not support the changes to the encoding model.
- """
-
- def __init__(self, defaults=None):
- self._list = []
- if defaults is not None:
- if isinstance(defaults, (list, Headers)):
- self._list.extend(defaults)
- else:
- self.extend(defaults)
-
- def __getitem__(self, key, _get_mode=False):
- if not _get_mode:
- if isinstance(key, integer_types):
- return self._list[key]
- elif isinstance(key, slice):
- return self.__class__(self._list[key])
- if not isinstance(key, string_types):
- raise exceptions.BadRequestKeyError(key)
- ikey = key.lower()
- for k, v in self._list:
- if k.lower() == ikey:
- return v
- # micro optimization: if we are in get mode we will catch that
- # exception one stack level down so we can raise a standard
- # key error instead of our special one.
- if _get_mode:
- raise KeyError()
- raise exceptions.BadRequestKeyError(key)
-
- def __eq__(self, other):
- return other.__class__ is self.__class__ and set(other._list) == set(self._list)
-
- __hash__ = None
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def get(self, key, default=None, type=None, as_bytes=False):
- """Return the default value if the requested data doesn't exist.
- If `type` is provided and is a callable it should convert the value,
- return it or raise a :exc:`ValueError` if that is not possible. In
- this case the function will return the default as if the value was not
- found:
-
- >>> d = Headers([('Content-Length', '42')])
- >>> d.get('Content-Length', type=int)
- 42
-
- If a headers object is bound you must not add unicode strings
- because no encoding takes place.
-
- .. versionadded:: 0.9
- Added support for `as_bytes`.
-
- :param key: The key to be looked up.
- :param default: The default value to be returned if the key can't
- be looked up. If not further specified `None` is
- returned.
- :param type: A callable that is used to cast the value in the
- :class:`Headers`. If a :exc:`ValueError` is raised
- by this callable the default value is returned.
- :param as_bytes: return bytes instead of unicode strings.
- """
- try:
- rv = self.__getitem__(key, _get_mode=True)
- except KeyError:
- return default
- if as_bytes:
- rv = rv.encode("latin1")
- if type is None:
- return rv
- try:
- return type(rv)
- except ValueError:
- return default
-
- def getlist(self, key, type=None, as_bytes=False):
- """Return the list of items for a given key. If that key is not in the
- :class:`Headers`, the return value will be an empty list. Just as
- :meth:`get` :meth:`getlist` accepts a `type` parameter. All items will
- be converted with the callable defined there.
-
- .. versionadded:: 0.9
- Added support for `as_bytes`.
-
- :param key: The key to be looked up.
- :param type: A callable that is used to cast the value in the
- :class:`Headers`. If a :exc:`ValueError` is raised
- by this callable the value will be removed from the list.
- :return: a :class:`list` of all the values for the key.
- :param as_bytes: return bytes instead of unicode strings.
- """
- ikey = key.lower()
- result = []
- for k, v in self:
- if k.lower() == ikey:
- if as_bytes:
- v = v.encode("latin1")
- if type is not None:
- try:
- v = type(v)
- except ValueError:
- continue
- result.append(v)
- return result
-
- def get_all(self, name):
- """Return a list of all the values for the named field.
-
- This method is compatible with the :mod:`wsgiref`
- :meth:`~wsgiref.headers.Headers.get_all` method.
- """
- return self.getlist(name)
-
- def items(self, lower=False):
- for key, value in self:
- if lower:
- key = key.lower()
- yield key, value
-
- def keys(self, lower=False):
- for key, _ in iteritems(self, lower):
- yield key
-
- def values(self):
- for _, value in iteritems(self):
- yield value
-
- def extend(self, iterable):
- """Extend the headers with a dict or an iterable yielding keys and
- values.
- """
- if isinstance(iterable, dict):
- for key, value in iteritems(iterable):
- if isinstance(value, (tuple, list)):
- for v in value:
- self.add(key, v)
- else:
- self.add(key, value)
- else:
- for key, value in iterable:
- self.add(key, value)
-
- def __delitem__(self, key, _index_operation=True):
- if _index_operation and isinstance(key, (integer_types, slice)):
- del self._list[key]
- return
- key = key.lower()
- new = []
- for k, v in self._list:
- if k.lower() != key:
- new.append((k, v))
- self._list[:] = new
-
- def remove(self, key):
- """Remove a key.
-
- :param key: The key to be removed.
- """
- return self.__delitem__(key, _index_operation=False)
-
- def pop(self, key=None, default=_missing):
- """Removes and returns a key or index.
-
- :param key: The key to be popped. If this is an integer the item at
- that position is removed, if it's a string the value for
- that key is. If the key is omitted or `None` the last
- item is removed.
- :return: an item.
- """
- if key is None:
- return self._list.pop()
- if isinstance(key, integer_types):
- return self._list.pop(key)
- try:
- rv = self[key]
- self.remove(key)
- except KeyError:
- if default is not _missing:
- return default
- raise
- return rv
-
- def popitem(self):
- """Removes a key or index and returns a (key, value) item."""
- return self.pop()
-
- def __contains__(self, key):
- """Check if a key is present."""
- try:
- self.__getitem__(key, _get_mode=True)
- except KeyError:
- return False
- return True
-
- has_key = __contains__
-
- def __iter__(self):
- """Yield ``(key, value)`` tuples."""
- return iter(self._list)
-
- def __len__(self):
- return len(self._list)
-
- def add(self, _key, _value, **kw):
- """Add a new header tuple to the list.
-
- Keyword arguments can specify additional parameters for the header
- value, with underscores converted to dashes::
-
- >>> d = Headers()
- >>> d.add('Content-Type', 'text/plain')
- >>> d.add('Content-Disposition', 'attachment', filename='foo.png')
-
- The keyword argument dumping uses :func:`dump_options_header`
- behind the scenes.
-
- .. versionadded:: 0.4.1
- keyword arguments were added for :mod:`wsgiref` compatibility.
- """
- if kw:
- _value = _options_header_vkw(_value, kw)
- _key = _unicodify_header_value(_key)
- _value = _unicodify_header_value(_value)
- self._validate_value(_value)
- self._list.append((_key, _value))
-
- def _validate_value(self, value):
- if not isinstance(value, text_type):
- raise TypeError("Value should be unicode.")
- if u"\n" in value or u"\r" in value:
- raise ValueError(
- "Detected newline in header value. This is "
- "a potential security problem"
- )
-
- def add_header(self, _key, _value, **_kw):
- """Add a new header tuple to the list.
-
- An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
- :meth:`~wsgiref.headers.Headers.add_header` method.
- """
- self.add(_key, _value, **_kw)
-
- def clear(self):
- """Clears all headers."""
- del self._list[:]
-
- def set(self, _key, _value, **kw):
- """Remove all header tuples for `key` and add a new one. The newly
- added key either appears at the end of the list if there was no
- entry or replaces the first one.
-
- Keyword arguments can specify additional parameters for the header
- value, with underscores converted to dashes. See :meth:`add` for
- more information.
-
- .. versionchanged:: 0.6.1
- :meth:`set` now accepts the same arguments as :meth:`add`.
-
- :param key: The key to be inserted.
- :param value: The value to be inserted.
- """
- if kw:
- _value = _options_header_vkw(_value, kw)
- _key = _unicodify_header_value(_key)
- _value = _unicodify_header_value(_value)
- self._validate_value(_value)
- if not self._list:
- self._list.append((_key, _value))
- return
- listiter = iter(self._list)
- ikey = _key.lower()
- for idx, (old_key, _old_value) in enumerate(listiter):
- if old_key.lower() == ikey:
- # replace first ocurrence
- self._list[idx] = (_key, _value)
- break
- else:
- self._list.append((_key, _value))
- return
- self._list[idx + 1 :] = [t for t in listiter if t[0].lower() != ikey]
-
- def setdefault(self, key, default):
- """Returns the value for the key if it is in the dict, otherwise it
- returns `default` and sets that value for `key`.
-
- :param key: The key to be looked up.
- :param default: The default value to be returned if the key is not
- in the dict. If not further specified it's `None`.
- """
- if key in self:
- return self[key]
- self.set(key, default)
- return default
-
- def __setitem__(self, key, value):
- """Like :meth:`set` but also supports index/slice based setting."""
- if isinstance(key, (slice, integer_types)):
- if isinstance(key, integer_types):
- value = [value]
- value = [
- (_unicodify_header_value(k), _unicodify_header_value(v))
- for (k, v) in value
- ]
- [self._validate_value(v) for (k, v) in value]
- if isinstance(key, integer_types):
- self._list[key] = value[0]
- else:
- self._list[key] = value
- else:
- self.set(key, value)
-
- def to_list(self, charset="iso-8859-1"):
- """Convert the headers into a list suitable for WSGI.
-
- .. deprecated:: 0.9
- """
- from warnings import warn
-
- warn(
- "'to_list' deprecated as of version 0.9 and will be removed"
- " in version 1.0. Use 'to_wsgi_list' instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- return self.to_wsgi_list()
-
- def to_wsgi_list(self):
- """Convert the headers into a list suitable for WSGI.
-
- The values are byte strings in Python 2 converted to latin1 and unicode
- strings in Python 3 for the WSGI server to encode.
-
- :return: list
- """
- if PY2:
- return [(to_native(k), v.encode("latin1")) for k, v in self]
- return list(self)
-
- def copy(self):
- return self.__class__(self._list)
-
- def __copy__(self):
- return self.copy()
-
- def __str__(self):
- """Returns formatted headers suitable for HTTP transmission."""
- strs = []
- for key, value in self.to_wsgi_list():
- strs.append("%s: %s" % (key, value))
- strs.append("\r\n")
- return "\r\n".join(strs)
-
- def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, list(self))
-
-
-class ImmutableHeadersMixin(object):
- """Makes a :class:`Headers` immutable. We do not mark them as
- hashable though since the only usecase for this datastructure
- in Werkzeug is a view on a mutable structure.
-
- .. versionadded:: 0.5
-
- :private:
- """
-
- def __delitem__(self, key, **kwargs):
- is_immutable(self)
-
- def __setitem__(self, key, value):
- is_immutable(self)
-
- set = __setitem__
-
- def add(self, item):
- is_immutable(self)
-
- remove = add_header = add
-
- def extend(self, iterable):
- is_immutable(self)
-
- def insert(self, pos, value):
- is_immutable(self)
-
- def pop(self, index=-1):
- is_immutable(self)
-
- def popitem(self):
- is_immutable(self)
-
- def setdefault(self, key, default):
- is_immutable(self)
-
-
-class EnvironHeaders(ImmutableHeadersMixin, Headers):
- """Read only version of the headers from a WSGI environment. This
- provides the same interface as `Headers` and is constructed from
- a WSGI environment.
-
- From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
- subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
- render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
- HTTP exceptions.
- """
-
- def __init__(self, environ):
- self.environ = environ
-
- def __eq__(self, other):
- return self.environ is other.environ
-
- __hash__ = None
-
- def __getitem__(self, key, _get_mode=False):
- # _get_mode is a no-op for this class as there is no index but
- # used because get() calls it.
- if not isinstance(key, string_types):
- raise KeyError(key)
- key = key.upper().replace("-", "_")
- if key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
- return _unicodify_header_value(self.environ[key])
- return _unicodify_header_value(self.environ["HTTP_" + key])
-
- def __len__(self):
- # the iter is necessary because otherwise list calls our
- # len which would call list again and so forth.
- return len(list(iter(self)))
-
- def __iter__(self):
- for key, value in iteritems(self.environ):
- if key.startswith("HTTP_") and key not in (
- "HTTP_CONTENT_TYPE",
- "HTTP_CONTENT_LENGTH",
- ):
- yield (
- key[5:].replace("_", "-").title(),
- _unicodify_header_value(value),
- )
- elif key in ("CONTENT_TYPE", "CONTENT_LENGTH") and value:
- yield (key.replace("_", "-").title(), _unicodify_header_value(value))
-
- def copy(self):
- raise TypeError("cannot create %r copies" % self.__class__.__name__)
-
-
-@native_itermethods(["keys", "values", "items", "lists", "listvalues"])
-class CombinedMultiDict(ImmutableMultiDictMixin, MultiDict):
- """A read only :class:`MultiDict` that you can pass multiple :class:`MultiDict`
- instances as sequence and it will combine the return values of all wrapped
- dicts:
-
- >>> from werkzeug.datastructures import CombinedMultiDict, MultiDict
- >>> post = MultiDict([('foo', 'bar')])
- >>> get = MultiDict([('blub', 'blah')])
- >>> combined = CombinedMultiDict([get, post])
- >>> combined['foo']
- 'bar'
- >>> combined['blub']
- 'blah'
-
- This works for all read operations and will raise a `TypeError` for
- methods that usually change data which isn't possible.
-
- From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
- subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
- render a page for a ``400 BAD REQUEST`` if caught in a catch-all for HTTP
- exceptions.
- """
-
- def __reduce_ex__(self, protocol):
- return type(self), (self.dicts,)
-
- def __init__(self, dicts=None):
- self.dicts = dicts or []
-
- @classmethod
- def fromkeys(cls):
- raise TypeError("cannot create %r instances by fromkeys" % cls.__name__)
-
- def __getitem__(self, key):
- for d in self.dicts:
- if key in d:
- return d[key]
- raise exceptions.BadRequestKeyError(key)
-
- def get(self, key, default=None, type=None):
- for d in self.dicts:
- if key in d:
- if type is not None:
- try:
- return type(d[key])
- except ValueError:
- continue
- return d[key]
- return default
-
- def getlist(self, key, type=None):
- rv = []
- for d in self.dicts:
- rv.extend(d.getlist(key, type))
- return rv
-
- def _keys_impl(self):
- """This function exists so __len__ can be implemented more efficiently,
- saving one list creation from an iterator.
-
- Using this for Python 2's ``dict.keys`` behavior would be useless since
- `dict.keys` in Python 2 returns a list, while we have a set here.
- """
- rv = set()
- for d in self.dicts:
- rv.update(iterkeys(d))
- return rv
-
- def keys(self):
- return iter(self._keys_impl())
-
- __iter__ = keys
-
- def items(self, multi=False):
- found = set()
- for d in self.dicts:
- for key, value in iteritems(d, multi):
- if multi:
- yield key, value
- elif key not in found:
- found.add(key)
- yield key, value
-
- def values(self):
- for _key, value in iteritems(self):
- yield value
-
- def lists(self):
- rv = {}
- for d in self.dicts:
- for key, values in iterlists(d):
- rv.setdefault(key, []).extend(values)
- return iteritems(rv)
-
- def listvalues(self):
- return (x[1] for x in self.lists())
-
- def copy(self):
- """Return a shallow mutable copy of this object.
-
- This returns a :class:`MultiDict` representing the data at the
- time of copying. The copy will no longer reflect changes to the
- wrapped dicts.
-
- .. versionchanged:: 0.15
- Return a mutable :class:`MultiDict`.
- """
- return MultiDict(self)
-
- def to_dict(self, flat=True):
- """Return the contents as regular dict. If `flat` is `True` the
- returned dict will only have the first item present, if `flat` is
- `False` all values will be returned as lists.
-
- :param flat: If set to `False` the dict returned will have lists
- with all the values in it. Otherwise it will only
- contain the first item for each key.
- :return: a :class:`dict`
- """
- rv = {}
- for d in reversed(self.dicts):
- rv.update(d.to_dict(flat))
- return rv
-
- def __len__(self):
- return len(self._keys_impl())
-
- def __contains__(self, key):
- for d in self.dicts:
- if key in d:
- return True
- return False
-
- has_key = __contains__
-
- def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self.dicts)
-
-
-class FileMultiDict(MultiDict):
- """A special :class:`MultiDict` that has convenience methods to add
- files to it. This is used for :class:`EnvironBuilder` and generally
- useful for unittesting.
-
- .. versionadded:: 0.5
- """
-
- def add_file(self, name, file, filename=None, content_type=None):
- """Adds a new file to the dict. `file` can be a file name or
- a :class:`file`-like or a :class:`FileStorage` object.
-
- :param name: the name of the field.
- :param file: a filename or :class:`file`-like object
- :param filename: an optional filename
- :param content_type: an optional content type
- """
- if isinstance(file, FileStorage):
- value = file
- else:
- if isinstance(file, string_types):
- if filename is None:
- filename = file
- file = open(file, "rb")
- if filename and content_type is None:
- content_type = (
- mimetypes.guess_type(filename)[0] or "application/octet-stream"
- )
- value = FileStorage(file, filename, name, content_type)
-
- self.add(name, value)
-
-
-class ImmutableDict(ImmutableDictMixin, dict):
- """An immutable :class:`dict`.
-
- .. versionadded:: 0.5
- """
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, dict.__repr__(self))
-
- def copy(self):
- """Return a shallow mutable copy of this object. Keep in mind that
- the standard library's :func:`copy` function is a no-op for this class
- like for any other python immutable type (eg: :class:`tuple`).
- """
- return dict(self)
-
- def __copy__(self):
- return self
-
-
-class ImmutableMultiDict(ImmutableMultiDictMixin, MultiDict):
- """An immutable :class:`MultiDict`.
-
- .. versionadded:: 0.5
- """
-
- def copy(self):
- """Return a shallow mutable copy of this object. Keep in mind that
- the standard library's :func:`copy` function is a no-op for this class
- like for any other python immutable type (eg: :class:`tuple`).
- """
- return MultiDict(self)
-
- def __copy__(self):
- return self
-
-
-class ImmutableOrderedMultiDict(ImmutableMultiDictMixin, OrderedMultiDict):
- """An immutable :class:`OrderedMultiDict`.
-
- .. versionadded:: 0.6
- """
-
- def _iter_hashitems(self):
- return enumerate(iteritems(self, multi=True))
-
- def copy(self):
- """Return a shallow mutable copy of this object. Keep in mind that
- the standard library's :func:`copy` function is a no-op for this class
- like for any other python immutable type (eg: :class:`tuple`).
- """
- return OrderedMultiDict(self)
-
- def __copy__(self):
- return self
-
-
-@native_itermethods(["values"])
-class Accept(ImmutableList):
- """An :class:`Accept` object is just a list subclass for lists of
- ``(value, quality)`` tuples. It is automatically sorted by specificity
- and quality.
-
- All :class:`Accept` objects work similar to a list but provide extra
- functionality for working with the data. Containment checks are
- normalized to the rules of that header:
-
- >>> a = CharsetAccept([('ISO-8859-1', 1), ('utf-8', 0.7)])
- >>> a.best
- 'ISO-8859-1'
- >>> 'iso-8859-1' in a
- True
- >>> 'UTF8' in a
- True
- >>> 'utf7' in a
- False
-
- To get the quality for an item you can use normal item lookup:
-
- >>> print a['utf-8']
- 0.7
- >>> a['utf7']
- 0
-
- .. versionchanged:: 0.5
- :class:`Accept` objects are forced immutable now.
- """
-
- def __init__(self, values=()):
- if values is None:
- list.__init__(self)
- self.provided = False
- elif isinstance(values, Accept):
- self.provided = values.provided
- list.__init__(self, values)
- else:
- self.provided = True
- values = sorted(
- values,
- key=lambda x: (self._specificity(x[0]), x[1], x[0]),
- reverse=True,
- )
- list.__init__(self, values)
-
- def _specificity(self, value):
- """Returns a tuple describing the value's specificity."""
- return (value != "*",)
-
- def _value_matches(self, value, item):
- """Check if a value matches a given accept item."""
- return item == "*" or item.lower() == value.lower()
-
- def __getitem__(self, key):
- """Besides index lookup (getting item n) you can also pass it a string
- to get the quality for the item. If the item is not in the list, the
- returned quality is ``0``.
- """
- if isinstance(key, string_types):
- return self.quality(key)
- return list.__getitem__(self, key)
-
- def quality(self, key):
- """Returns the quality of the key.
-
- .. versionadded:: 0.6
- In previous versions you had to use the item-lookup syntax
- (eg: ``obj[key]`` instead of ``obj.quality(key)``)
- """
- for item, quality in self:
- if self._value_matches(key, item):
- return quality
- return 0
-
- def __contains__(self, value):
- for item, _quality in self:
- if self._value_matches(value, item):
- return True
- return False
-
- def __repr__(self):
- return "%s([%s])" % (
- self.__class__.__name__,
- ", ".join("(%r, %s)" % (x, y) for x, y in self),
- )
-
- def index(self, key):
- """Get the position of an entry or raise :exc:`ValueError`.
-
- :param key: The key to be looked up.
-
- .. versionchanged:: 0.5
- This used to raise :exc:`IndexError`, which was inconsistent
- with the list API.
- """
- if isinstance(key, string_types):
- for idx, (item, _quality) in enumerate(self):
- if self._value_matches(key, item):
- return idx
- raise ValueError(key)
- return list.index(self, key)
-
- def find(self, key):
- """Get the position of an entry or return -1.
-
- :param key: The key to be looked up.
- """
- try:
- return self.index(key)
- except ValueError:
- return -1
-
- def values(self):
- """Iterate over all values."""
- for item in self:
- yield item[0]
-
- def to_header(self):
- """Convert the header set into an HTTP header string."""
- result = []
- for value, quality in self:
- if quality != 1:
- value = "%s;q=%s" % (value, quality)
- result.append(value)
- return ",".join(result)
-
- def __str__(self):
- return self.to_header()
-
- def _best_single_match(self, match):
- for client_item, quality in self:
- if self._value_matches(match, client_item):
- # self is sorted by specificity descending, we can exit
- return client_item, quality
-
- def best_match(self, matches, default=None):
- """Returns the best match from a list of possible matches based
- on the specificity and quality of the client. If two items have the
- same quality and specificity, the one is returned that comes first.
-
- :param matches: a list of matches to check for
- :param default: the value that is returned if none match
- """
- result = default
- best_quality = -1
- best_specificity = (-1,)
- for server_item in matches:
- match = self._best_single_match(server_item)
- if not match:
- continue
- client_item, quality = match
- specificity = self._specificity(client_item)
- if quality <= 0 or quality < best_quality:
- continue
- # better quality or same quality but more specific => better match
- if quality > best_quality or specificity > best_specificity:
- result = server_item
- best_quality = quality
- best_specificity = specificity
- return result
-
- @property
- def best(self):
- """The best match as value."""
- if self:
- return self[0][0]
-
-
-class MIMEAccept(Accept):
- """Like :class:`Accept` but with special methods and behavior for
- mimetypes.
- """
-
- def _specificity(self, value):
- return tuple(x != "*" for x in value.split("/", 1))
-
- def _value_matches(self, value, item):
- def _normalize(x):
- x = x.lower()
- return ("*", "*") if x == "*" else x.split("/", 1)
-
- # this is from the application which is trusted. to avoid developer
- # frustration we actually check these for valid values
- if "/" not in value:
- raise ValueError("invalid mimetype %r" % value)
- value_type, value_subtype = _normalize(value)
- if value_type == "*" and value_subtype != "*":
- raise ValueError("invalid mimetype %r" % value)
-
- if "/" not in item:
- return False
- item_type, item_subtype = _normalize(item)
- if item_type == "*" and item_subtype != "*":
- return False
- return (
- item_type == item_subtype == "*" or value_type == value_subtype == "*"
- ) or (
- item_type == value_type
- and (
- item_subtype == "*"
- or value_subtype == "*"
- or item_subtype == value_subtype
- )
- )
-
- @property
- def accept_html(self):
- """True if this object accepts HTML."""
- return (
- "text/html" in self or "application/xhtml+xml" in self or self.accept_xhtml
- )
-
- @property
- def accept_xhtml(self):
- """True if this object accepts XHTML."""
- return "application/xhtml+xml" in self or "application/xml" in self
-
- @property
- def accept_json(self):
- """True if this object accepts JSON."""
- return "application/json" in self
-
-
-class LanguageAccept(Accept):
- """Like :class:`Accept` but with normalization for languages."""
-
- def _value_matches(self, value, item):
- def _normalize(language):
- return _locale_delim_re.split(language.lower())
-
- return item == "*" or _normalize(value) == _normalize(item)
-
-
-class CharsetAccept(Accept):
- """Like :class:`Accept` but with normalization for charsets."""
-
- def _value_matches(self, value, item):
- def _normalize(name):
- try:
- return codecs.lookup(name).name
- except LookupError:
- return name.lower()
-
- return item == "*" or _normalize(value) == _normalize(item)
-
-
-def cache_property(key, empty, type):
- """Return a new property object for a cache header. Useful if you
- want to add support for a cache extension in a subclass."""
- return property(
- lambda x: x._get_cache_value(key, empty, type),
- lambda x, v: x._set_cache_value(key, v, type),
- lambda x: x._del_cache_value(key),
- "accessor for %r" % key,
- )
-
-
-class _CacheControl(UpdateDictMixin, dict):
- """Subclass of a dict that stores values for a Cache-Control header. It
- has accessors for all the cache-control directives specified in RFC 2616.
- The class does not differentiate between request and response directives.
-
- Because the cache-control directives in the HTTP header use dashes the
- python descriptors use underscores for that.
-
- To get a header of the :class:`CacheControl` object again you can convert
- the object into a string or call the :meth:`to_header` method. If you plan
- to subclass it and add your own items have a look at the sourcecode for
- that class.
-
- .. versionchanged:: 0.4
-
- Setting `no_cache` or `private` to boolean `True` will set the implicit
- none-value which is ``*``:
-
- >>> cc = ResponseCacheControl()
- >>> cc.no_cache = True
- >>> cc
- <ResponseCacheControl 'no-cache'>
- >>> cc.no_cache
- '*'
- >>> cc.no_cache = None
- >>> cc
- <ResponseCacheControl ''>
-
- In versions before 0.5 the behavior documented here affected the now
- no longer existing `CacheControl` class.
- """
-
- no_cache = cache_property("no-cache", "*", None)
- no_store = cache_property("no-store", None, bool)
- max_age = cache_property("max-age", -1, int)
- no_transform = cache_property("no-transform", None, None)
-
- def __init__(self, values=(), on_update=None):
- dict.__init__(self, values or ())
- self.on_update = on_update
- self.provided = values is not None
-
- def _get_cache_value(self, key, empty, type):
- """Used internally by the accessor properties."""
- if type is bool:
- return key in self
- if key in self:
- value = self[key]
- if value is None:
- return empty
- elif type is not None:
- try:
- value = type(value)
- except ValueError:
- pass
- return value
-
- def _set_cache_value(self, key, value, type):
- """Used internally by the accessor properties."""
- if type is bool:
- if value:
- self[key] = None
- else:
- self.pop(key, None)
- else:
- if value is None:
- self.pop(key)
- elif value is True:
- self[key] = None
- else:
- self[key] = value
-
- def _del_cache_value(self, key):
- """Used internally by the accessor properties."""
- if key in self:
- del self[key]
-
- def to_header(self):
- """Convert the stored values into a cache control header."""
- return dump_header(self)
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "<%s %s>" % (
- self.__class__.__name__,
- " ".join("%s=%r" % (k, v) for k, v in sorted(self.items())),
- )
-
-
-class RequestCacheControl(ImmutableDictMixin, _CacheControl):
- """A cache control for requests. This is immutable and gives access
- to all the request-relevant cache control headers.
-
- To get a header of the :class:`RequestCacheControl` object again you can
- convert the object into a string or call the :meth:`to_header` method. If
- you plan to subclass it and add your own items have a look at the sourcecode
- for that class.
-
- .. versionadded:: 0.5
- In previous versions a `CacheControl` class existed that was used
- both for request and response.
- """
-
- max_stale = cache_property("max-stale", "*", int)
- min_fresh = cache_property("min-fresh", "*", int)
- no_transform = cache_property("no-transform", None, None)
- only_if_cached = cache_property("only-if-cached", None, bool)
-
-
-class ResponseCacheControl(_CacheControl):
- """A cache control for responses. Unlike :class:`RequestCacheControl`
- this is mutable and gives access to response-relevant cache control
- headers.
-
- To get a header of the :class:`ResponseCacheControl` object again you can
- convert the object into a string or call the :meth:`to_header` method. If
- you plan to subclass it and add your own items have a look at the sourcecode
- for that class.
-
- .. versionadded:: 0.5
- In previous versions a `CacheControl` class existed that was used
- both for request and response.
- """
-
- public = cache_property("public", None, bool)
- private = cache_property("private", "*", None)
- must_revalidate = cache_property("must-revalidate", None, bool)
- proxy_revalidate = cache_property("proxy-revalidate", None, bool)
- s_maxage = cache_property("s-maxage", None, None)
-
-
-# attach cache_property to the _CacheControl as staticmethod
-# so that others can reuse it.
-_CacheControl.cache_property = staticmethod(cache_property)
-
-
-class CallbackDict(UpdateDictMixin, dict):
- """A dict that calls a function passed every time something is changed.
- The function is passed the dict instance.
- """
-
- def __init__(self, initial=None, on_update=None):
- dict.__init__(self, initial or ())
- self.on_update = on_update
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, dict.__repr__(self))
-
-
-class HeaderSet(collections_abc.MutableSet):
- """Similar to the :class:`ETags` class this implements a set-like structure.
- Unlike :class:`ETags` this is case insensitive and used for vary, allow, and
- content-language headers.
-
- If not constructed using the :func:`parse_set_header` function the
- instantiation works like this:
-
- >>> hs = HeaderSet(['foo', 'bar', 'baz'])
- >>> hs
- HeaderSet(['foo', 'bar', 'baz'])
- """
-
- def __init__(self, headers=None, on_update=None):
- self._headers = list(headers or ())
- self._set = set([x.lower() for x in self._headers])
- self.on_update = on_update
-
- def add(self, header):
- """Add a new header to the set."""
- self.update((header,))
-
- def remove(self, header):
- """Remove a header from the set. This raises an :exc:`KeyError` if the
- header is not in the set.
-
- .. versionchanged:: 0.5
- In older versions a :exc:`IndexError` was raised instead of a
- :exc:`KeyError` if the object was missing.
-
- :param header: the header to be removed.
- """
- key = header.lower()
- if key not in self._set:
- raise KeyError(header)
- self._set.remove(key)
- for idx, key in enumerate(self._headers):
- if key.lower() == header:
- del self._headers[idx]
- break
- if self.on_update is not None:
- self.on_update(self)
-
- def update(self, iterable):
- """Add all the headers from the iterable to the set.
-
- :param iterable: updates the set with the items from the iterable.
- """
- inserted_any = False
- for header in iterable:
- key = header.lower()
- if key not in self._set:
- self._headers.append(header)
- self._set.add(key)
- inserted_any = True
- if inserted_any and self.on_update is not None:
- self.on_update(self)
-
- def discard(self, header):
- """Like :meth:`remove` but ignores errors.
-
- :param header: the header to be discarded.
- """
- try:
- return self.remove(header)
- except KeyError:
- pass
-
- def find(self, header):
- """Return the index of the header in the set or return -1 if not found.
-
- :param header: the header to be looked up.
- """
- header = header.lower()
- for idx, item in enumerate(self._headers):
- if item.lower() == header:
- return idx
- return -1
-
- def index(self, header):
- """Return the index of the header in the set or raise an
- :exc:`IndexError`.
-
- :param header: the header to be looked up.
- """
- rv = self.find(header)
- if rv < 0:
- raise IndexError(header)
- return rv
-
- def clear(self):
- """Clear the set."""
- self._set.clear()
- del self._headers[:]
- if self.on_update is not None:
- self.on_update(self)
-
- def as_set(self, preserve_casing=False):
- """Return the set as real python set type. When calling this, all
- the items are converted to lowercase and the ordering is lost.
-
- :param preserve_casing: if set to `True` the items in the set returned
- will have the original case like in the
- :class:`HeaderSet`, otherwise they will
- be lowercase.
- """
- if preserve_casing:
- return set(self._headers)
- return set(self._set)
-
- def to_header(self):
- """Convert the header set into an HTTP header string."""
- return ", ".join(map(quote_header_value, self._headers))
-
- def __getitem__(self, idx):
- return self._headers[idx]
-
- def __delitem__(self, idx):
- rv = self._headers.pop(idx)
- self._set.remove(rv.lower())
- if self.on_update is not None:
- self.on_update(self)
-
- def __setitem__(self, idx, value):
- old = self._headers[idx]
- self._set.remove(old.lower())
- self._headers[idx] = value
- self._set.add(value.lower())
- if self.on_update is not None:
- self.on_update(self)
-
- def __contains__(self, header):
- return header.lower() in self._set
-
- def __len__(self):
- return len(self._set)
-
- def __iter__(self):
- return iter(self._headers)
-
- def __nonzero__(self):
- return bool(self._set)
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self._headers)
-
-
-class ETags(collections_abc.Container, collections_abc.Iterable):
- """A set that can be used to check if one etag is present in a collection
- of etags.
- """
-
- def __init__(self, strong_etags=None, weak_etags=None, star_tag=False):
- self._strong = frozenset(not star_tag and strong_etags or ())
- self._weak = frozenset(weak_etags or ())
- self.star_tag = star_tag
-
- def as_set(self, include_weak=False):
- """Convert the `ETags` object into a python set. Per default all the
- weak etags are not part of this set."""
- rv = set(self._strong)
- if include_weak:
- rv.update(self._weak)
- return rv
-
- def is_weak(self, etag):
- """Check if an etag is weak."""
- return etag in self._weak
-
- def is_strong(self, etag):
- """Check if an etag is strong."""
- return etag in self._strong
-
- def contains_weak(self, etag):
- """Check if an etag is part of the set including weak and strong tags."""
- return self.is_weak(etag) or self.contains(etag)
-
- def contains(self, etag):
- """Check if an etag is part of the set ignoring weak tags.
- It is also possible to use the ``in`` operator.
- """
- if self.star_tag:
- return True
- return self.is_strong(etag)
-
- def contains_raw(self, etag):
- """When passed a quoted tag it will check if this tag is part of the
- set. If the tag is weak it is checked against weak and strong tags,
- otherwise strong only."""
- etag, weak = unquote_etag(etag)
- if weak:
- return self.contains_weak(etag)
- return self.contains(etag)
-
- def to_header(self):
- """Convert the etags set into a HTTP header string."""
- if self.star_tag:
- return "*"
- return ", ".join(
- ['"%s"' % x for x in self._strong] + ['W/"%s"' % x for x in self._weak]
- )
-
- def __call__(self, etag=None, data=None, include_weak=False):
- if [etag, data].count(None) != 1:
- raise TypeError("either tag or data required, but at least one")
- if etag is None:
- etag = generate_etag(data)
- if include_weak:
- if etag in self._weak:
- return True
- return etag in self._strong
-
- def __bool__(self):
- return bool(self.star_tag or self._strong or self._weak)
-
- __nonzero__ = __bool__
-
- def __str__(self):
- return self.to_header()
-
- def __iter__(self):
- return iter(self._strong)
-
- def __contains__(self, etag):
- return self.contains(etag)
-
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, str(self))
-
-
-class IfRange(object):
- """Very simple object that represents the `If-Range` header in parsed
- form. It will either have neither a etag or date or one of either but
- never both.
-
- .. versionadded:: 0.7
- """
-
- def __init__(self, etag=None, date=None):
- #: The etag parsed and unquoted. Ranges always operate on strong
- #: etags so the weakness information is not necessary.
- self.etag = etag
- #: The date in parsed format or `None`.
- self.date = date
-
- def to_header(self):
- """Converts the object back into an HTTP header."""
- if self.date is not None:
- return http_date(self.date)
- if self.etag is not None:
- return quote_etag(self.etag)
- return ""
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, str(self))
-
-
-class Range(object):
- """Represents a ``Range`` header. All methods only support only
- bytes as the unit. Stores a list of ranges if given, but the methods
- only work if only one range is provided.
-
- :raise ValueError: If the ranges provided are invalid.
-
- .. versionchanged:: 0.15
- The ranges passed in are validated.
-
- .. versionadded:: 0.7
- """
-
- def __init__(self, units, ranges):
- #: The units of this range. Usually "bytes".
- self.units = units
- #: A list of ``(begin, end)`` tuples for the range header provided.
- #: The ranges are non-inclusive.
- self.ranges = ranges
-
- for start, end in ranges:
- if start is None or (end is not None and (start < 0 or start >= end)):
- raise ValueError("{} is not a valid range.".format((start, end)))
-
- def range_for_length(self, length):
- """If the range is for bytes, the length is not None and there is
- exactly one range and it is satisfiable it returns a ``(start, stop)``
- tuple, otherwise `None`.
- """
- if self.units != "bytes" or length is None or len(self.ranges) != 1:
- return None
- start, end = self.ranges[0]
- if end is None:
- end = length
- if start < 0:
- start += length
- if is_byte_range_valid(start, end, length):
- return start, min(end, length)
-
- def make_content_range(self, length):
- """Creates a :class:`~werkzeug.datastructures.ContentRange` object
- from the current range and given content length.
- """
- rng = self.range_for_length(length)
- if rng is not None:
- return ContentRange(self.units, rng[0], rng[1], length)
-
- def to_header(self):
- """Converts the object back into an HTTP header."""
- ranges = []
- for begin, end in self.ranges:
- if end is None:
- ranges.append("%s-" % begin if begin >= 0 else str(begin))
- else:
- ranges.append("%s-%s" % (begin, end - 1))
- return "%s=%s" % (self.units, ",".join(ranges))
-
- def to_content_range_header(self, length):
- """Converts the object into `Content-Range` HTTP header,
- based on given length
- """
- range_for_length = self.range_for_length(length)
- if range_for_length is not None:
- return "%s %d-%d/%d" % (
- self.units,
- range_for_length[0],
- range_for_length[1] - 1,
- length,
- )
- return None
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, str(self))
-
-
-class ContentRange(object):
- """Represents the content range header.
-
- .. versionadded:: 0.7
- """
-
- def __init__(self, units, start, stop, length=None, on_update=None):
- assert is_byte_range_valid(start, stop, length), "Bad range provided"
- self.on_update = on_update
- self.set(start, stop, length, units)
-
- def _callback_property(name): # noqa: B902
- def fget(self):
- return getattr(self, name)
-
- def fset(self, value):
- setattr(self, name, value)
- if self.on_update is not None:
- self.on_update(self)
-
- return property(fget, fset)
-
- #: The units to use, usually "bytes"
- units = _callback_property("_units")
- #: The start point of the range or `None`.
- start = _callback_property("_start")
- #: The stop point of the range (non-inclusive) or `None`. Can only be
- #: `None` if also start is `None`.
- stop = _callback_property("_stop")
- #: The length of the range or `None`.
- length = _callback_property("_length")
- del _callback_property
-
- def set(self, start, stop, length=None, units="bytes"):
- """Simple method to update the ranges."""
- assert is_byte_range_valid(start, stop, length), "Bad range provided"
- self._units = units
- self._start = start
- self._stop = stop
- self._length = length
- if self.on_update is not None:
- self.on_update(self)
-
- def unset(self):
- """Sets the units to `None` which indicates that the header should
- no longer be used.
- """
- self.set(None, None, units=None)
-
- def to_header(self):
- if self.units is None:
- return ""
- if self.length is None:
- length = "*"
- else:
- length = self.length
- if self.start is None:
- return "%s */%s" % (self.units, length)
- return "%s %s-%s/%s" % (self.units, self.start, self.stop - 1, length)
-
- def __nonzero__(self):
- return self.units is not None
-
- __bool__ = __nonzero__
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, str(self))
-
-
-class Authorization(ImmutableDictMixin, dict):
- """Represents an `Authorization` header sent by the client. You should
- not create this kind of object yourself but use it when it's returned by
- the `parse_authorization_header` function.
-
- This object is a dict subclass and can be altered by setting dict items
- but it should be considered immutable as it's returned by the client and
- not meant for modifications.
-
- .. versionchanged:: 0.5
- This object became immutable.
- """
-
- def __init__(self, auth_type, data=None):
- dict.__init__(self, data or {})
- self.type = auth_type
-
- username = property(
- lambda self: self.get("username"),
- doc="""
- The username transmitted. This is set for both basic and digest
- auth all the time.""",
- )
- password = property(
- lambda self: self.get("password"),
- doc="""
- When the authentication type is basic this is the password
- transmitted by the client, else `None`.""",
- )
- realm = property(
- lambda self: self.get("realm"),
- doc="""
- This is the server realm sent back for HTTP digest auth.""",
- )
- nonce = property(
- lambda self: self.get("nonce"),
- doc="""
- The nonce the server sent for digest auth, sent back by the client.
- A nonce should be unique for every 401 response for HTTP digest
- auth.""",
- )
- uri = property(
- lambda self: self.get("uri"),
- doc="""
- The URI from Request-URI of the Request-Line; duplicated because
- proxies are allowed to change the Request-Line in transit. HTTP
- digest auth only.""",
- )
- nc = property(
- lambda self: self.get("nc"),
- doc="""
- The nonce count value transmitted by clients if a qop-header is
- also transmitted. HTTP digest auth only.""",
- )
- cnonce = property(
- lambda self: self.get("cnonce"),
- doc="""
- If the server sent a qop-header in the ``WWW-Authenticate``
- header, the client has to provide this value for HTTP digest auth.
- See the RFC for more details.""",
- )
- response = property(
- lambda self: self.get("response"),
- doc="""
- A string of 32 hex digits computed as defined in RFC 2617, which
- proves that the user knows a password. Digest auth only.""",
- )
- opaque = property(
- lambda self: self.get("opaque"),
- doc="""
- The opaque header from the server returned unchanged by the client.
- It is recommended that this string be base64 or hexadecimal data.
- Digest auth only.""",
- )
- qop = property(
- lambda self: self.get("qop"),
- doc="""
- Indicates what "quality of protection" the client has applied to
- the message for HTTP digest auth. Note that this is a single token,
- not a quoted list of alternatives as in WWW-Authenticate.""",
- )
-
-
-class WWWAuthenticate(UpdateDictMixin, dict):
- """Provides simple access to `WWW-Authenticate` headers."""
-
- #: list of keys that require quoting in the generated header
- _require_quoting = frozenset(["domain", "nonce", "opaque", "realm", "qop"])
-
- def __init__(self, auth_type=None, values=None, on_update=None):
- dict.__init__(self, values or ())
- if auth_type:
- self["__auth_type__"] = auth_type
- self.on_update = on_update
-
- def set_basic(self, realm="authentication required"):
- """Clear the auth info and enable basic auth."""
- dict.clear(self)
- dict.update(self, {"__auth_type__": "basic", "realm": realm})
- if self.on_update:
- self.on_update(self)
-
- def set_digest(
- self, realm, nonce, qop=("auth",), opaque=None, algorithm=None, stale=False
- ):
- """Clear the auth info and enable digest auth."""
- d = {
- "__auth_type__": "digest",
- "realm": realm,
- "nonce": nonce,
- "qop": dump_header(qop),
- }
- if stale:
- d["stale"] = "TRUE"
- if opaque is not None:
- d["opaque"] = opaque
- if algorithm is not None:
- d["algorithm"] = algorithm
- dict.clear(self)
- dict.update(self, d)
- if self.on_update:
- self.on_update(self)
-
- def to_header(self):
- """Convert the stored values into a WWW-Authenticate header."""
- d = dict(self)
- auth_type = d.pop("__auth_type__", None) or "basic"
- return "%s %s" % (
- auth_type.title(),
- ", ".join(
- [
- "%s=%s"
- % (
- key,
- quote_header_value(
- value, allow_token=key not in self._require_quoting
- ),
- )
- for key, value in iteritems(d)
- ]
- ),
- )
-
- def __str__(self):
- return self.to_header()
-
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, self.to_header())
-
- def auth_property(name, doc=None): # noqa: B902
- """A static helper function for subclasses to add extra authentication
- system properties onto a class::
-
- class FooAuthenticate(WWWAuthenticate):
- special_realm = auth_property('special_realm')
-
- For more information have a look at the sourcecode to see how the
- regular properties (:attr:`realm` etc.) are implemented.
- """
-
- def _set_value(self, value):
- if value is None:
- self.pop(name, None)
- else:
- self[name] = str(value)
-
- return property(lambda x: x.get(name), _set_value, doc=doc)
-
- def _set_property(name, doc=None): # noqa: B902
- def fget(self):
- def on_update(header_set):
- if not header_set and name in self:
- del self[name]
- elif header_set:
- self[name] = header_set.to_header()
-
- return parse_set_header(self.get(name), on_update)
-
- return property(fget, doc=doc)
-
- type = auth_property(
- "__auth_type__",
- doc="""The type of the auth mechanism. HTTP currently specifies
- ``Basic`` and ``Digest``.""",
- )
- realm = auth_property(
- "realm",
- doc="""A string to be displayed to users so they know which
- username and password to use. This string should contain at
- least the name of the host performing the authentication and
- might additionally indicate the collection of users who might
- have access.""",
- )
- domain = _set_property(
- "domain",
- doc="""A list of URIs that define the protection space. If a URI
- is an absolute path, it is relative to the canonical root URL of
- the server being accessed.""",
- )
- nonce = auth_property(
- "nonce",
- doc="""
- A server-specified data string which should be uniquely generated
- each time a 401 response is made. It is recommended that this
- string be base64 or hexadecimal data.""",
- )
- opaque = auth_property(
- "opaque",
- doc="""A string of data, specified by the server, which should
- be returned by the client unchanged in the Authorization header
- of subsequent requests with URIs in the same protection space.
- It is recommended that this string be base64 or hexadecimal
- data.""",
- )
- algorithm = auth_property(
- "algorithm",
- doc="""A string indicating a pair of algorithms used to produce
- the digest and a checksum. If this is not present it is assumed
- to be "MD5". If the algorithm is not understood, the challenge
- should be ignored (and a different one used, if there is more
- than one).""",
- )
- qop = _set_property(
- "qop",
- doc="""A set of quality-of-privacy directives such as auth and
- auth-int.""",
- )
-
- @property
- def stale(self):
- """A flag, indicating that the previous request from the client
- was rejected because the nonce value was stale.
- """
- val = self.get("stale")
- if val is not None:
- return val.lower() == "true"
-
- @stale.setter
- def stale(self, value):
- if value is None:
- self.pop("stale", None)
- else:
- self["stale"] = "TRUE" if value else "FALSE"
-
- auth_property = staticmethod(auth_property)
- del _set_property
-
-
-class FileStorage(object):
- """The :class:`FileStorage` class is a thin wrapper over incoming files.
- It is used by the request object to represent uploaded files. All the
- attributes of the wrapper stream are proxied by the file storage so
- it's possible to do ``storage.read()`` instead of the long form
- ``storage.stream.read()``.
- """
-
- def __init__(
- self,
- stream=None,
- filename=None,
- name=None,
- content_type=None,
- content_length=None,
- headers=None,
- ):
- self.name = name
- self.stream = stream or BytesIO()
-
- # if no filename is provided we can attempt to get the filename
- # from the stream object passed. There we have to be careful to
- # skip things like <fdopen>, <stderr> etc. Python marks these
- # special filenames with angular brackets.
- if filename is None:
- filename = getattr(stream, "name", None)
- s = make_literal_wrapper(filename)
- if filename and filename[0] == s("<") and filename[-1] == s(">"):
- filename = None
-
- # On Python 3 we want to make sure the filename is always unicode.
- # This might not be if the name attribute is bytes due to the
- # file being opened from the bytes API.
- if not PY2 and isinstance(filename, bytes):
- filename = filename.decode(get_filesystem_encoding(), "replace")
-
- self.filename = filename
- if headers is None:
- headers = Headers()
- self.headers = headers
- if content_type is not None:
- headers["Content-Type"] = content_type
- if content_length is not None:
- headers["Content-Length"] = str(content_length)
-
- def _parse_content_type(self):
- if not hasattr(self, "_parsed_content_type"):
- self._parsed_content_type = parse_options_header(self.content_type)
-
- @property
- def content_type(self):
- """The content-type sent in the header. Usually not available"""
- return self.headers.get("content-type")
-
- @property
- def content_length(self):
- """The content-length sent in the header. Usually not available"""
- return int(self.headers.get("content-length") or 0)
-
- @property
- def mimetype(self):
- """Like :attr:`content_type`, but without parameters (eg, without
- charset, type etc.) and always lowercase. For example if the content
- type is ``text/HTML; charset=utf-8`` the mimetype would be
- ``'text/html'``.
-
- .. versionadded:: 0.7
- """
- self._parse_content_type()
- return self._parsed_content_type[0].lower()
-
- @property
- def mimetype_params(self):
- """The mimetype parameters as dict. For example if the content
- type is ``text/html; charset=utf-8`` the params would be
- ``{'charset': 'utf-8'}``.
-
- .. versionadded:: 0.7
- """
- self._parse_content_type()
- return self._parsed_content_type[1]
-
- def save(self, dst, buffer_size=16384):
- """Save the file to a destination path or file object. If the
- destination is a file object you have to close it yourself after the
- call. The buffer size is the number of bytes held in memory during
- the copy process. It defaults to 16KB.
-
- For secure file saving also have a look at :func:`secure_filename`.
-
- :param dst: a filename or open file object the uploaded file
- is saved to.
- :param buffer_size: the size of the buffer. This works the same as
- the `length` parameter of
- :func:`shutil.copyfileobj`.
- """
- from shutil import copyfileobj
-
- close_dst = False
- if isinstance(dst, string_types):
- dst = open(dst, "wb")
- close_dst = True
- try:
- copyfileobj(self.stream, dst, buffer_size)
- finally:
- if close_dst:
- dst.close()
-
- def close(self):
- """Close the underlying file if possible."""
- try:
- self.stream.close()
- except Exception:
- pass
-
- def __nonzero__(self):
- return bool(self.filename)
-
- __bool__ = __nonzero__
-
- def __getattr__(self, name):
- try:
- return getattr(self.stream, name)
- except AttributeError:
- # SpooledTemporaryFile doesn't implement IOBase, get the
- # attribute from its backing file instead.
- # https://github.com/python/cpython/pull/3249
- if hasattr(self.stream, "_file"):
- return getattr(self.stream._file, name)
- raise
-
- def __iter__(self):
- return iter(self.stream)
-
- def __repr__(self):
- return "<%s: %r (%r)>" % (
- self.__class__.__name__,
- self.filename,
- self.content_type,
- )
-
-
-# circular dependencies
-from . import exceptions
-from .http import dump_header
-from .http import dump_options_header
-from .http import generate_etag
-from .http import http_date
-from .http import is_byte_range_valid
-from .http import parse_options_header
-from .http import parse_set_header
-from .http import quote_etag
-from .http import quote_header_value
-from .http import unquote_etag