From 79b657a8e0bfe0938bec8783a5533509347fa12a Mon Sep 17 00:00:00 2001 From: Tom Most Date: Mon, 1 Jul 2024 13:11:40 -0700 Subject: [PATCH] Revert "Merge pull request #131 from jameshilliard/HTTPDigestAuth" This reverts commit 22a776ddb32f1ff2d141c2fa8b4a47b2a11307f5, reversing changes made to c9f0baed7eeb9b8056bbdab387470de244b031fd. Per 2024-06-28 discussion in #twisted on libera.chat and post-merge review comments on GitHub [1], this implementation doesn't look spec compliant. [1]: https://github.com/twisted/treq/commit/22a776ddb32f1ff2d141c2fa8b4a47b2a11307f5#diff-14b7fa3002098fc55f2cdf2bf01a0039eb02598ae1e852170183d5935f4ef4d0R369 --- .github/workflows/ci.yaml | 4 +- docs/examples/digest_auth.py | 16 -- docs/howto.rst | 21 -- docs/index.rst | 2 +- src/treq/auth.py | 367 +------------------------ src/treq/test/test_auth.py | 167 +---------- src/treq/test/test_treq_integration.py | 148 +--------- tox.ini | 4 +- 8 files changed, 18 insertions(+), 711 deletions(-) delete mode 100644 docs/examples/digest_auth.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7a0f1fcc..5c8b6c80 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-python@v5 with: - python-version: '3.12' + python-version: '3.11' - uses: actions/cache@v4 with: @@ -42,7 +42,7 @@ jobs: - uses: actions/setup-python@v5 with: - python-version: "3.12" + python-version: "3.11" - uses: actions/cache@v4 with: diff --git a/docs/examples/digest_auth.py b/docs/examples/digest_auth.py deleted file mode 100644 index ae590718..00000000 --- a/docs/examples/digest_auth.py +++ /dev/null @@ -1,16 +0,0 @@ -from twisted.internet.task import react -from _utils import print_response - -import treq -from treq.auth import HTTPDigestAuth - - -def main(reactor, *args): - d = treq.get( - 'http://httpbin.org/digest-auth/auth/treq/treq', - auth=HTTPDigestAuth('treq', 'treq') - ) - d.addCallback(print_response) - return d - -react(main, []) diff --git a/docs/howto.rst b/docs/howto.rst index 90b3addf..3f304983 100644 --- a/docs/howto.rst +++ b/docs/howto.rst @@ -92,27 +92,6 @@ The ``auth`` argument should be a tuple of the form ``('username', 'password')`` Full example: :download:`basic_auth.py ` -HTTP Digest authentication is supported by passing an instance of -:py:class:`treq.auth.HTTPDigestAuth` to any of the request functions by using the `auth` keyword argument. -We support only "auth" QoP as defined at `RFC 2617`_ -or simple `RFC 2069`_ without QoP at the moment. Treq takes care of -caching HTTP digest credentials — after authorizing any URL/method pair, -the library will use the initially received HTTP digest credentials on that endpoint -for subsequent requests, and will not perform any redundant requests to obtain the -credentials. - -:py:class:`treq.auth.HTTPDigestAuth` class accepts ``username`` and ``password`` -as constructor arguments. - -.. literalinclude:: examples/digest_auth.py - :linenos: - :lines: 5-14 - -Full example: :download:`digest_auth.py ` - -.. _RFC 2617: http://www.ietf.org/rfc/rfc2617.txt -.. _RFC 2069: http://www.ietf.org/rfc/rfc2069.txt - Redirects --------- diff --git a/docs/index.rst b/docs/index.rst index fdacb19d..059dfccb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -71,7 +71,7 @@ Here is a list of `requests`_ features and their status in treq. +----------------------------------+----------+----------+ | Basic Authentication | yes | yes | +----------------------------------+----------+----------+ -| Digest Authentication | yes | yes | +| Digest Authentication | yes | no | +----------------------------------+----------+----------+ | Elegant Key/Value Cookies | yes | yes | +----------------------------------+----------+----------+ diff --git a/src/treq/auth.py b/src/treq/auth.py index 88ac0fca..ffae1ff4 100644 --- a/src/treq/auth.py +++ b/src/treq/auth.py @@ -1,225 +1,22 @@ -# -*- test-case-name: treq.test.test_auth,treq.test.test_treq_integration -*- # Copyright 2012-2020 The treq Authors. # See LICENSE for details. -from __future__ import absolute_import, division, print_function, annotations -import re -import time -import hashlib +from __future__ import absolute_import, division, print_function import binascii -from enum import Enum -from typing import Union, Optional, TypedDict -from urllib.parse import urlparse +from typing import Union -from twisted.python.randbytes import secureRandom from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IBodyProducer, IResponse -from twisted.internet.defer import Deferred - +from twisted.web.iweb import IAgent from zope.interface import implementer -from requests.utils import parse_dict_header - - -_DIGEST_HEADER_PREFIX_REGEXP = re.compile(b"digest ", flags=re.IGNORECASE) - - -class _DIGEST_ALGO(str, Enum): - MD5 = "MD5" - MD5_SESS = "MD5-SESS" - SHA = "SHA" - SHA_256 = "SHA-256" - SHA_512 = "SHA-512" - - -def _generate_client_nonce(server_side_nonce: str) -> str: - return hashlib.sha1( - hashlib.sha1(server_side_nonce.encode("utf-8")).digest() - + secureRandom(16) - + time.ctime().encode("utf-8") - ).hexdigest()[:16] - - -def _md5_utf_digest(x: str) -> str: - return hashlib.md5(x.encode("utf-8")).hexdigest() - - -def _sha1_utf_digest(x: str) -> str: - return hashlib.sha1(x.encode("utf-8")).hexdigest() - - -def _sha256_utf_digest(x: str) -> str: - return hashlib.sha256(x.encode("utf-8")).hexdigest() - - -def _sha512_utf_digest(x: str) -> str: - return hashlib.sha512(x.encode("utf-8")).hexdigest() - - -class _DigestAuthCacheParams(TypedDict): - path: bytes - method: bytes - cached: bool - nonce: str - realm: str - qop: str | None - algorithm: _DIGEST_ALGO - opaque: str | None - - -class _DigestAuthCacheEntry(TypedDict): - c: int - p: _DigestAuthCacheParams - - -class HTTPDigestAuth(object): - """ - HTTP Digest authentication credentials. - - This container will cache digest auth parameters, - in order not to recompute these for each request. - """ - - def __init__(self, username: Union[str, bytes], password: Union[str, bytes]): - self._username: str = ( - username.decode("utf-8") if isinstance(username, bytes) else username - ) - self._password: str = ( - password.decode("utf-8") if isinstance(password, bytes) else password - ) - - # (method,uri) --> digest auth cache - self._digest_auth_cache: dict[tuple[bytes, bytes], _DigestAuthCacheEntry] = {} - - def _build_authentication_header( - self, - url: bytes, - method: bytes, - cached: bool, - nonce: str, - realm: str, - qop: Optional[str] = None, - algorithm: _DIGEST_ALGO = _DIGEST_ALGO.MD5, - opaque: Optional[str] = None, - ) -> str: - """ - Build the authorization header for credentials got from the server. - Algorithm is accurately ported from https://github.com/psf/requests - with small adjustments. - See - https://github.com/psf/requests/blob/v2.5.1/requests/auth.py#L72 - for details. - - :param algorithm: algorithm to be used for authentication, - defaults to MD5, supported values are - "MD5", "MD5-SESS" and "SHA" - :param realm: HTTP Digest authentication realm - :param nonce: "nonce" HTTP Digest authentication param - :param qop: Quality Of Protection HTTP Digest auth param - :param opaque: "opaque" HTTP Digest authentication param - (should be sent back to server unchanged) - :param cached: Identifies that authentication already have been - performed for URI/method, - and new request should use the same params as first - authenticated request - :param url: the URI path where we are authenticating - :param method: HTTP method to be used when requesting - - :return: HTTP Digest authentication string - """ - algo = algorithm.upper() - p_parsed = urlparse(url.decode("utf-8")) - # path is request-uri defined in RFC 2616 which should not be empty - path = p_parsed.path or "/" - if p_parsed.query: - path += f"?{p_parsed.query}" - - A1 = f"{self._username}:{realm}:{self._password}" - A2 = f"{method.decode('utf-8')}:{path}" - - if algo == _DIGEST_ALGO.MD5 or algo == _DIGEST_ALGO.MD5_SESS: - digest_hash_func = _md5_utf_digest - elif algo == _DIGEST_ALGO.SHA: - digest_hash_func = _sha1_utf_digest - elif algo == _DIGEST_ALGO.SHA_256: - digest_hash_func = _sha256_utf_digest - elif algo == _DIGEST_ALGO.SHA_512: - digest_hash_func = _sha512_utf_digest - else: - raise ValueError( - f"Unsupported Digest Auth algorithm identifier passed: {algo}" - ) - - KD = lambda s, d: digest_hash_func(f"{s}:{d}") # noqa:E731 - - HA1 = digest_hash_func(A1) - HA2 = digest_hash_func(A2) - - if cached: - self._digest_auth_cache[(method, url)]["c"] += 1 - nonce_count = self._digest_auth_cache[(method, url)]["c"] - else: - nonce_count = 1 - - ncvalue = "%08x" % nonce_count - - cnonce = _generate_client_nonce(nonce) - if algo == _DIGEST_ALGO.MD5_SESS: - HA1 = digest_hash_func(f"{HA1}:{nonce}:{cnonce}") - - if not qop: - respdig = KD(HA1, f"{HA2}:{nonce}") - elif qop == "auth" or "auth" in qop.split(","): - noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" - respdig = KD(HA1, noncebit) - else: - raise UnknownQopForDigestAuth(qop) - - base = ( - f'username="{self._username}", realm="{realm}", nonce="{nonce}", ' - f'uri="{path}", response="{respdig}"' - ) - if opaque: - base += f', opaque="{opaque}"' - if algorithm: - base += f', algorithm="{algorithm.value}"' - if qop: - base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' - - if not cached: - cache_params: _DigestAuthCacheParams = { - "path": url, - "method": method, - "cached": cached, - "nonce": nonce, - "realm": realm, - "qop": qop, - "algorithm": algorithm, - "opaque": opaque, - } - self._digest_auth_cache[(method, url)] = {"p": cache_params, "c": 1} - - return f"Digest {base}" - - def _cached_metadata_for( - self, method: bytes, uri: bytes - ) -> Optional[_DigestAuthCacheEntry]: - return self._digest_auth_cache.get((method, uri)) class UnknownAuthConfig(Exception): """ The authentication config provided couldn't be interpreted. """ - def __init__(self, config): - super(Exception, self).__init__("{0!r} not of a known type.".format(config)) - - -class UnknownQopForDigestAuth(Exception): - def __init__(self, qop: Optional[str]): super(Exception, self).__init__( - "Unsupported Quality Of Protection value passed: {qop}".format(qop=qop) - ) + '{0!r} not of a known type.'.format(config)) @implementer(IAgent) @@ -233,7 +30,6 @@ class _RequestHeaderSetterAgent: Headers to set on each request before forwarding it to the wrapped agent. """ - def __init__(self, agent, headers): self._agent = agent self._headers = headers @@ -247,148 +43,7 @@ def request(self, method, uri, headers=None, bodyProducer=None): requestHeaders.setRawHeaders(header, values) return self._agent.request( - method, uri, headers=requestHeaders, bodyProducer=bodyProducer - ) - - -@implementer(IAgent) -class _RequestDigestAuthenticationAgent: - def __init__(self, agent: IAgent, auth: HTTPDigestAuth): - self._agent = agent - self._auth = auth - - def _on_401_response( - self, - www_authenticate_response: IResponse, - method: bytes, - uri: bytes, - headers: Optional[Headers], - bodyProducer: Optional[IBodyProducer], - ) -> Deferred[IResponse]: - """ - Handle the server`s 401 response, that is capable with authentication - headers, build the Authorization header - for - - :param www_authenticate_response: t.w.client.Response object - :param method: HTTP method to be used to perform the request - :param uri: URI to be used - :param headers: Additional headers to be sent with the request, - instead of "Authorization" header - :param bodyProducer: IBodyProducer implementer instance that would be - used to fetch the response body - - :return: - """ - assert ( - www_authenticate_response.code == 401 - ), """Got invalid pre-authentication response code, probably URL - does not support Digest auth - """ - www_authenticate_header_string = ( - www_authenticate_response.headers.getRawHeaders( - b"www-authenticate", [b""] - )[0] - ) - digest_header = _DIGEST_HEADER_PREFIX_REGEXP.sub( - b"", www_authenticate_header_string, count=1 - ) - digest_authentication_params = parse_dict_header(digest_header.decode("utf-8")) - - digest_authentication_header = self._auth._build_authentication_header( - uri, - method, - False, - digest_authentication_params["nonce"], - digest_authentication_params["realm"], - qop=digest_authentication_params.get("qop", None), - algorithm=_DIGEST_ALGO( - digest_authentication_params.get("algorithm", "MD5") - ), - opaque=digest_authentication_params.get("opaque", None), - ) - return self._perform_request( - digest_authentication_header, method, uri, headers, bodyProducer - ) - - def _perform_request( - self, - digest_authentication_header: str, - method: bytes, - uri: bytes, - headers: Optional[Headers], - bodyProducer: Optional[IBodyProducer], - ) -> Deferred[IResponse]: - """ - Add Authorization header and perform the request with - actual credentials - - :param digest_authentication_header: HTTP Digest Authorization - header string - :param method: HTTP method to be used to perform the request - :param uri: URI to be used - :param headers: Headers to be sent with the request - :param bodyProducer: IBodyProducer implementer instance that would be - used to fetch the response body - - :return: t.i.defer.Deferred (holding the result of the request) - """ - if not headers: - headers = Headers( - {b"Authorization": [digest_authentication_header.encode("utf-8")]} - ) - else: - headers.addRawHeader( - b"Authorization", digest_authentication_header.encode("utf-8") - ) - return self._agent.request( - method, uri, headers=headers, bodyProducer=bodyProducer - ) - - def request( - self, - method: bytes, - uri: bytes, - headers: Optional[Headers] = None, - bodyProducer: Optional[IBodyProducer] = None, - ) -> Deferred[IResponse]: - """ - Wrap the agent with HTTP Digest authentication. - - :param method: HTTP method to be used to perform the request - :param uri: URI to be used - :param headers: Headers to be sent with the request - :param bodyProducer: IBodyProducer implementer instance that would be - used to fetch the response body - - :return: t.i.defer.Deferred (holding the result of the request) - """ - - digest_auth_metadata = self._auth._cached_metadata_for(method, uri) - - if digest_auth_metadata is None: - # Perform first request for getting the realm; - # the client awaits for 401 response code here - d = self._agent.request(b"GET", uri, headers=headers, bodyProducer=None) - d.addCallback(self._on_401_response, method, uri, headers, bodyProducer) - else: - # We have performed authentication on that URI already - digest_params_from_cache = digest_auth_metadata["p"] - digest_params_from_cache["cached"] = True - digest_authentication_header = self._auth._build_authentication_header( - digest_params_from_cache["path"], - digest_params_from_cache["method"], - digest_params_from_cache["cached"], - digest_params_from_cache["nonce"], - digest_params_from_cache["realm"], - qop=digest_params_from_cache["qop"], - algorithm=digest_params_from_cache["algorithm"], - opaque=digest_params_from_cache["opaque"], - ) - d = self._perform_request( - digest_authentication_header, method, uri, headers, bodyProducer - ) - return d + method, uri, headers=requestHeaders, bodyProducer=bodyProducer) def add_basic_auth( @@ -413,18 +68,18 @@ def add_basic_auth( :returns: :class:`~twisted.web.iweb.IAgent` """ if not isinstance(username, bytes): - username = username.encode("utf-8") + username = username.encode('utf-8') if not isinstance(password, bytes): - password = password.encode("utf-8") + password = password.encode('utf-8') - creds = binascii.b2a_base64(b"%s:%s" % (username, password)).rstrip(b"\n") + creds = binascii.b2a_base64(b'%s:%s' % (username, password)).rstrip(b'\n') return _RequestHeaderSetterAgent( agent, - Headers({b"Authorization": [b"Basic " + creds]}), + Headers({b'Authorization': [b'Basic ' + creds]}), ) -def add_auth(agent: IAgent, auth_config: Union[tuple, HTTPDigestAuth]) -> IAgent: +def add_auth(agent, auth_config): """ Wrap an agent to perform authentication @@ -440,7 +95,5 @@ def add_auth(agent: IAgent, auth_config: Union[tuple, HTTPDigestAuth]) -> IAgent """ if isinstance(auth_config, tuple): return add_basic_auth(agent, auth_config[0], auth_config[1]) - elif isinstance(auth_config, HTTPDigestAuth): - return _RequestDigestAuthenticationAgent(agent, auth_config) raise UnknownAuthConfig(auth_config) diff --git a/src/treq/test/test_auth.py b/src/treq/test/test_auth.py index db4f1e8e..4ba169dc 100644 --- a/src/treq/test/test_auth.py +++ b/src/treq/test/test_auth.py @@ -5,8 +5,7 @@ from twisted.web.iweb import IAgent from treq._agentspy import agent_spy -from treq.auth import _RequestHeaderSetterAgent, add_auth, \ - UnknownAuthConfig, HTTPDigestAuth, _DIGEST_ALGO +from treq.auth import _RequestHeaderSetterAgent, add_auth, UnknownAuthConfig class RequestHeaderSetterAgentTests(SynchronousTestCase): @@ -131,57 +130,6 @@ def test_add_basic_auth_bytes(self): Headers({b'Authorization': [b'Basic AQ//Ov/wAQ==']}), ) - def test_add_digest_auth(self): - """ - add_auth() wraps the given agent with one that adds a ``Authorization: - Digest ...`` authentication handler. - """ - agent, requests = agent_spy() - username = 'spam' - password = 'eggs' - auth = HTTPDigestAuth(username, password) - authAgent = add_auth(agent, auth) - - authAgent.request(b'method', b'uri') - - self.assertEqual( - authAgent._auth, - auth, - ) - self.assertEqual( - authAgent._auth._username, - username, - ) - self.assertEqual( - authAgent._auth._password, - password, - ) - - def test_add_digest_auth_bytes(self): - """ - Digest auth can be passed as `bytes` which will be encoded as utf-8. - """ - agent, requests = agent_spy() - username = b'spam' - password = b'eggs' - auth = HTTPDigestAuth(username, password) - authAgent = add_auth(agent, auth) - - authAgent.request(b'method', b'uri') - - self.assertEqual( - authAgent._auth, - auth, - ) - self.assertEqual( - authAgent._auth._username, - username.decode('utf-8'), - ) - self.assertEqual( - authAgent._auth._password, - password.decode('utf-8'), - ) - def test_add_unknown_auth(self): """ add_auth() raises UnknownAuthConfig when given anything other than @@ -191,116 +139,3 @@ def test_add_unknown_auth(self): invalidAuth = 1234 self.assertRaises(UnknownAuthConfig, add_auth, agent, invalidAuth) - - -class HttpDigestAuthTests(SynchronousTestCase): - - def setUp(self): - self.maxDiff = None - self._auth = HTTPDigestAuth('spam', 'eggs') - - def test_digest_unknown_algorithm(self): - """ - _DIGEST_ALGO('UNKNOWN') raises ValueError when the algorithm is unknown. - """ - with self.assertRaises(ValueError) as e: - _DIGEST_ALGO('UNKNOWN') - self.assertIn("'UNKNOWN' is not a valid _DIGEST_ALGO", str(e.exception)) - - def test_build_authentication_header_md5_no_cache_no_qop(self): - """ - _build_authentication_header test vectors using the MD5 algo and without - qop parameter generate the expected digest header when cache is - uninitialized. - """ - auth_header = self._auth._build_authentication_header( - b'/spam/eggs', b'GET', False, - 'b7f36bc385a662ed615f27bd9e94eecd', - 'me@dragons', qop=None, - algorithm=_DIGEST_ALGO('MD5') - ) - self.assertEquals( - auth_header, - 'Digest username="spam", realm="me@dragons", ' + - 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + - 'uri="/spam/eggs", ' + - 'response="fc05d17c55156b278132a52dc0dca526", algorithm="MD5"', - ) - - def test_build_authentication_header_md5_sess_no_cache(self): - """ - _build_authentication_header test vectors using the MD5-SESS algo and - with qop parameter generate the expected digest header when cache is - uninitialized. - """ - auth_header = self._auth._build_authentication_header( - b'/spam/eggs?ham=bacon', b'GET', False, - 'b7f36bc385a662ed615f27bd9e94eecd', - 'me@dragons', qop='auth', - algorithm=_DIGEST_ALGO('MD5-SESS') - ) - self.assertRegex( - auth_header, - 'Digest username="spam", realm="me@dragons", ' + - 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + - 'uri="/spam/eggs\\?ham=bacon", ' + - 'response="([0-9a-f]{32})", ' + - 'algorithm="MD5-SESS", qop="auth", ' + - 'nc=00000001, cnonce="([0-9a-f]{16})"', - ) - - def test_build_authentication_header_sha_no_cache_no_qop(self): - """ - _build_authentication_header test vectors using the SHA(SHA-1) algo and - without the qop parameter generate the expected digest header when cache - is uninitialized. - """ - auth_header = self._auth._build_authentication_header( - b'/spam/eggs', b'GET', False, - 'b7f36bc385a662ed615f27bd9e94eecd', - 'me@dragons', qop=None, - algorithm=_DIGEST_ALGO('SHA') - ) - - self.assertEquals( - auth_header, - 'Digest username="spam", realm="me@dragons", ' + - 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + - 'uri="/spam/eggs", ' + - 'response="45420a4786287998bcb99dfde563c3a198109b31", ' + - 'algorithm="SHA"' - ) - - def test_build_authentication_header_sha512_cache(self): - """ - _build_authentication_header test vectors using the SHA-512 algo and - with the qop parameter generate the expected digest header when the - digest cache is used for the second request. - """ - # Emulate 1st request - self._auth._build_authentication_header( - b'/spam/eggs', b'GET', False, - 'b7f36bc385a662ed615f27bd9e94eecd', - 'me@dragons', qop='auth', - algorithm=_DIGEST_ALGO('SHA-512') - ) - # Get header after cached request - auth_header = self._auth._build_authentication_header( - b'/spam/eggs', b'GET', True, - 'b7f36bc385a662ed615f27bd9e94eecd', - 'me@dragons', qop='auth', - algorithm=_DIGEST_ALGO('SHA-512') - ) - - # Make sure metadata was cached - self.assertTrue(self._auth._cached_metadata_for(b'GET', b'/spam/eggs')) - - self.assertRegex( - auth_header, - 'Digest username="spam", realm="me@dragons", ' + - 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + - 'uri="/spam/eggs", ' + - 'response="([0-9a-f]{128})", ' + - 'algorithm="SHA-512", qop="auth", ' + - 'nc=00000002, cnonce="([0-9a-f]+?)"', - ) diff --git a/src/treq/test/test_treq_integration.py b/src/treq/test/test_treq_integration.py index 8ab79e7c..1518fb46 100644 --- a/src/treq/test/test_treq_integration.py +++ b/src/treq/test/test_treq_integration.py @@ -1,5 +1,4 @@ from io import BytesIO -from typing import Optional from twisted.python.url import URL @@ -12,14 +11,12 @@ from twisted.web.client import (Agent, BrowserLikePolicyForHTTPS, HTTPConnectionPool, ResponseFailed) -from twisted.web.http_headers import Headers from treq.test.util import DEBUG, skip_on_windows_because_of_199 from .local_httpbin.parent import _HTTPBinProcess import treq -from treq.auth import HTTPDigestAuth, UnknownQopForDigestAuth skip = skip_on_windows_because_of_199() @@ -54,7 +51,7 @@ class TreqIntegrationTests(TestCase): head = with_baseurl(treq.head) post = with_baseurl(treq.post) put = with_baseurl(treq.put) - patch_req = with_baseurl(treq.patch) + patch = with_baseurl(treq.patch) delete = with_baseurl(treq.delete) _httpbin_process = _HTTPBinProcess(https=False) @@ -208,7 +205,7 @@ def test_put(self): @inlineCallbacks def test_patch(self): - response = yield self.patch_req('/patch', data=b'Hello!') + response = yield self.patch('/patch', data=b'Hello!') self.assertEqual(response.code, 200) yield self.assert_data(response, 'Hello!') yield print_response(response) @@ -244,147 +241,6 @@ def test_failed_basic_auth(self): self.assertEqual(response.code, 401) yield print_response(response) - @inlineCallbacks - def test_digest_auth(self): - """ - Digest authentication succeeds. - """ - response = yield self.get('/digest-auth/auth/treq/treq', - auth=HTTPDigestAuth('treq', 'treq')) - self.assertEqual(response.code, 200) - yield print_response(response) - json = yield treq.json_content(response) - self.assertTrue(json['authenticated']) - self.assertEqual(json['user'], 'treq') - - @inlineCallbacks - def test_digest_auth_multi_qop(self): - """ - Digest authentication with alternative qop type. - """ - response = yield self.get('/digest-auth/undefined/treq/treq', - auth=HTTPDigestAuth('treq', 'treq')) - self.assertEqual(response.code, 200) - yield print_response(response) - json = yield treq.json_content(response) - self.assertTrue(json['authenticated']) - self.assertEqual(json['user'], 'treq') - - @inlineCallbacks - def test_digest_auth_multiple_calls(self): - """ - Proper Digest authentication credentials caching works across - multiple requests. - """ - - calls = 0 - headers_for_second_request: Optional[Headers] = None - - # Original Agent request call - agent_request_orig = Agent.request - - def agent_request_patched(*args, **kwargs): - """ - Patched Agent.request function, - that increases call count on every HTTP request - and appends. - """ - nonlocal calls, headers_for_second_request - response_deferred = agent_request_orig(*args, **kwargs) - calls += 1 - if calls == 2: - headers_for_second_request = args[3] - return response_deferred - - self.patch(Agent, 'request', agent_request_patched) - - auth = HTTPDigestAuth('treq-digest-auth-multiple', 'treq') - - response1 = yield self.get( - '/digest-auth/auth/treq-digest-auth-multiple/treq', - auth=auth - ) - self.assertEqual(response1.code, 200) - yield print_response(response1) - json1 = yield treq.json_content(response1) - - # Assume we did two actual HTTP requests - one to obtain credentials - # and second is original request with authentication - self.assertEqual( - calls, - 2 - ) - self.assertIn( - b'Authorization', - dict(headers_for_second_request.getAllRawHeaders()) - ) - - response2 = yield self.get( - '/digest-auth/auth/treq-digest-auth-multiple/treq', - auth=auth, - cookies=response1.cookies() - ) - self.assertEqual(response2.code, 200) - yield print_response(response2) - json2 = yield treq.json_content(response2) - self.assertTrue(json1['authenticated']) - self.assertEqual(json1['user'], 'treq-digest-auth-multiple') - - # Assume that responses are the same - self.assertEqual(json1, json2) - - # Assume we need only one call to obtain second response - self.assertEqual( - calls, - 3 - ) - - @inlineCallbacks - def test_digest_auth_sha256(self): - """ - Digest authentication with sha256 works. - """ - response = yield self.get('/digest-auth/auth/treq/treq/SHA-256', - auth=HTTPDigestAuth('treq', 'treq')) - self.assertEqual(response.code, 200) - yield print_response(response) - json = yield treq.json_content(response) - self.assertTrue(json['authenticated']) - self.assertEqual(json['user'], 'treq') - - @inlineCallbacks - def test_digest_auth_sha512(self): - """ - Digest authentication with sha512 works. - """ - response = yield self.get('/digest-auth/auth/treq/treq/SHA-512', - auth=HTTPDigestAuth('treq', 'treq')) - self.assertEqual(response.code, 200) - yield print_response(response) - json = yield treq.json_content(response) - self.assertTrue(json['authenticated']) - self.assertEqual(json['user'], 'treq') - - @inlineCallbacks - def test_failed_digest_auth(self): - """ - Digest auth with invalid credentials fails. - """ - response = yield self.get('/digest-auth/auth/treq/treq', - auth=HTTPDigestAuth('not-treq', 'not-treq')) - self.assertEqual(response.code, 401) - yield print_response(response) - - @inlineCallbacks - def test_failed_digest_auth_int(self): - """ - Digest authentication when qop type is unsupported fails with - UnknownQopForDigestAuth. - """ - with self.assertRaises(UnknownQopForDigestAuth): - yield self.get('/digest-auth/auth-int/treq/treq', - auth=HTTPDigestAuth('treq', 'treq')) - @inlineCallbacks def test_timeout(self): """ diff --git a/tox.ini b/tox.ini index 58a7ce6a..c43dcf0b 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ commands = {envbindir}/trial {posargs:treq} [testenv:mypy] -basepython = python3.12 +basepython = python3.8 deps = mypy==1.0.1 mypy-zope==0.9.1 @@ -67,7 +67,7 @@ commands = [testenv:docs] extras = docs changedir = docs -basepython = python3.12 +basepython = python3.8 commands = sphinx-build -b html . html