17 Commits

Author SHA1 Message Date
495e9a9785 Advanced to version 0.2.1. 2022-12-06 07:59:44 +08:00
cbbd2248f0 Added the pytest test example to README.rst. 2022-12-06 07:54:49 +08:00
2028cb1362 Revised the User class in the AuthenticationTestCase and FlaskLoginTestCase test classes to accept the clear-text password instead of the password hash, to simplify the code. 2022-12-03 14:30:22 +08:00
7e71115844 Fixed the test_logout test of the FlaskLoginTestCase test case to skip without Flask-Login. 2022-12-03 11:56:40 +08:00
491da61a79 Fixed the AuthenticationTestCase and FlaskLoginTestCase test cases to store the user instead of finding the user through flask-login or g, so that the visit tests work without the application context. 2022-12-03 11:55:52 +08:00
bbaebbc80d Fixed the long lines in the AuthenticationTestCase and FlaskLoginTestCase test cases. 2022-11-30 23:35:04 +08:00
0dfdf70c45 Fixed a minor problem in the get_logged_in_user function. 2022-11-30 08:31:58 +08:00
0432561b21 Revised the minor problems in README.rst. 2022-11-30 08:31:26 +08:00
3709cb4d66 Fixed the return type hint of the get_logged_in_user function. 2022-11-30 08:30:42 +08:00
9d0d0b2686 Revised the login_required method of the DigestAuth class for readability. 2022-11-29 22:24:25 +08:00
8c98d35934 Revised the calc_response function for readability. 2022-11-29 22:10:18 +08:00
7db38c7eae Revised the code in the make_authorization method of the test client. 2022-11-29 21:53:16 +08:00
9616fb3ddc Added the get_opaque inline function in the make_response_header method of the DigestAuth class for readability. 2022-11-29 21:52:19 +08:00
f473db29a8 Revised the order in .gitignore. 2022-11-29 20:32:25 +08:00
b39e9b1321 Added sonar-project.properties to .gitignore. 2022-11-29 20:25:08 +08:00
f3b525d715 Replaced random.random() with secrets.randbits() in the make_response_header method of the DigestAuth class. 2022-11-29 19:13:50 +08:00
0f3694ba05 Added the SonarQube .scannerwork directory to .gitignore. 2022-11-29 19:12:47 +08:00
8 changed files with 155 additions and 86 deletions

9
.gitignore vendored
View File

@ -23,10 +23,13 @@ dist
.pytest_cache
venv
flask_session
instance
.DS_Store
.idea
instance
flask_session
.scannerwork
sonar-project.properties
excludes

View File

@ -343,7 +343,7 @@ to ask the user for the username and password again.
Log In Bookkeeping
=================#
==================
You can register a callback to run when the user logs in, for ex.,
logging the log in event, adding the log in counter, etc.
@ -359,10 +359,13 @@ Writing Tests
=============
You can write tests with our test client that handles HTTP Digest
Authentication. Example for a unittest testcase:
Authentication.
Example for a unittest_ test case:
::
from flask import Flask
from flask_digest_auth import Client
from flask_testing import TestCase
from my_app import create_app
@ -385,6 +388,41 @@ Authentication. Example for a unittest testcase:
self.assertEqual(response.status_code, 200)
Example for a pytest_ test:
::
import pytest
from flask import Flask
from flask_digest_auth import Client
from my_app import create_app
@pytest.fixture()
def app():
app: Flask = create_app({
"SECRET_KEY": token_urlsafe(32),
"TESTING": True
})
app.test_client_class = Client
yield app
@pytest.fixture()
def client(app):
return app.test_client()
def test_admin(app: Flask, client: Client):
with app.app_context():
response = self.client.get("/admin")
assert response.status_code == 401
response = self.client.get(
"/admin", digest_auth=("my_name", "my_pass"))
assert response.status_code == 200
.. _unittest: https://docs.python.org/3/library/unittest.html
.. _pytest: https://pytest.org
Copyright
=========
@ -402,6 +440,7 @@ Copyright
See the License for the specific language governing permissions and
limitations under the License.
Authors
=======

View File

@ -17,7 +17,7 @@
[metadata]
name = flask-digest-auth
version = 0.2.0
version = 0.2.1
author = imacat
author_email = imacat@mail.imacat.idv.tw
description = The Flask HTTP Digest Authentication project.

View File

