17 Commits

Author SHA1 Message Date
495e9a9785 Advanced to version 0.2.1. 2022-12-06 07:59:44 +08:00
cbbd2248f0 Added the pytest test example to README.rst. 2022-12-06 07:54:49 +08:00
2028cb1362 Revised the User class in the AuthenticationTestCase and FlaskLoginTestCase test classes to accept the clear-text password instead of the password hash, to simplify the code. 2022-12-03 14:30:22 +08:00
7e71115844 Fixed the test_logout test of the FlaskLoginTestCase test case to skip without Flask-Login. 2022-12-03 11:56:40 +08:00
491da61a79 Fixed the AuthenticationTestCase and FlaskLoginTestCase test cases to store the user instead of finding the user through flask-login or g, so that the visit tests work without the application context. 2022-12-03 11:55:52 +08:00
bbaebbc80d Fixed the long lines in the AuthenticationTestCase and FlaskLoginTestCase test cases. 2022-11-30 23:35:04 +08:00
0dfdf70c45 Fixed a minor problem in the get_logged_in_user function. 2022-11-30 08:31:58 +08:00
0432561b21 Revised the minor problems in README.rst. 2022-11-30 08:31:26 +08:00
3709cb4d66 Fixed the return type hint of the get_logged_in_user function. 2022-11-30 08:30:42 +08:00
9d0d0b2686 Revised the login_required method of the DigestAuth class for readability. 2022-11-29 22:24:25 +08:00
8c98d35934 Revised the calc_response function for readability. 2022-11-29 22:10:18 +08:00
7db38c7eae Revised the code in the make_authorization method of the test client. 2022-11-29 21:53:16 +08:00
9616fb3ddc Added the get_opaque inline function in the make_response_header method of the DigestAuth class for readability. 2022-11-29 21:52:19 +08:00
f473db29a8 Revised the order in .gitignore. 2022-11-29 20:32:25 +08:00
b39e9b1321 Added sonar-project.properties to .gitignore. 2022-11-29 20:25:08 +08:00
f3b525d715 Replaced random.random() with secrets.randbits() in the make_response_header method of the DigestAuth class. 2022-11-29 19:13:50 +08:00
0f3694ba05 Added the SonarQube .scannerwork directory to .gitignore. 2022-11-29 19:12:47 +08:00
8 changed files with 155 additions and 86 deletions

9
.gitignore vendored
View File

@ -23,10 +23,13 @@ dist
.pytest_cache .pytest_cache
venv venv
flask_session
instance
.DS_Store .DS_Store
.idea .idea
instance
flask_session
.scannerwork
sonar-project.properties
excludes excludes

View File

@ -343,7 +343,7 @@ to ask the user for the username and password again.
Log In Bookkeeping Log In Bookkeeping
=================# ==================
You can register a callback to run when the user logs in, for ex., You can register a callback to run when the user logs in, for ex.,
logging the log in event, adding the log in counter, etc. logging the log in event, adding the log in counter, etc.
@ -359,10 +359,13 @@ Writing Tests
============= =============
You can write tests with our test client that handles HTTP Digest You can write tests with our test client that handles HTTP Digest
Authentication. Example for a unittest testcase: Authentication.
Example for a unittest_ test case:
:: ::
from flask import Flask
from flask_digest_auth import Client from flask_digest_auth import Client
from flask_testing import TestCase from flask_testing import TestCase
from my_app import create_app from my_app import create_app
@ -385,6 +388,41 @@ Authentication. Example for a unittest testcase:
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
Example for a pytest_ test:
::
import pytest
from flask import Flask
from flask_digest_auth import Client
from my_app import create_app
@pytest.fixture()
def app():
app: Flask = create_app({
"SECRET_KEY": token_urlsafe(32),
"TESTING": True
})
app.test_client_class = Client
yield app
@pytest.fixture()
def client(app):
return app.test_client()
def test_admin(app: Flask, client: Client):
with app.app_context():
response = self.client.get("/admin")
assert response.status_code == 401
response = self.client.get(
"/admin", digest_auth=("my_name", "my_pass"))
assert response.status_code == 200
.. _unittest: https://docs.python.org/3/library/unittest.html
.. _pytest: https://pytest.org
Copyright Copyright
========= =========
@ -402,6 +440,7 @@ Copyright
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
Authors Authors
======= =======

View File

@ -17,7 +17,7 @@
[metadata] [metadata]
name = flask-digest-auth name = flask-digest-auth
version = 0.2.0 version = 0.2.1
author = imacat author = imacat
author_email = imacat@mail.imacat.idv.tw author_email = imacat@mail.imacat.idv.tw
description = The Flask HTTP Digest Authentication project. description = The Flask HTTP Digest Authentication project.

