Compare commits
6 Commits
f3b525d715
...
9d0d0b2686
Author | SHA1 | Date | |
---|---|---|---|
9d0d0b2686 | |||
8c98d35934 | |||
7db38c7eae | |||
9616fb3ddc | |||
f473db29a8 | |||
b39e9b1321 |
8
.gitignore
vendored
8
.gitignore
vendored
@ -23,11 +23,13 @@ dist
|
|||||||
.pytest_cache
|
.pytest_cache
|
||||||
venv
|
venv
|
||||||
|
|
||||||
flask_session
|
|
||||||
instance
|
|
||||||
|
|
||||||
.DS_Store
|
.DS_Store
|
||||||
.idea
|
.idea
|
||||||
|
|
||||||
|
instance
|
||||||
|
flask_session
|
||||||
|
|
||||||
.scannerwork
|
.scannerwork
|
||||||
|
sonar-project.properties
|
||||||
|
|
||||||
excludes
|
excludes
|
||||||
|
@ -61,6 +61,16 @@ def calc_response(
|
|||||||
cnonce or nc is missing with the auth or auth-int qop.
|
cnonce or nc is missing with the auth or auth-int qop.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def validate_required(field: t.Optional[str], error: str) -> None:
|
||||||
|
"""Validates a required field.
|
||||||
|
|
||||||
|
:param field: The field that is required.
|
||||||
|
:param error: The error message.
|
||||||
|
:return: None.
|
||||||
|
"""
|
||||||
|
if field is None:
|
||||||
|
raise UnauthorizedException(error)
|
||||||
|
|
||||||
def calc_ha1() -> str:
|
def calc_ha1() -> str:
|
||||||
"""Calculates and returns the first hash.
|
"""Calculates and returns the first hash.
|
||||||
|
|
||||||
@ -68,16 +78,13 @@ def calc_response(
|
|||||||
:raise UnauthorizedException: When the cnonce is missing with the MD5-sess
|
:raise UnauthorizedException: When the cnonce is missing with the MD5-sess
|
||||||
algorithm.
|
algorithm.
|
||||||
"""
|
"""
|
||||||
if algorithm is None or algorithm == "MD5":
|
|
||||||
return password_hash
|
|
||||||
if algorithm == "MD5-sess":
|
if algorithm == "MD5-sess":
|
||||||
if cnonce is None:
|
validate_required(
|
||||||
raise UnauthorizedException(
|
cnonce, f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
|
||||||
f"Missing \"cnonce\" with algorithm=\"{algorithm}\"")
|
|
||||||
return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \
|
return md5(f"{password_hash}:{nonce}:{cnonce}".encode("utf8")) \
|
||||||
.hexdigest()
|
.hexdigest()
|
||||||
raise UnauthorizedException(
|
# algorithm is None or algorithm == "MD5"
|
||||||
f"Unsupported algorithm=\"{algorithm}\"")
|
return password_hash
|
||||||
|
|
||||||
def calc_ha2() -> str:
|
def calc_ha2() -> str:
|
||||||
"""Calculates the second hash.
|
"""Calculates the second hash.
|
||||||
@ -86,30 +93,20 @@ def calc_response(
|
|||||||
:raise UnauthorizedException: When the body is missing with
|
:raise UnauthorizedException: When the body is missing with
|
||||||
qop="auth-int".
|
qop="auth-int".
|
||||||
"""
|
"""
|
||||||
if qop is None or qop == "auth":
|
|
||||||
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
|
|
||||||
if qop == "auth-int":
|
if qop == "auth-int":
|
||||||
if body is None:
|
validate_required(body, f"Missing \"body\" with qop=\"{qop}\"")
|
||||||
raise UnauthorizedException(
|
|
||||||
f"Missing \"body\" with qop=\"{qop}\"")
|
|
||||||
return md5(
|
return md5(
|
||||||
f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \
|
f"{method}:{uri}:{md5(body).hexdigest()}".encode("utf8")) \
|
||||||
.hexdigest()
|
.hexdigest()
|
||||||
raise UnauthorizedException(f"Unsupported qop=\"{qop}\"")
|
# qop is None or qop == "auth"
|
||||||
|
return md5(f"{method}:{uri}".encode("utf8")).hexdigest()
|
||||||
|
|
||||||
ha1: str = calc_ha1()
|
ha1: str = calc_ha1()
|
||||||
ha2: str = calc_ha2()
|
ha2: str = calc_ha2()
|
||||||
if qop is None:
|
|
||||||
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
|
|
||||||
if qop == "auth" or qop == "auth-int":
|
if qop == "auth" or qop == "auth-int":
|
||||||
if cnonce is None:
|
validate_required(cnonce, f"Missing \"cnonce\" with the qop=\"{qop}\"")
|
||||||
raise UnauthorizedException(
|
validate_required(nc, f"Missing \"nc\" with the qop=\"{qop}\"")
|
||||||
f"Missing \"cnonce\" with the qop=\"{qop}\"")
|
|
||||||
if nc is None:
|
|
||||||
raise UnauthorizedException(
|
|
||||||
f"Missing \"nc\" with the qop=\"{qop}\"")
|
|
||||||
return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\
|
return md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode("utf8"))\
|
||||||
.hexdigest()
|
.hexdigest()
|
||||||
if cnonce is None:
|
# qop is None
|
||||||
raise UnauthorizedException(
|
return md5(f"{ha1}:{nonce}:{ha2}".encode("utf8")).hexdigest()
|
||||||
f"Unsupported qop=\"{qop}\"")
|
|
||||||
|
@ -110,6 +110,36 @@ class DigestAuth:
|
|||||||
class NoLogInException(Exception):
|
class NoLogInException(Exception):
|
||||||
"""The exception thrown when the user is not authorized."""
|
"""The exception thrown when the user is not authorized."""
|
||||||
|
|
||||||
|
def get_logged_in_user() -> t.Optional[t.Any]:
|
||||||
|
"""Returns the currently logged-in user.
|
||||||
|
|
||||||
|
:return: The currently logged-in user.
|
||||||
|
:raise NoLogInException: When the user is not logged in.
|
||||||
|
"""
|
||||||
|
if "user" not in session:
|
||||||
|
raise NoLogInException
|
||||||
|
user: t.Optional[t.Any] = self.__get_user(session["user"])
|
||||||
|
if user is None:
|
||||||
|
raise NoLogInException
|
||||||
|
return user
|
||||||
|
|
||||||
|
def auth_user(state: AuthState) -> t.Any:
|
||||||
|
"""Authenticates a user.
|
||||||
|
|
||||||
|
:param state: The authentication state.
|
||||||
|
:return: The user.
|
||||||
|
:raise UnauthorizedException: When the authentication fails.
|
||||||
|
"""
|
||||||
|
authorization: Authorization = request.authorization
|
||||||
|
if authorization is None:
|
||||||
|
raise UnauthorizedException
|
||||||
|
if authorization.type != "digest":
|
||||||
|
raise UnauthorizedException(
|
||||||
|
"Not an HTTP digest authorization")
|
||||||
|
self.authenticate(state)
|
||||||
|
session["user"] = authorization.username
|
||||||
|
return self.__get_user(authorization.username)
|
||||||
|
|
||||||
@wraps(view)
|
@wraps(view)
|
||||||
def login_required_view(*args, **kwargs) -> t.Any:
|
def login_required_view(*args, **kwargs) -> t.Any:
|
||||||
"""The login-protected view.
|
"""The login-protected view.
|
||||||
@ -119,36 +149,24 @@ class DigestAuth:
|
|||||||
:return: The response.
|
:return: The response.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
if "user" not in session:
|
g.user = get_logged_in_user()
|
||||||
raise NoLogInException
|
|
||||||
user: t.Optional[t.Any] = self.__get_user(session["user"])
|
|
||||||
if user is None:
|
|
||||||
raise NoLogInException
|
|
||||||
g.user = user
|
|
||||||
return view(*args, **kwargs)
|
return view(*args, **kwargs)
|
||||||
except NoLogInException:
|
except NoLogInException:
|
||||||
state: AuthState = AuthState()
|
pass
|
||||||
authorization: Authorization = request.authorization
|
|
||||||
try:
|
state: AuthState = AuthState()
|
||||||
if authorization is None:
|
try:
|
||||||
raise UnauthorizedException
|
g.user = auth_user(state)
|
||||||
if authorization.type != "digest":
|
self.__on_login(g.user)
|
||||||
raise UnauthorizedException(
|
return view(*args, **kwargs)
|
||||||
"Not an HTTP digest authorization")
|
except UnauthorizedException as e:
|
||||||
self.authenticate(state)
|
if len(e.args) > 0:
|
||||||
session["user"] = authorization.username
|
sys.stderr.write(e.args[0] + "\n")
|
||||||
user = self.__get_user(authorization.username)
|
response: Response = Response()
|
||||||
g.user = user
|
response.status = 401
|
||||||
self.__on_login(user)
|
response.headers["WWW-Authenticate"] \
|
||||||
return view(*args, **kwargs)
|
= self.make_response_header(state)
|
||||||
except UnauthorizedException as e:
|
abort(response)
|
||||||
if len(e.args) > 0:
|
|
||||||
sys.stderr.write(e.args[0] + "\n")
|
|
||||||
response: Response = Response()
|
|
||||||
response.status = 401
|
|
||||||
response.headers["WWW-Authenticate"] \
|
|
||||||
= self.make_response_header(state)
|
|
||||||
abort(response)
|
|
||||||
|
|
||||||
return login_required_view
|
return login_required_view
|
||||||
|
|
||||||
@ -203,9 +221,19 @@ class DigestAuth:
|
|||||||
:param state: The authorization state.
|
:param state: The authorization state.
|
||||||
:return: The WWW-Authenticate response header.
|
:return: The WWW-Authenticate response header.
|
||||||
"""
|
"""
|
||||||
opaque: t.Optional[str] = None if not self.use_opaque else \
|
|
||||||
(state.opaque if state.opaque is not None
|
def get_opaque() -> t.Optional[str]:
|
||||||
else self.serializer.dumps(randbits(32), salt="opaque"))
|
"""Returns the opaque value.
|
||||||
|
|
||||||
|
:return: The opaque value.
|
||||||
|
"""
|
||||||
|
if not self.use_opaque:
|
||||||
|
return None
|
||||||
|
if state.opaque is not None:
|
||||||
|
return state.opaque
|
||||||
|
return self.serializer.dumps(randbits(32), salt="opaque")
|
||||||
|
|
||||||
|
opaque: t.Optional[str] = get_opaque()
|
||||||
nonce: str = self.serializer.dumps(
|
nonce: str = self.serializer.dumps(
|
||||||
randbits(32),
|
randbits(32),
|
||||||
salt="nonce" if opaque is None else f"nonce-{opaque}")
|
salt="nonce" if opaque is None else f"nonce-{opaque}")
|
||||||
|
@ -66,9 +66,8 @@ class Client(WerkzeugClient):
|
|||||||
:return: The request authorization.
|
:return: The request authorization.
|
||||||
"""
|
"""
|
||||||
qop: t.Optional[t.Literal["auth", "auth-int"]] = None
|
qop: t.Optional[t.Literal["auth", "auth-int"]] = None
|
||||||
if www_authenticate.qop is not None:
|
if www_authenticate.qop is not None and "auth" in www_authenticate.qop:
|
||||||
if "auth" in www_authenticate.qop:
|
qop = "auth"
|
||||||
qop = "auth"
|
|
||||||
|
|
||||||
cnonce: t.Optional[str] = None
|
cnonce: t.Optional[str] = None
|
||||||
if qop is not None or www_authenticate.algorithm == "MD5-sess":
|
if qop is not None or www_authenticate.algorithm == "MD5-sess":
|
||||||
|
Loading…
Reference in New Issue
Block a user