@ -61,6 +61,16 @@ def calc_response(
cnonce or nc is missing with the auth or auth-int qop.
"""
def validate_required(field: t.Optional[str], error: str) -> None:
"""Validates a required field.
:param field: The field that is required.
:param error: The error message.
:return: None.
"""
if field is None:
raise UnauthorizedException(error)
def calc_ha1() -> str:
"""Calculates and returns the first hash.
@ -68,16 +78,13 @@ def calc_response(
:raise UnauthorizedException: When the cnonce is missing with the MD5-sess
algorithm.
"""
if algorithm is None or algorithm == "MD5":
return password_hash
if algorithm == "MD5-sess":
if cnonce is None:
raise UnauthorizedException(
f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
validate_required(
cnonce, f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \
.hexdigest()
raise UnauthorizedException(
f"Unsupported algorithm=\"{algorithm}\"")
# algorithm is None or algorithm == "MD5"
return password_hash
def calc_ha2() -> str:
"""Calculates the second hash.
@ -86,30 +93,20 @@ def calc_response(
:raise UnauthorizedException: When the body is missing with
qop="auth-int".
"""
if qop is None or qop == "auth":
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
if qop == "auth-int":
if body is None:
raise UnauthorizedException(
f"Missing \"body\" with qop=\"{qop}\"")
validate_required(body, f"Missing \"body\" with qop=\"{qop}\"")
return md5(
f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \
.hexdigest()
raise UnauthorizedException(f"Unsupported qop=\"{qop}\"")
# qop is None or qop == "auth"
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
ha1: str = calc_ha1()
ha2: str = calc_ha2()
if qop is None:
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
if qop == "auth" or qop == "auth-int":
if cnonce is None:
raise UnauthorizedException(
f"Missing \"cnonce\" with the qop=\"{qop}\"")
if nc is None:
raise UnauthorizedException(
f"Missing \"nc\" with the qop=\"{qop}\"")
validate_required(cnonce, f"Missing \"cnonce\" with the qop=\"{qop}\"")
validate_required(nc, f"Missing \"nc\" with the qop=\"{qop}\"")
return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\
.hexdigest()
if cnonce is None:
raise UnauthorizedException(
f"Unsupported qop=\"{qop}\"")
# qop is None
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()

View File

@ -24,8 +24,7 @@ from __future__ import annotations
import sys
import typing as t
from functools import wraps
from random import random
from secrets import token_urlsafe
from secrets import token_urlsafe, randbits
from flask import g, request, Response, session, abort, Flask, Request
from itsdangerous import URLSafeTimedSerializer, BadData
@ -111,6 +110,37 @@ class DigestAuth:
class NoLogInException(Exception):
"""The exception thrown when the user is not authorized."""
def get_logged_in_user() -> t.Any:
"""Returns the currently logged-in user.
:return: The currently logged-in user.
:raise NoLogInException: When the user is not logged in.
"""
if "user" not in session:
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
del session["user"]
raise NoLogInException
return user
def auth_user(state: AuthState) -> t.Any:
"""Authenticates a user.
:param state: The authentication state.
:return: The user.
:raise UnauthorizedException: When the authentication fails.
"""
authorization: Authorization = request.authorization
if authorization is None:
raise UnauthorizedException
if authorization.type != "digest":
raise UnauthorizedException(
"Not an HTTP digest authorization")
self.authenticate(state)
session["user"] = authorization.username
return self.__get_user(authorization.username)
@wraps(view)
def login_required_view(*args, **kwargs) -> t.Any:
"""The login-protected view.
@ -120,36 +150,24 @@ class DigestAuth:
:return: The response.
"""
try:
if "user" not in session:
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
raise NoLogInException
g.user = user
g.user = get_logged_in_user()
return view(*args, **kwargs)
except NoLogInException:
state: AuthState = AuthState()
authorization: Authorization = request.authorization
try:
if authorization is None:
raise UnauthorizedException
if authorization.type != "digest":
raise UnauthorizedException(
"Not an HTTP digest authorization")
self.authenticate(state)
session["user"] = authorization.username
user = self.__get_user(authorization.username)
g.user = user
self.__on_login(user)
return view(*args, **kwargs)
except UnauthorizedException as e:
if len(e.args) > 0:
sys.stderr.write(e.args[0] + "\n")
response: Response = Response()
response.status = 401
response.headers["WWW-Authenticate"] \
= self.make_response_header(state)
abort(response)
pass
state: AuthState = AuthState()
try:
g.user = auth_user(state)
self.__on_login(g.user)
return view(*args, **kwargs)
except UnauthorizedException as e:
if len(e.args) > 0:
sys.stderr.write(e.args[0] + "\n")
response: Response = Response()
response.status = 401
response.headers["WWW-Authenticate"] \
= self.make_response_header(state)
abort(response)
return login_required_view
@ -204,11 +222,22 @@ class DigestAuth:
:param state: The authorization state.
:return: The WWW-Authenticate response header.
"""
opaque: t.Optional[str] = None if not self.use_opaque else \
(state.opaque if state.opaque is not None
else self.serializer.dumps(random(), salt="opaque"))
def get_opaque() -> t.Optional[str]:
"""Returns the opaque value.
:return: The opaque value.
"""
if not self.use_opaque:
return None
if state.opaque is not None:
return state.opaque
return self.serializer.dumps(randbits(32), salt="opaque")
opaque: t.Optional[str] = get_opaque()
nonce: str = self.serializer.dumps(
random(), salt="nonce" if opaque is None else f"nonce-{opaque}")
randbits(32),
salt="nonce" if opaque is None else f"nonce-{opaque}")
header: str = f"Digest realm=\"{self.realm}\""
if len(self.domain) > 0:

View File

@ -66,9 +66,8 @@ class Client(WerkzeugClient):
:return: The request authorization.
"""
qop: t.Optional[t.Literal["auth", "auth-int"]] = None
if www_authenticate.qop is not None:
if "auth" in www_authenticate.qop:
qop = "auth"
if www_authenticate.qop is not None and "auth" in www_authenticate.qop:
qop = "auth"
cnonce: t.Optional[str] = None
if qop is not None or www_authenticate.algorithm == "MD5-sess":

View File

@ -35,14 +35,15 @@ _PASSWORD: str = "Circle Of Life"
class User:
"""A dummy user"""
def __init__(self, username: str, password_hash: str):
def __init__(self, username: str, password: str):
"""Constructs a dummy user.
:param username: The username.
:param password_hash: The password hash.
:param password: The clear-text password.
"""
self.username: str = username
self.password_hash: str = password_hash
self.password_hash: str = make_password_hash(
_REALM, username, password)
self.visits: int = 0
@ -63,9 +64,8 @@ class AuthenticationTestCase(TestCase):
auth: DigestAuth = DigestAuth(realm=_REALM)
auth.init_app(app)
user_db: t.Dict[str, User] \
= {_USERNAME: User(
_USERNAME, make_password_hash(_REALM, _USERNAME, _PASSWORD))}
self.user: User = User(_USERNAME, _PASSWORD)
user_db: t.Dict[str, User] = {_USERNAME: self.user}
@auth.register_get_password
def get_password_hash(username: str) -> t.Optional[str]:
@ -141,7 +141,7 @@ class AuthenticationTestCase(TestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data.decode("UTF-8"),
f"Hello, {_USERNAME}! #2")
self.assertEqual(g.user.visits, 1)
self.assertEqual(self.user.visits, 1)
def test_stale_opaque(self) -> None:
"""Tests the stale and opaque value.
@ -218,4 +218,4 @@ class AuthenticationTestCase(TestCase):
response = self.client.get(admin_uri)
self.assertEqual(response.status_code, 200)
self.assertEqual(g.user.visits, 2)
self.assertEqual(self.user.visits, 2)

View File

@ -21,7 +21,6 @@
import typing as t
from secrets import token_urlsafe
import flask_login
from flask import Response, Flask, g, redirect, request
from flask_testing import TestCase
from werkzeug.datastructures import WWWAuthenticate, Authorization
@ -36,14 +35,15 @@ _PASSWORD: str = "Circle Of Life"
class User:
"""A dummy user."""
def __init__(self, username: str, password_hash: str):
def __init__(self, username: str, password: str):
"""Constructs a dummy user.
:param username: The username.
:param password_hash: The password hash.
:param password: The clear-text password.
"""
self.username: str = username
self.password_hash: str = password_hash
self.password_hash: str = make_password_hash(
_REALM, username, password)
self.visits: int = 0
self.is_authenticated: bool = True
self.is_active: bool = True
@ -86,9 +86,8 @@ class FlaskLoginTestCase(TestCase):
auth: DigestAuth = DigestAuth(realm=_REALM)
auth.init_app(app)
user_db: t.Dict[str, User] \
= {_USERNAME: User(
_USERNAME, make_password_hash(_REALM, _USERNAME, _PASSWORD))}
self.user: User = User(_USERNAME, _PASSWORD)
user_db: t.Dict[str, User] = {_USERNAME: self.user}
@auth.register_get_password
def get_password_hash(username: str) -> t.Optional[str]:
@ -154,7 +153,7 @@ class FlaskLoginTestCase(TestCase):
:return: None.
"""
if not self.has_flask_login:
self.skipTest("Skipped testing Flask-Login integration without it.")
self.skipTest("Skipped without Flask-Login.")
response: Response = self.client.get(self.app.url_for("admin-1"))
self.assertEqual(response.status_code, 401)
@ -167,7 +166,7 @@ class FlaskLoginTestCase(TestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data.decode("UTF-8"),
f"Hello, {_USERNAME}! #2")
self.assertEqual(flask_login.current_user.visits, 1)
self.assertEqual(self.user.visits, 1)
def test_stale_opaque(self) -> None:
"""Tests the stale and opaque value.
@ -175,7 +174,7 @@ class FlaskLoginTestCase(TestCase):
:return: None.
"""
if not self.has_flask_login:
self.skipTest("Skipped testing Flask-Login integration without it.")
self.skipTest("Skipped without Flask-Login.")
admin_uri: str = self.app.url_for("admin-1")
response: Response
@ -222,6 +221,9 @@ class FlaskLoginTestCase(TestCase):
:return: None.
"""
if not self.has_flask_login:
self.skipTest("Skipped without Flask-Login.")
admin_uri: str = self.app.url_for("admin-1")
logout_uri: str = self.app.url_for("logout")
response: Response
@ -253,4 +255,4 @@ class FlaskLoginTestCase(TestCase):
response = self.client.get(admin_uri)
self.assertEqual(response.status_code, 200)
self.assertEqual(flask_login.current_user.visits, 2)
self.assertEqual(self.user.visits, 2)