aboutsummaryrefslogtreecommitdiffstats
path: root/python/urllib3/contrib/_securetransport/low_level.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2019-09-06 16:31:13 -0700
committerJames Taylor <user234683@users.noreply.github.com>2019-09-06 16:31:13 -0700
commit3d57e14df7ba5f14a634295caf3b2e60da50bfe2 (patch)
tree4903bcb79a49ad714a1a9129765b9545405c9978 /python/urllib3/contrib/_securetransport/low_level.py
parentac32b24b2a011292b704a3f27e8fd08a7ae9424b (diff)
downloadyt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.lz
yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.xz
yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.zip
Remove windows python distribution from repo and add requirements.txt
Diffstat (limited to 'python/urllib3/contrib/_securetransport/low_level.py')
-rw-r--r--python/urllib3/contrib/_securetransport/low_level.py346
1 files changed, 0 insertions, 346 deletions
diff --git a/python/urllib3/contrib/_securetransport/low_level.py b/python/urllib3/contrib/_securetransport/low_level.py
deleted file mode 100644
index b13cd9e..0000000
--- a/python/urllib3/contrib/_securetransport/low_level.py
+++ /dev/null
@@ -1,346 +0,0 @@
-"""
-Low-level helpers for the SecureTransport bindings.
-
-These are Python functions that are not directly related to the high-level APIs
-but are necessary to get them to work. They include a whole bunch of low-level
-CoreFoundation messing about and memory management. The concerns in this module
-are almost entirely about trying to avoid memory leaks and providing
-appropriate and useful assistance to the higher-level code.
-"""
-import base64
-import ctypes
-import itertools
-import re
-import os
-import ssl
-import tempfile
-
-from .bindings import Security, CoreFoundation, CFConst
-
-
-# This regular expression is used to grab PEM data out of a PEM bundle.
-_PEM_CERTS_RE = re.compile(
- b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
-)
-
-
-def _cf_data_from_bytes(bytestring):
- """
- Given a bytestring, create a CFData object from it. This CFData object must
- be CFReleased by the caller.
- """
- return CoreFoundation.CFDataCreate(
- CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
- )
-
-
-def _cf_dictionary_from_tuples(tuples):
- """
- Given a list of Python tuples, create an associated CFDictionary.
- """
- dictionary_size = len(tuples)
-
- # We need to get the dictionary keys and values out in the same order.
- keys = (t[0] for t in tuples)
- values = (t[1] for t in tuples)
- cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
- cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)
-
- return CoreFoundation.CFDictionaryCreate(
- CoreFoundation.kCFAllocatorDefault,
- cf_keys,
- cf_values,
- dictionary_size,
- CoreFoundation.kCFTypeDictionaryKeyCallBacks,
- CoreFoundation.kCFTypeDictionaryValueCallBacks,
- )
-
-
-def _cf_string_to_unicode(value):
- """
- Creates a Unicode string from a CFString object. Used entirely for error
- reporting.
-
- Yes, it annoys me quite a lot that this function is this complex.
- """
- value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
-
- string = CoreFoundation.CFStringGetCStringPtr(
- value_as_void_p,
- CFConst.kCFStringEncodingUTF8
- )
- if string is None:
- buffer = ctypes.create_string_buffer(1024)
- result = CoreFoundation.CFStringGetCString(
- value_as_void_p,
- buffer,
- 1024,
- CFConst.kCFStringEncodingUTF8
- )
- if not result:
- raise OSError('Error copying C string from CFStringRef')
- string = buffer.value
- if string is not None:
- string = string.decode('utf-8')
- return string
-
-
-def _assert_no_error(error, exception_class=None):
- """
- Checks the return code and throws an exception if there is an error to
- report
- """
- if error == 0:
- return
-
- cf_error_string = Security.SecCopyErrorMessageString(error, None)
- output = _cf_string_to_unicode(cf_error_string)
- CoreFoundation.CFRelease(cf_error_string)
-
- if output is None or output == u'':
- output = u'OSStatus %s' % error
-
- if exception_class is None:
- exception_class = ssl.SSLError
-
- raise exception_class(output)
-
-
-def _cert_array_from_pem(pem_bundle):
- """
- Given a bundle of certs in PEM format, turns them into a CFArray of certs
- that can be used to validate a cert chain.
- """
- # Normalize the PEM bundle's line endings.
- pem_bundle = pem_bundle.replace(b"\r\n", b"\n")
-
- der_certs = [
- base64.b64decode(match.group(1))
- for match in _PEM_CERTS_RE.finditer(pem_bundle)
- ]
- if not der_certs:
- raise ssl.SSLError("No root certificates specified")
-
- cert_array = CoreFoundation.CFArrayCreateMutable(
- CoreFoundation.kCFAllocatorDefault,
- 0,
- ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks)
- )
- if not cert_array:
- raise ssl.SSLError("Unable to allocate memory!")
-
- try:
- for der_bytes in der_certs:
- certdata = _cf_data_from_bytes(der_bytes)
- if not certdata:
- raise ssl.SSLError("Unable to allocate memory!")
- cert = Security.SecCertificateCreateWithData(
- CoreFoundation.kCFAllocatorDefault, certdata
- )
- CoreFoundation.CFRelease(certdata)
- if not cert:
- raise ssl.SSLError("Unable to build cert object!")
-
- CoreFoundation.CFArrayAppendValue(cert_array, cert)
- CoreFoundation.CFRelease(cert)
- except Exception:
- # We need to free the array before the exception bubbles further.
- # We only want to do that if an error occurs: otherwise, the caller
- # should free.
- CoreFoundation.CFRelease(cert_array)
-
- return cert_array
-
-
-def _is_cert(item):
- """
- Returns True if a given CFTypeRef is a certificate.
- """
- expected = Security.SecCertificateGetTypeID()
- return CoreFoundation.CFGetTypeID(item) == expected
-
-
-def _is_identity(item):
- """
- Returns True if a given CFTypeRef is an identity.
- """
- expected = Security.SecIdentityGetTypeID()
- return CoreFoundation.CFGetTypeID(item) == expected
-
-
-def _temporary_keychain():
- """
- This function creates a temporary Mac keychain that we can use to work with
- credentials. This keychain uses a one-time password and a temporary file to
- store the data. We expect to have one keychain per socket. The returned
- SecKeychainRef must be freed by the caller, including calling
- SecKeychainDelete.
-
- Returns a tuple of the SecKeychainRef and the path to the temporary
- directory that contains it.
- """
- # Unfortunately, SecKeychainCreate requires a path to a keychain. This
- # means we cannot use mkstemp to use a generic temporary file. Instead,
- # we're going to create a temporary directory and a filename to use there.
- # This filename will be 8 random bytes expanded into base64. We also need
- # some random bytes to password-protect the keychain we're creating, so we
- # ask for 40 random bytes.
- random_bytes = os.urandom(40)
- filename = base64.b16encode(random_bytes[:8]).decode('utf-8')
- password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8
- tempdirectory = tempfile.mkdtemp()
-
- keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')
-
- # We now want to create the keychain itself.
- keychain = Security.SecKeychainRef()
- status = Security.SecKeychainCreate(
- keychain_path,
- len(password),
- password,
- False,
- None,
- ctypes.byref(keychain)
- )
- _assert_no_error(status)
-
- # Having created the keychain, we want to pass it off to the caller.
- return keychain, tempdirectory
-
-
-def _load_items_from_file(keychain, path):
- """
- Given a single file, loads all the trust objects from it into arrays and
- the keychain.
- Returns a tuple of lists: the first list is a list of identities, the
- second a list of certs.
- """
- certificates = []
- identities = []
- result_array = None
-
- with open(path, 'rb') as f:
- raw_filedata = f.read()
-
- try:
- filedata = CoreFoundation.CFDataCreate(
- CoreFoundation.kCFAllocatorDefault,
- raw_filedata,
- len(raw_filedata)
- )
- result_array = CoreFoundation.CFArrayRef()
- result = Security.SecItemImport(
- filedata, # cert data
- None, # Filename, leaving it out for now
- None, # What the type of the file is, we don't care
- None, # what's in the file, we don't care
- 0, # import flags
- None, # key params, can include passphrase in the future
- keychain, # The keychain to insert into
- ctypes.byref(result_array) # Results
- )
- _assert_no_error(result)
-
- # A CFArray is not very useful to us as an intermediary
- # representation, so we are going to extract the objects we want
- # and then free the array. We don't need to keep hold of keys: the
- # keychain already has them!
- result_count = CoreFoundation.CFArrayGetCount(result_array)
- for index in range(result_count):
- item = CoreFoundation.CFArrayGetValueAtIndex(
- result_array, index
- )
- item = ctypes.cast(item, CoreFoundation.CFTypeRef)
-
- if _is_cert(item):
- CoreFoundation.CFRetain(item)
- certificates.append(item)
- elif _is_identity(item):
- CoreFoundation.CFRetain(item)
- identities.append(item)
- finally:
- if result_array:
- CoreFoundation.CFRelease(result_array)
-
- CoreFoundation.CFRelease(filedata)
-
- return (identities, certificates)
-
-
-def _load_client_cert_chain(keychain, *paths):
- """
- Load certificates and maybe keys from a number of files. Has the end goal
- of returning a CFArray containing one SecIdentityRef, and then zero or more
- SecCertificateRef objects, suitable for use as a client certificate trust
- chain.
- """
- # Ok, the strategy.
- #
- # This relies on knowing that macOS will not give you a SecIdentityRef
- # unless you have imported a key into a keychain. This is a somewhat
- # artificial limitation of macOS (for example, it doesn't necessarily
- # affect iOS), but there is nothing inside Security.framework that lets you
- # get a SecIdentityRef without having a key in a keychain.
- #
- # So the policy here is we take all the files and iterate them in order.
- # Each one will use SecItemImport to have one or more objects loaded from
- # it. We will also point at a keychain that macOS can use to work with the
- # private key.
- #
- # Once we have all the objects, we'll check what we actually have. If we
- # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
- # we'll take the first certificate (which we assume to be our leaf) and
- # ask the keychain to give us a SecIdentityRef with that cert's associated
- # key.
- #
- # We'll then return a CFArray containing the trust chain: one
- # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
- # responsibility for freeing this CFArray will be with the caller. This
- # CFArray must remain alive for the entire connection, so in practice it
- # will be stored with a single SSLSocket, along with the reference to the
- # keychain.
- certificates = []
- identities = []
-
- # Filter out bad paths.
- paths = (path for path in paths if path)
-
- try:
- for file_path in paths:
- new_identities, new_certs = _load_items_from_file(
- keychain, file_path
- )
- identities.extend(new_identities)
- certificates.extend(new_certs)
-
- # Ok, we have everything. The question is: do we have an identity? If
- # not, we want to grab one from the first cert we have.
- if not identities:
- new_identity = Security.SecIdentityRef()
- status = Security.SecIdentityCreateWithCertificate(
- keychain,
- certificates[0],
- ctypes.byref(new_identity)
- )
- _assert_no_error(status)
- identities.append(new_identity)
-
- # We now want to release the original certificate, as we no longer
- # need it.
- CoreFoundation.CFRelease(certificates.pop(0))
-
- # We now need to build a new CFArray that holds the trust chain.
- trust_chain = CoreFoundation.CFArrayCreateMutable(
- CoreFoundation.kCFAllocatorDefault,
- 0,
- ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
- )
- for item in itertools.chain(identities, certificates):
- # ArrayAppendValue does a CFRetain on the item. That's fine,
- # because the finally block will release our other refs to them.
- CoreFoundation.CFArrayAppendValue(trust_chain, item)
-
- return trust_chain
- finally:
- for obj in itertools.chain(identities, certificates):
- CoreFoundation.CFRelease(obj)