View File

@ -61,6 +61,16 @@ def calc_response(
cnonce or nc is missing with the auth or auth-int qop. cnonce or nc is missing with the auth or auth-int qop.
""" """
def validate_required(field: t.Optional[str], error: str) -> None:
"""Validates a required field.
:param field: The field that is required.
:param error: The error message.
:return: None.
"""
if field is None:
raise UnauthorizedException(error)
def calc_ha1() -> str: def calc_ha1() -> str:
"""Calculates and returns the first hash. """Calculates and returns the first hash.
@ -68,16 +78,13 @@ def calc_response(
:raise UnauthorizedException: When the cnonce is missing with the MD5-sess :raise UnauthorizedException: When the cnonce is missing with the MD5-sess
algorithm. algorithm.
""" """
if algorithm is None or algorithm == "MD5":
return password_hash
if algorithm == "MD5-sess": if algorithm == "MD5-sess":
if cnonce is None: validate_required(
raise UnauthorizedException( cnonce, f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \ return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \
.hexdigest() .hexdigest()
raise UnauthorizedException( # algorithm is None or algorithm == "MD5"
f"Unsupported algorithm=\"{algorithm}\"") return password_hash
def calc_ha2() -> str: def calc_ha2() -> str:
"""Calculates the second hash. """Calculates the second hash.
@ -86,30 +93,20 @@ def calc_response(
:raise UnauthorizedException: When the body is missing with :raise UnauthorizedException: When the body is missing with
qop="auth-int". qop="auth-int".
""" """
if qop is None or qop == "auth":
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
if qop == "auth-int": if qop == "auth-int":
if body is None: validate_required(body, f"Missing \"body\" with qop=\"{qop}\"")
raise UnauthorizedException(
f"Missing \"body\" with qop=\"{qop}\"")
return md5( return md5(
f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \ f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \
.hexdigest() .hexdigest()
raise UnauthorizedException(f"Unsupported qop=\"{qop}\"") # qop is None or qop == "auth"
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
ha1: str = calc_ha1() ha1: str = calc_ha1()
ha2: str = calc_ha2() ha2: str = calc_ha2()
if qop is None:
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
if qop == "auth" or qop == "auth-int": if qop == "auth" or qop == "auth-int":
if cnonce is None: validate_required(cnonce, f"Missing \"cnonce\" with the qop=\"{qop}\"")
raise UnauthorizedException( validate_required(nc, f"Missing \"nc\" with the qop=\"{qop}\"")
f"Missing \"cnonce\" with the qop=\"{qop}\"")
if nc is None:
raise UnauthorizedException(
f"Missing \"nc\" with the qop=\"{qop}\"")
return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\ return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\
.hexdigest() .hexdigest()
if cnonce is None: # qop is None
raise UnauthorizedException( return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
f"Unsupported qop=\"{qop}\"")

View File

@ -24,8 +24,7 @@ from __future__ import annotations
import sys import sys
import typing as t import typing as t
from functools import wraps from functools import wraps
from random import random from secrets import token_urlsafe, randbits
from secrets import token_urlsafe
from flask import g, request, Response, session, abort, Flask, Request from flask import g, request, Response, session, abort, Flask, Request
from itsdangerous import URLSafeTimedSerializer, BadData from itsdangerous import URLSafeTimedSerializer, BadData
@ -111,6 +110,37 @@ class DigestAuth:
class NoLogInException(Exception): class NoLogInException(Exception):
"""The exception thrown when the user is not authorized.""" """The exception thrown when the user is not authorized."""
def get_logged_in_user() -> t.Any:
"""Returns the currently logged-in user.
:return: The currently logged-in user.
:raise NoLogInException: When the user is not logged in.
"""
if "user" not in session:
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
del session["user"]
raise NoLogInException
return user
def auth_user(state: AuthState) -> t.Any:
"""Authenticates a user.
:param state: The authentication state.
:return: The user.
:raise UnauthorizedException: When the authentication fails.
"""
authorization: Authorization = request.authorization
if authorization is None:
raise UnauthorizedException
if authorization.type != "digest":
raise UnauthorizedException(
"Not an HTTP digest authorization")
self.authenticate(state)
session["user"] = authorization.username
return self.__get_user(authorization.username)
@wraps(view) @wraps(view)
def login_required_view(*args, **kwargs) -> t.Any: def login_required_view(*args, **kwargs) -> t.Any:
"""The login-protected view. """The login-protected view.
@ -120,36 +150,24 @@ class DigestAuth:
:return: The response. :return: The response.
""" """
try: try:
if "user" not in session: g.user = get_logged_in_user()
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
raise NoLogInException
g.user = user
return view(*args, **kwargs) return view(*args, **kwargs)
except NoLogInException: except NoLogInException:
state: AuthState = AuthState() pass
authorization: Authorization = request.authorization
try: state: AuthState = AuthState()
if authorization is None: try:
raise UnauthorizedException g.user = auth_user(state)
if authorization.type != "digest": self.__on_login(g.user)
raise UnauthorizedException( return view(*args, **kwargs)
"Not an HTTP digest authorization") except UnauthorizedException as e:
self.authenticate(state) if len(e.args) > 0:
session["user"] = authorization.username sys.stderr.write(e.args[0] + "\n")
user = self.__get_user(authorization.username) response: Response = Response()
g.user = user response.status = 401
self.__on_login(user) response.headers["WWW-Authenticate"] \
return view(*args, **kwargs) = self.make_response_header(state)
except UnauthorizedException as e: abort(response)
if len(e.args) > 0:
sys.stderr.write(e.args[0] + "\n")
response: Response = Response()
response.status = 401
response.headers["WWW-Authenticate"] \
= self.make_response_header(state)
abort(response)
return login_required_view return login_required_view
@ -204,11 +222,22 @@ class DigestAuth:
:param state: The authorization state. :param state: The authorization state.
:return: The WWW-Authenticate response header. :return: The WWW-Authenticate response header.
""" """
opaque: t.Optional[str] = None if not self.use_opaque else \
(state.opaque if state.opaque is not None def get_opaque() -> t.Optional[str]:
else self.serializer.dumps(random(), salt="opaque")) """Returns the opaque value.
:return: The opaque value.
"""
if not self.use_opaque:
return None
if state.opaque is not None:
return state.opaque
return self.serializer.dumps(randbits(32), salt="opaque")
opaque: t.Optional[str] = get_opaque()
nonce: str = self.serializer.dumps( nonce: str = self.serializer.dumps(
random(), salt="nonce" if opaque is None else f"nonce-{opaque}") randbits(32),
salt="nonce" if opaque is None else f"nonce-{opaque}")
header: str = f"Digest realm=\"{self.realm}\"" header: str = f"Digest realm=\"{self.realm}\""
if len(self.domain) > 0: if len(self.domain) > 0:

View File

@ -66,9 +66,8 @@ class Client(WerkzeugClient):
:return: The request authorization. :return: The request authorization.
""" """
qop: t.Optional[t.Literal["auth", "auth-int"]] = None qop: t.Optional[t.Literal["auth", "auth-int"]] = None
if www_authenticate.qop is not None: if www_authenticate.qop is not None and "auth" in www_authenticate.qop:
if "auth" in www_authenticate.qop: qop = "auth"
qop = "auth"
cnonce: t.Optional[str] = None cnonce: t.Optional[str] = None
if qop is not None or www_authenticate.algorithm == "MD5-sess": if qop is not None or www_authenticate.algorithm == "MD5-sess":

View File

@ -35,14 +35,15 @@ _PASSWORD: str = "Circle Of Life"
class User: class User:
"""A dummy user""" """A dummy user"""
def __init__(self, username: str, password_hash: str): def __init__(self, username: str, password: str):
"""Constructs a dummy user. """Constructs a dummy user.
:param username: The username. :param username: The username.
:param password_hash: The password hash. :param password: The clear-text password.
""" """
self.username: str = username self.username: str = username
self.password_hash: str = password_hash self.password_hash: str = make_password_hash(
_REALM, username, password)
self.visits: int = 0 self.visits: int = 0
@ -63,9 +64,8 @@ class AuthenticationTestCase(TestCase):
auth: DigestAuth = DigestAuth(realm=_REALM) auth: DigestAuth = DigestAuth(realm=_REALM)
auth.init_app(app) auth.init_app(app)
user_db: t.Dict[str, User] \ self.user: User = User(_USERNAME, _PASSWORD)
= {_USERNAME: User( user_db: t.Dict[str, User] = {_USERNAME: self.user}
_USERNAME, make_password_hash(_REALM, _USERNAME, _PASSWORD))}
@auth.register_get_password @auth.register_get_password
def get_password_hash(username: str) -> t.Optional[str]: def get_password_hash(username: str) -> t.Optional[str]:
@ -141,7 +141,7 @@ class AuthenticationTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.data.decode("UTF-8"), self.assertEqual(response.data.decode("UTF-8"),
f"Hello, {_USERNAME}! #2") f"Hello, {_USERNAME}! #2")
self.assertEqual(g.user.visits, 1) self.assertEqual(self.user.visits, 1)
def test_stale_opaque(self) -> None: def test_stale_opaque(self) -> None:
"""Tests the stale and opaque value. """Tests the stale and opaque value.
@ -218,4 +218,4 @@ class AuthenticationTestCase(TestCase):
response = self.client.get(admin_uri) response = self.client.get(admin_uri)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(g.user.visits, 2) self.assertEqual(self.user.visits, 2)

View File

@ -21,7 +21,6 @@
import typing as t import typing as t
from secrets import token_urlsafe from secrets import token_urlsafe
import flask_login
from flask import Response, Flask, g, redirect, request from flask import Response, Flask, g, redirect, request
from flask_testing import TestCase from flask_testing import TestCase
from werkzeug.datastructures import WWWAuthenticate, Authorization from werkzeug.datastructures import WWWAuthenticate, Authorization
@ -36,14 +35,15 @@ _PASSWORD: str = "Circle Of Life"
class User: class User:
"""A dummy user.""" """A dummy user."""
def __init__(self, username: str, password_hash: str): def __init__(self, username: str, password: str):
"""Constructs a dummy user. """Constructs a dummy user.
:param username: The username. :param username: The username.
:param password_hash: The password hash. :param password: The clear-text password.
""" """
self.username: str = username self.username: str = username
self.password_hash: str = password_hash self.password_hash: str = make_password_hash(
_REALM, username, password)
self.visits: int = 0 self.visits: int = 0
self.is_authenticated: bool = True self.is_authenticated: bool = True
self.is_active: bool = True self.is_active: bool = True
@ -86,9 +86,8 @@ class FlaskLoginTestCase(TestCase):
auth: DigestAuth = DigestAuth(realm=_REALM) auth: DigestAuth = DigestAuth(realm=_REALM)
auth.init_app(app) auth.init_app(app)
user_db: t.Dict[str, User] \ self.user: User = User(_USERNAME, _PASSWORD)
= {_USERNAME: User( user_db: t.Dict[str, User] = {_USERNAME: self.user}
_USERNAME, make_password_hash(_REALM, _USERNAME, _PASSWORD))}
@auth.register_get_password @auth.register_get_password
def get_password_hash(username: str) -> t.Optional[str]: def get_password_hash(username: str) -> t.Optional[str]:
@ -154,7 +153,7 @@ class FlaskLoginTestCase(TestCase):
:return: None. :return: None.
""" """
if not self.has_flask_login: if not self.has_flask_login:
self.skipTest("Skipped testing Flask-Login integration without it.") self.skipTest("Skipped without Flask-Login.")
response: Response = self.client.get(self.app.url_for("admin-1")) response: Response = self.client.get(self.app.url_for("admin-1"))
self.assertEqual(response.status_code, 401) self.assertEqual(response.status_code, 401)
@ -167,7 +166,7 @@ class FlaskLoginTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.data.decode("UTF-8"), self.assertEqual(response.data.decode("UTF-8"),
f"Hello, {_USERNAME}! #2") f"Hello, {_USERNAME}! #2")
self.assertEqual(flask_login.current_user.visits, 1) self.assertEqual(self.user.visits, 1)
def test_stale_opaque(self) -> None: def test_stale_opaque(self) -> None:
"""Tests the stale and opaque value. """Tests the stale and opaque value.
@ -175,7 +174,7 @@ class FlaskLoginTestCase(TestCase):
:return: None. :return: None.
""" """
if not self.has_flask_login: if not self.has_flask_login:
self.skipTest("Skipped testing Flask-Login integration without it.") self.skipTest("Skipped without Flask-Login.")
admin_uri: str = self.app.url_for("admin-1") admin_uri: str = self.app.url_for("admin-1")
response: Response response: Response
@ -222,6 +221,9 @@ class FlaskLoginTestCase(TestCase):
:return: None. :return: None.
""" """
if not self.has_flask_login:
self.skipTest("Skipped without Flask-Login.")
admin_uri: str = self.app.url_for("admin-1") admin_uri: str = self.app.url_for("admin-1")
logout_uri: str = self.app.url_for("logout") logout_uri: str = self.app.url_for("logout")
response: Response response: Response
@ -253,4 +255,4 @@ class FlaskLoginTestCase(TestCase):
response = self.client.get(admin_uri) response = self.client.get(admin_uri)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(flask_login.current_user.visits, 2) self.assertEqual(self.user.visits, 2)