Compare commits

..

6 Commits

4 changed files with 87 additions and 61 deletions

8
.gitignore vendored
View File

@ -23,11 +23,13 @@ dist
.pytest_cache
venv
flask_session
instance
.DS_Store
.idea
instance
flask_session
.scannerwork
sonar-project.properties
excludes

View File

@ -61,6 +61,16 @@ def calc_response(
cnonce or nc is missing with the auth or auth-int qop.
"""
def validate_required(field: t.Optional[str], error: str) -> None:
"""Validates a required field.
:param field: The field that is required.
:param error: The error message.
:return: None.
"""
if field is None:
raise UnauthorizedException(error)
def calc_ha1() -> str:
"""Calculates and returns the first hash.
@ -68,16 +78,13 @@ def calc_response(
:raise UnauthorizedException: When the cnonce is missing with the MD5-sess
algorithm.
"""
if algorithm is None or algorithm == "MD5":
return password_hash
if algorithm == "MD5-sess":
if cnonce is None:
raise UnauthorizedException(
f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
validate_required(
cnonce, f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \
.hexdigest()
raise UnauthorizedException(
f"Unsupported algorithm=\"{algorithm}\"")
# algorithm is None or algorithm == "MD5"
return password_hash
def calc_ha2() -> str:
"""Calculates the second hash.
@ -86,30 +93,20 @@ def calc_response(
:raise UnauthorizedException: When the body is missing with
qop="auth-int".
"""
if qop is None or qop == "auth":
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
if qop == "auth-int":
if body is None:
raise UnauthorizedException(
f"Missing \"body\" with qop=\"{qop}\"")
validate_required(body, f"Missing \"body\" with qop=\"{qop}\"")
return md5(
f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \
.hexdigest()
raise UnauthorizedException(f"Unsupported qop=\"{qop}\"")
# qop is None or qop == "auth"
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
ha1: str = calc_ha1()
ha2: str = calc_ha2()
if qop is None:
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
if qop == "auth" or qop == "auth-int":
if cnonce is None:
raise UnauthorizedException(
f"Missing \"cnonce\" with the qop=\"{qop}\"")
if nc is None:
raise UnauthorizedException(
f"Missing \"nc\" with the qop=\"{qop}\"")
validate_required(cnonce, f"Missing \"cnonce\" with the qop=\"{qop}\"")
validate_required(nc, f"Missing \"nc\" with the qop=\"{qop}\"")
return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\
.hexdigest()
if cnonce is None:
raise UnauthorizedException(
f"Unsupported qop=\"{qop}\"")
# qop is None
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()

View File

@ -110,6 +110,36 @@ class DigestAuth:
class NoLogInException(Exception):
"""The exception thrown when the user is not authorized."""
def get_logged_in_user() -> t.Optional[t.Any]:
"""Returns the currently logged-in user.
:return: The currently logged-in user.
:raise NoLogInException: When the user is not logged in.
"""
if "user" not in session:
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
raise NoLogInException
return user
def auth_user(state: AuthState) -> t.Any:
"""Authenticates a user.
:param state: The authentication state.
:return: The user.
:raise UnauthorizedException: When the authentication fails.
"""
authorization: Authorization = request.authorization
if authorization is None:
raise UnauthorizedException
if authorization.type != "digest":
raise UnauthorizedException(
"Not an HTTP digest authorization")
self.authenticate(state)
session["user"] = authorization.username
return self.__get_user(authorization.username)
@wraps(view)
def login_required_view(*args, **kwargs) -> t.Any:
"""The login-protected view.
@ -119,27 +149,15 @@ class DigestAuth:
:return: The response.
"""
try:
if "user" not in session:
raise NoLogInException
user: t.Optional[t.Any] = self.__get_user(session["user"])
if user is None:
raise NoLogInException
g.user = user
g.user = get_logged_in_user()
return view(*args, **kwargs)
except NoLogInException:
pass
state: AuthState = AuthState()
authorization: Authorization = request.authorization
try:
if authorization is None:
raise UnauthorizedException
if authorization.type != "digest":
raise UnauthorizedException(
"Not an HTTP digest authorization")
self.authenticate(state)
session["user"] = authorization.username
user = self.__get_user(authorization.username)
g.user = user
self.__on_login(user)
g.user = auth_user(state)
self.__on_login(g.user)
return view(*args, **kwargs)
except UnauthorizedException as e:
if len(e.args) > 0:
@ -203,9 +221,19 @@ class DigestAuth:
:param state: The authorization state.
:return: The WWW-Authenticate response header.
"""
opaque: t.Optional[str] = None if not self.use_opaque else \
(state.opaque if state.opaque is not None
else self.serializer.dumps(randbits(32), salt="opaque"))
def get_opaque() -> t.Optional[str]:
"""Returns the opaque value.
:return: The opaque value.
"""
if not self.use_opaque:
return None
if state.opaque is not None:
return state.opaque
return self.serializer.dumps(randbits(32), salt="opaque")
opaque: t.Optional[str] = get_opaque()
nonce: str = self.serializer.dumps(
randbits(32),
salt="nonce" if opaque is None else f"nonce-{opaque}")

View File

@ -66,8 +66,7 @@ class Client(WerkzeugClient):
:return: The request authorization.
"""
qop: t.Optional[t.Literal["auth", "auth-int"]] = None
if www_authenticate.qop is not None:
if "auth" in www_authenticate.qop:
if www_authenticate.qop is not None and "auth" in www_authenticate.qop:
qop = "auth"
cnonce: t.Optional[str] = None