From 0a69f5d3c1990752300cf91e67f649152f4b4166 Mon Sep 17 00:00:00 2001 From: imacat Date: Wed, 23 Nov 2022 18:08:30 +1100 Subject: [PATCH] Added the initial version that works. --- README.rst | 6 +- src/flask_digest_auth/__init__.py | 24 ++++ src/flask_digest_auth/algo.py | 127 +++++++++++++++++ src/flask_digest_auth/auth.py | 205 +++++++++++++++++++++++++++ src/flask_digest_auth/exception.py | 25 ++++ src/flask_digest_auth/flask_login.py | 74 ++++++++++ src/flask_digest_auth/test.py | 102 +++++++++++++ tests/test_algo.py | 51 +++++++ tests/test_auth.py | 108 ++++++++++++++ tests/test_flask_login.py | 129 +++++++++++++++++ 10 files changed, 848 insertions(+), 3 deletions(-) create mode 100644 src/flask_digest_auth/__init__.py create mode 100644 src/flask_digest_auth/algo.py create mode 100644 src/flask_digest_auth/auth.py create mode 100644 src/flask_digest_auth/exception.py create mode 100644 src/flask_digest_auth/flask_login.py create mode 100644 src/flask_digest_auth/test.py create mode 100644 tests/test_algo.py create mode 100644 tests/test_auth.py create mode 100644 tests/test_flask_login.py diff --git a/README.rst b/README.rst index ba614f1..5bb6390 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ -==================================== -The Flask HTTP Digest Authentication -==================================== +================================ +Flask HTTP Digest Authentication +================================ Description diff --git a/src/flask_digest_auth/__init__.py b/src/flask_digest_auth/__init__.py new file mode 100644 index 0000000..83186a7 --- /dev/null +++ b/src/flask_digest_auth/__init__.py @@ -0,0 +1,24 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/6 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The HTTP digest authentication. + +""" +from flask_digest_auth.algo import make_password_hash, calc_response +from flask_digest_auth.auth import DigestAuth +from flask_digest_auth.flask_login import init_login_manager +from flask_digest_auth.test import Client diff --git a/src/flask_digest_auth/algo.py b/src/flask_digest_auth/algo.py new file mode 100644 index 0000000..0da4fa7 --- /dev/null +++ b/src/flask_digest_auth/algo.py @@ -0,0 +1,127 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/3 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The algorithm. + +""" +from __future__ import annotations + +import typing as t +from hashlib import md5 + +from flask_digest_auth.exception import UnauthorizedException + + +def make_password_hash(realm: str, username: str, password: str) -> str: + """Calculates the password hash for the HTTP digest authentication. + + :param realm: The realm. + :param username: The username. + :param password: The cleartext password. + :return: The password hash for the HTTP digest authentication. + """ + return md5(f"{username}:{realm}:{password}".encode("utf8")).hexdigest() + + +def calc_response( + method: str, uri: str, password_hash: str, + nonce: str, qop: t.Optional[t.Literal["auth", "auth-int"]] = None, + algorithm: t.Optional[t.Literal["MD5", "MD5-sess"]] = "MD5-sess", + cnonce: t.Optional[str] = None, nc: t.Optional[str] = None, + body: t.Optional[bytes] = None) -> str: + """Calculates the response value of the HTTP digest authentication. + + :param method: The request method. + :param uri: The request URI. + :param password_hash: The password hash for the HTTP digest authentication. + :param nonce: The nonce. + :param qop: the quality of protection. + :param algorithm: The algorithm, either "MD5" or "MD5-sess". + :param cnonce: The client nonce, which must exists when qop exists or + algorithm="MD5-sess". + :param nc: The request counter, which must exists when qop exists. + :param body: The request body, which must exists when qop="auth-int". + :return: The response value. + :raise UnauthorizedException: When the cnonce is missing with the MD5-sess + algorithm, when the body is missing with the auth-int qop, or when the + cnonce or nc is missing with the auth or auth-int qop. + """ + ha1: str = __calc_ha1(password_hash, nonce, algorithm, cnonce) + ha2: str = __calc_ha2(method, uri, qop, body) + if qop is None: + return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest() + if qop == "auth" or qop == "auth-int": + if cnonce is None: + raise UnauthorizedException( + f"Missing \"cnonce\" with the qop=\"{qop}\"") + if nc is None: + raise UnauthorizedException( + f"Missing \"nc\" with the qop=\"{qop}\"") + return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\ + .hexdigest() + if cnonce is None: + raise UnauthorizedException( + f"Unsupported qop=\"{qop}\"") + + +def __calc_ha1(password_hash: str, nonce: str, + algorithm: t.Optional[t.Literal["MD5", "MD5-sess"]] = None, + cnonce: t.Optional[str] = None) -> str: + """Calculates and returns the first hash. + + :param password_hash: The password hash for the HTTP digest authentication. + :param nonce: The nonce. + :param algorithm: The algorithm, either "MD5", "MD5-sess", or None. + :param cnonce: The client nonce. It must be provided when the algorithm is + "MD5-sess". + :return: The first hash. + :raise UnauthorizedException: When the cnonce is missing with the MD5-sess + algorithm. + """ + if algorithm is None or algorithm == "MD5": + return password_hash + if algorithm == "MD5-sess": + if cnonce is None: + raise UnauthorizedException( + f"Missing \"cnonce\" with algorithm=\"{algorithm}\"") + return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8"))\ + .hexdigest() + raise UnauthorizedException( + f"Unsupported algorithm=\"{algorithm}\"") + + +def __calc_ha2(method: str, uri: str, + qop: t.Optional[t.Literal["auth", "auth-int"]] = None, + body: t.Optional[bytes] = None) -> str: + """Calculates the second hash. + + :param method: The request method. + :param uri: The request URI. + :param qop: The quality of protection, either "auth", "auth-int" or None. + :param body: The request body. It must be provided when the quality of + protection is "auth-int". + :return: The second hash. + :raise UnauthorizedException: When the body is missing with qop="auth-int". + """ + if qop is None or qop == "auth": + return md5(f"{method}:{uri}".encode("utf8")).hexdigest() + if qop == "auth-int": + if body is None: + raise UnauthorizedException(f"Missing \"body\" with qop=\"{qop}\"") + return md5(f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8"))\ + .hexdigest() + raise UnauthorizedException(f"Unsupported qop=\"{qop}\"") diff --git a/src/flask_digest_auth/auth.py b/src/flask_digest_auth/auth.py new file mode 100644 index 0000000..05df6dd --- /dev/null +++ b/src/flask_digest_auth/auth.py @@ -0,0 +1,205 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/10/22 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The HTTP Digest Authentication. +See RFC 2617 HTTP Authentication: Basic and Digest Access Authentication + +""" +from __future__ import annotations + +import sys +import typing as t +from functools import wraps +from random import random +from secrets import token_urlsafe + +from flask import g, request, Response, session, abort +from itsdangerous import URLSafeTimedSerializer, BadData +from werkzeug.datastructures import Authorization + +from flask_digest_auth.algo import calc_response +from flask_digest_auth.exception import UnauthorizedException + + +class DigestAuth: + """The HTTP digest authentication.""" + + def __init__(self, realm: t.Optional[str] = None): + """Constructs the HTTP digest authentication. + + :param realm: The realm. + """ + self.secret_key: str = token_urlsafe(32) + self.serializer: URLSafeTimedSerializer \ + = URLSafeTimedSerializer(self.secret_key) + self.realm: str = "" if realm is None else realm + self.algorithm: t.Optional[str] = None + self.use_opaque: bool = True + self.domain: t.List[str] = [] + self.qop: t.List[str] = ["auth", "auth-int"] + self.__get_password_hash: t.Callable[[str], t.Optional[str]] \ + = lambda x: None + self.__get_user: t.Callable[[str], t.Optional] = lambda x: None + + def login_required(self, view) -> t.Callable: + """The view decorator for HTTP digest authentication. + + :param view: + :return: The login-protected view. + """ + + class NoLogInException(Exception): + """The exception thrown when the user is not authorized.""" + + @wraps(view) + def login_required_view(*args, **kwargs) -> t.Any: + """The login-protected view. + + :param args: The positional arguments of the view. + :param kwargs: The keyword arguments of the view. + :return: The response. + """ + try: + if "user" not in session: + raise NoLogInException + user: t.Optional[t.Any] = self.__get_user(session["user"]) + if user is None: + raise NoLogInException + g.user = user + return view(*args, **kwargs) + except NoLogInException: + state: AuthState = AuthState() + authorization: Authorization = request.authorization + try: + if authorization is None: + raise UnauthorizedException + if authorization.type != "digest": + raise UnauthorizedException( + "Not an HTTP digest authorization") + self.authenticate(state) + session["user"] = authorization.username + g.user = self.__get_user(authorization.username) + return view(*args, **kwargs) + except UnauthorizedException as e: + if len(e.args) > 0: + sys.stderr.write(e.args[0] + "\n") + response: Response = Response() + response.status = 401 + response.headers["WWW-Authenticate"] \ + = self.make_response_header(state) + abort(response) + + return login_required_view + + def authenticate(self, state: AuthState) -> None: + """Authenticate a user. + + :param state: The authorization state. + :return: None. + :raise UnauthorizedException: When the authentication failed. + """ + authorization: Authorization = request.authorization + if self.use_opaque: + if authorization.opaque is None: + raise UnauthorizedException( + "Missing \"opaque\" in the Authorization header") + try: + self.serializer.loads( + authorization.opaque, salt="opaque", max_age=1800) + except BadData: + raise UnauthorizedException("Invalid opaque") + state.opaque = authorization.opaque + password_hash: t.Optional[str] = self.__get_password_hash( + authorization.username) + if password_hash is None: + raise UnauthorizedException( + f"No such user \"{authorization.username}\"") + expected: str = calc_response( + method=request.method, uri=authorization.uri, + password_hash=password_hash, nonce=authorization.nonce, + qop=authorization.qop, + algorithm=authorization.get("algorithm"), + cnonce=authorization.cnonce, nc=authorization.nc, + body=request.data) + if authorization.response != expected: + state.stale = False + raise UnauthorizedException("Incorrect response value") + try: + self.serializer.loads( + authorization.nonce, + salt="nonce" if authorization.opaque is None + else f"nonce-{authorization.opaque}") + except BadData: + state.stale = True + raise UnauthorizedException("Invalid nonce") + + def make_response_header(self, state: AuthState) -> str: + """Composes and returns the WWW-Authenticate response header. + + :param state: The authorization state. + :return: The WWW-Authenticate response header. + """ + opaque: t.Optional[str] = None if not self.use_opaque else \ + (state.opaque if state.opaque is not None + else self.serializer.dumps(random(), salt="opaque")) + nonce: str = self.serializer.dumps( + random(), salt="nonce" if opaque is None else f"nonce-{opaque}") + + header: str = f"Digest realm=\"{self.realm}\"" + if len(self.domain) > 0: + domain_list: str = ",".join(self.domain) + header += f", domain=\"{domain_list}\"" + header += f", nonce=\"{nonce}\"" + if opaque is not None: + header += f", opaque=\"{opaque}\"" + if state.stale is not None: + header += f", stale=TRUE" if state.stale else f", stale=FALSE" + if self.algorithm is not None: + header += f", algorithm=\"{self.algorithm}\"" + if len(self.qop) > 0: + qop_list: str = ",".join(self.qop) + header += f", qop=\"{qop_list}\"" + return header + + def register_get_password(self, func: t.Callable[[str], t.Optional[str]])\ + -> None: + """Registers the callback to obtain the password hash. + + :param func: The callback that given the username, returns the password + hash, or None if the user does not exist. + :return: None. + """ + self.__get_password_hash = func + + def register_get_user(self, func: t.Callable[[str], t.Optional[t.Any]])\ + -> None: + """Registers the callback to obtain the user. + + :param func: The callback that given the username, returns the user, + or None if the user does not exist. + :return: None. + """ + self.__get_user = func + + +class AuthState: + """The authorization state.""" + + def __init__(self): + """Constructs the authorization state.""" + self.opaque: t.Optional[str] = None + self.stale: t.Optional[bool] = None diff --git a/src/flask_digest_auth/exception.py b/src/flask_digest_auth/exception.py new file mode 100644 index 0000000..81d54d7 --- /dev/null +++ b/src/flask_digest_auth/exception.py @@ -0,0 +1,25 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/3 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The exception. + +""" + + +class UnauthorizedException(Exception): + """The exception thrown when the authentication is failed.""" + pass diff --git a/src/flask_digest_auth/flask_login.py b/src/flask_digest_auth/flask_login.py new file mode 100644 index 0000000..65358ca --- /dev/null +++ b/src/flask_digest_auth/flask_login.py @@ -0,0 +1,74 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/14 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The Flask-Login integration. + +""" + +import typing as t + +from flask import Response, abort, current_app, Request, g +from flask_login import LoginManager, login_user +from werkzeug.datastructures import Authorization + +from flask_digest_auth.auth import DigestAuth, AuthState +from flask_digest_auth.exception import UnauthorizedException + + +def init_login_manager(auth: DigestAuth, login_manager: LoginManager) -> None: + """Initialize the login manager. + + :param auth: The HTTP digest authentication. + :param login_manager: The login manager from FlaskLogin. + :return: None. + """ + + @login_manager.unauthorized_handler + def unauthorized() -> None: + """Handles when the user is unauthorized. + + :return: None. + """ + response: Response = Response() + response.status = 401 + response.headers["WWW-Authenticate"] = auth.make_response_header( + g.digest_auth_state) + abort(response) + + @login_manager.request_loader + def load_user_from_request(request: Request) -> t.Optional[t.Any]: + """Loads the user from the request header. + + :param request: The request. + :return: The authenticated user, or None if the authentication fails + """ + g.digest_auth_state = AuthState() + authorization: Authorization = request.authorization + try: + if authorization is None: + raise UnauthorizedException + if authorization.type != "digest": + raise UnauthorizedException( + "Not an HTTP digest authorization") + auth.authenticate(g.digest_auth_state) + user = login_manager.user_callback(authorization.username) + login_user(user) + return user + except UnauthorizedException as e: + if str(e) != "": + current_app.logger.warning(str(e)) + return None diff --git a/src/flask_digest_auth/test.py b/src/flask_digest_auth/test.py new file mode 100644 index 0000000..4446e0d --- /dev/null +++ b/src/flask_digest_auth/test.py @@ -0,0 +1,102 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/3 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The test client with HTTP digest authentication enabled. + +""" +import typing as t +from secrets import token_urlsafe + +from flask import g +from werkzeug.datastructures import Authorization, WWWAuthenticate +from werkzeug.test import TestResponse, Client as WerkzeugClient + +from flask_digest_auth.algo import calc_response, make_password_hash + + +class Client(WerkzeugClient): + """The test client with HTTP digest authentication enabled.""" + + def open(self, *args, digest_auth: t.Optional[t.Tuple[str, str]] = None, + **kwargs) -> TestResponse: + """Opens a request. + + :param args: The arguments. + :param digest_auth: The username and password for the HTTP digest + authentication. + :param kwargs: The keyword arguments. + :return: The response. + """ + response: TestResponse = super(Client, self).open(*args, **kwargs) + www_authenticate: WWWAuthenticate = response.www_authenticate + if not (response.status_code == 401 + and www_authenticate.type == "digest" + and digest_auth is not None): + return response + if hasattr(g, "_login_user"): + delattr(g, "_login_user") + auth_data: Authorization = _get_req_auth( + www_authenticate, args[0], digest_auth[0], digest_auth[1]) + response = super(Client, self).open(*args, auth=auth_data, **kwargs) + return response + + +def _get_req_auth(www_authenticate: WWWAuthenticate, uri: str, + username: str, password: str) -> Authorization: + """Returns the request authorization from the response header. + + :param www_authenticate: The WWW-Authenticate response. + :param uri: The request URI. + :param username: The username. + :param password: The password. + :return: The request authorization. + """ + qop: t.Optional[t.Literal["auth", "auth-int"]] = None + if www_authenticate.qop is not None: + if "auth" in www_authenticate.qop: + qop = "auth" + + cnonce: t.Optional[str] = None + if qop is not None or www_authenticate.algorithm == "MD5-sess": + cnonce = token_urlsafe(8) + nc: t.Optional[str] = None + count: int = 1 + if qop is not None: + nc: str = hex(count)[2:].zfill(8) + + expected: str = calc_response( + method="GET", uri=uri, + password_hash=make_password_hash(www_authenticate.realm, + username, password), + nonce=www_authenticate.nonce, qop=qop, + algorithm=www_authenticate.algorithm, cnonce=cnonce, nc=nc, body=None) + + data: t.Dict[str, str] = { + "username": username, "realm": www_authenticate.realm, + "nonce": www_authenticate.nonce, "uri": uri, "response": expected} + if www_authenticate.algorithm is not None: + data["algorithm"] = www_authenticate.algorithm + if cnonce is not None: + data["cnonce"] = cnonce + if www_authenticate.opaque is not None: + data["opaque"] = www_authenticate.opaque + if qop is not None: + data["qop"] = qop + if nc is not None: + data["nc"] = nc + + return Authorization("digest", data=data) diff --git a/tests/test_algo.py b/tests/test_algo.py new file mode 100644 index 0000000..78f428e --- /dev/null +++ b/tests/test_algo.py @@ -0,0 +1,51 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/10/30 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The test case for the HTTP digest authentication algorithm. + +""" +import typing as t +import unittest + +from flask_digest_auth import make_password_hash, calc_response + + +class AlgorithmTestCase(unittest.TestCase): + """The test case for the HTTP digest authentication algorithm.""" + + def test_response_value(self) -> None: + """Tests the response value. + See https://en.wikipedia.org/wiki/Digest_access_authentication. + + :return: None. + """ + realm: str = "testrealm@host.com" + username: str = "Mufasa" + password: str = "Circle Of Life" + method: str = "GET" + uri: str = "/dir/index.html" + nonce: str = "dcd98b7102dd2f0e8b11d0f600bfb0c093" + qop: t.Optional[t.Literal["auth", "auth-int"]] = "auth" + algorithm: t.Optional[t.Literal["MD5", "MD5-sess"]] = None + cnonce: t.Optional[str] = "0a4f113b" + nc: t.Optional[str] = "00000001" + body: t.Optional[bytes] = None + + password_hash: str = make_password_hash(realm, username, password) + response: str = calc_response(method, uri, password_hash, nonce, qop, + algorithm, cnonce, nc, body) + self.assertEqual(response, "6629fae49393a05397450978507c4ef1") diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..7b79f99 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,108 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/10/22 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The test case for the HTTP digest authentication. + +""" +import typing as t +from secrets import token_urlsafe +from types import SimpleNamespace + +from flask import Response, Flask, g +from flask_testing import TestCase + +from flask_digest_auth import DigestAuth, make_password_hash, Client + +_REALM: str = "testrealm@host.com" +_USERNAME: str = "Mufasa" +_PASSWORD: str = "Circle Of Life" + + +class AuthenticationTestCase(TestCase): + """The test case for the HTTP digest authentication.""" + + def create_app(self): + """Creates the Flask application. + + :return: The Flask application. + """ + auth: DigestAuth = DigestAuth(realm=_REALM) + user_db: t.Dict[str, str] \ + = {_USERNAME: make_password_hash(_REALM, _USERNAME, _PASSWORD)} + + @auth.register_get_password + def get_password_hash(username: str) -> t.Optional[str]: + """Returns the password hash of a user. + + :param username: The username. + :return: The password hash, or None if the user does not exist. + """ + return user_db[username] if username in user_db else None + + @auth.register_get_user + def get_user(username: str) -> t.Optional[t.Any]: + """Returns a user. + + :param username: The username. + :return: The user, or None if the user does not exist. + """ + return SimpleNamespace(username=username) if username in user_db \ + else None + + app: Flask = Flask(__name__) + app.config.from_mapping({ + "SECRET_KEY": token_urlsafe(32), + "TESTING": True + }) + app.test_client_class = Client + + @app.route("/login-required-1/auth", endpoint="auth-1") + @auth.login_required + def login_required_1() -> str: + """The first dummy view. + + :return: The response. + """ + return f"Hello, {g.user.username}! #1" + + @app.route("/login-required-2/auth", endpoint="auth-2") + @auth.login_required + def login_required_2() -> str: + """The second dummy view. + + :return: The response. + """ + return f"Hello, {g.user.username}! #2" + + return app + + def test_auth(self) -> None: + """Tests the authentication. + + :return: None. + """ + response: Response = self.client.get(self.app.url_for("auth-1")) + self.assertEqual(response.status_code, 401) + response = self.client.get( + self.app.url_for("auth-1"), digest_auth=(_USERNAME, _PASSWORD)) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("UTF-8"), + f"Hello, {_USERNAME}! #1") + response: Response = self.client.get(self.app.url_for("auth-2")) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("UTF-8"), + f"Hello, {_USERNAME}! #2") diff --git a/tests/test_flask_login.py b/tests/test_flask_login.py new file mode 100644 index 0000000..4262563 --- /dev/null +++ b/tests/test_flask_login.py @@ -0,0 +1,129 @@ +# The Flask HTTP Digest Authentication Project. +# Author: imacat@mail.imacat.idv.tw (imacat), 2022/11/23 + +# Copyright (c) 2022 imacat. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The test case for the Flask-Login integration. + +""" +import typing as t +from secrets import token_urlsafe + +import flask_login +from flask import Response, Flask +from flask_login import LoginManager +from flask_testing import TestCase + +from flask_digest_auth import DigestAuth, make_password_hash, Client, \ + init_login_manager + +_REALM: str = "testrealm@host.com" +_USERNAME: str = "Mufasa" +_PASSWORD: str = "Circle Of Life" + + +class User: + def __init__(self, username: str): + self.username: str = username + self.is_authenticated: bool = True + self.is_active: bool = True + self.is_anonymous: bool = False + + def get_id(self) -> str: + """Returns the username. + This is required by Flask-Login. + + :return: The username. + """ + return self.username + + +class FlaskLoginTestCase(TestCase): + """The test case with the Flask-Login integration.""" + + def create_app(self): + """Creates the Flask application. + + :return: The Flask application. + """ + auth: DigestAuth = DigestAuth(realm=_REALM) + login_manager: LoginManager = LoginManager() + user_db: t.Dict[str, str] \ + = {_USERNAME: make_password_hash(_REALM, _USERNAME, _PASSWORD)} + + @auth.register_get_password + def get_password_hash(username: str) -> t.Optional[str]: + """Returns the password hash of a user. + + :param username: The username. + :return: The password hash, or None if the user does not exist. + """ + return user_db[username] if username in user_db else None + + app: Flask = Flask(__name__) + app.config.from_mapping({ + "SECRET_KEY": token_urlsafe(32), + "TESTING": True + }) + app.test_client_class = Client + + login_manager.init_app(app) + init_login_manager(auth, login_manager) + + @login_manager.user_loader + def load_user(user_id: str) -> t.Optional[User]: + """Loads a user. + + :param user_id: The username. + :return: The user, or None if the user does not exist. + """ + return User(user_id) if user_id in user_db else None + + @app.route("/login-required-1/auth", endpoint="auth-1") + @flask_login.login_required + def login_required_1() -> str: + """The first dummy view. + + :return: The response. + """ + return f"Hello, {flask_login.current_user.username}! #1" + + @app.route("/login-required-2/auth", endpoint="auth-2") + @flask_login.login_required + def login_required_2() -> str: + """The second dummy view. + + :return: The response. + """ + return f"Hello, {flask_login.current_user.username}! #2" + + return app + + def test_auth(self) -> None: + """Tests the authentication. + + :return: None. + """ + response: Response = self.client.get(self.app.url_for("auth-1")) + self.assertEqual(response.status_code, 401) + response = self.client.get( + self.app.url_for("auth-1"), digest_auth=(_USERNAME, _PASSWORD)) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("UTF-8"), + f"Hello, {_USERNAME}! #1") + response: Response = self.client.get(self.app.url_for("auth-2")) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("UTF-8"), + f"Hello, {_USERNAME}! #2")