Revised the login_required method of the DigestAuth class for readability.
This commit is contained in:
		| @@ -110,6 +110,36 @@ class DigestAuth: | |||||||
|         class NoLogInException(Exception): |         class NoLogInException(Exception): | ||||||
|             """The exception thrown when the user is not authorized.""" |             """The exception thrown when the user is not authorized.""" | ||||||
|  |  | ||||||
|  |         def get_logged_in_user() -> t.Optional[t.Any]: | ||||||
|  |             """Returns the currently logged-in user. | ||||||
|  |  | ||||||
|  |             :return: The currently logged-in user. | ||||||
|  |             :raise NoLogInException: When the user is not logged in. | ||||||
|  |             """ | ||||||
|  |             if "user" not in session: | ||||||
|  |                 raise NoLogInException | ||||||
|  |             user: t.Optional[t.Any] = self.__get_user(session["user"]) | ||||||
|  |             if user is None: | ||||||
|  |                 raise NoLogInException | ||||||
|  |             return user | ||||||
|  |  | ||||||
|  |         def auth_user(state: AuthState) -> t.Any: | ||||||
|  |             """Authenticates a user. | ||||||
|  |  | ||||||
|  |             :param state: The authentication state. | ||||||
|  |             :return: The user. | ||||||
|  |             :raise UnauthorizedException: When the authentication fails. | ||||||
|  |             """ | ||||||
|  |             authorization: Authorization = request.authorization | ||||||
|  |             if authorization is None: | ||||||
|  |                 raise UnauthorizedException | ||||||
|  |             if authorization.type != "digest": | ||||||
|  |                 raise UnauthorizedException( | ||||||
|  |                     "Not an HTTP digest authorization") | ||||||
|  |             self.authenticate(state) | ||||||
|  |             session["user"] = authorization.username | ||||||
|  |             return self.__get_user(authorization.username) | ||||||
|  |  | ||||||
|         @wraps(view) |         @wraps(view) | ||||||
|         def login_required_view(*args, **kwargs) -> t.Any: |         def login_required_view(*args, **kwargs) -> t.Any: | ||||||
|             """The login-protected view. |             """The login-protected view. | ||||||
| @@ -119,36 +149,24 @@ class DigestAuth: | |||||||
|             :return: The response. |             :return: The response. | ||||||
|             """ |             """ | ||||||
|             try: |             try: | ||||||
|                 if "user" not in session: |                 g.user = get_logged_in_user() | ||||||
|                     raise NoLogInException |  | ||||||
|                 user: t.Optional[t.Any] = self.__get_user(session["user"]) |  | ||||||
|                 if user is None: |  | ||||||
|                     raise NoLogInException |  | ||||||
|                 g.user = user |  | ||||||
|                 return view(*args, **kwargs) |                 return view(*args, **kwargs) | ||||||
|             except NoLogInException: |             except NoLogInException: | ||||||
|                 state: AuthState = AuthState() |                 pass | ||||||
|                 authorization: Authorization = request.authorization |  | ||||||
|                 try: |             state: AuthState = AuthState() | ||||||
|                     if authorization is None: |             try: | ||||||
|                         raise UnauthorizedException |                 g.user = auth_user(state) | ||||||
|                     if authorization.type != "digest": |                 self.__on_login(g.user) | ||||||
|                         raise UnauthorizedException( |                 return view(*args, **kwargs) | ||||||
|                             "Not an HTTP digest authorization") |             except UnauthorizedException as e: | ||||||
|                     self.authenticate(state) |                 if len(e.args) > 0: | ||||||
|                     session["user"] = authorization.username |                     sys.stderr.write(e.args[0] + "\n") | ||||||
|                     user = self.__get_user(authorization.username) |                 response: Response = Response() | ||||||
|                     g.user = user |                 response.status = 401 | ||||||
|                     self.__on_login(user) |                 response.headers["WWW-Authenticate"] \ | ||||||
|                     return view(*args, **kwargs) |                     = self.make_response_header(state) | ||||||
|                 except UnauthorizedException as e: |                 abort(response) | ||||||
|                     if len(e.args) > 0: |  | ||||||
|                         sys.stderr.write(e.args[0] + "\n") |  | ||||||
|                     response: Response = Response() |  | ||||||
|                     response.status = 401 |  | ||||||
|                     response.headers["WWW-Authenticate"] \ |  | ||||||
|                         = self.make_response_header(state) |  | ||||||
|                     abort(response) |  | ||||||
|  |  | ||||||
|         return login_required_view |         return login_required_view | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user