Split the Flask-Login login manager initialization from the init_app method to the __init_login_manager method in the DigestAuth class, to simplify the code.
This commit is contained in:
		| @@ -344,50 +344,57 @@ class DigestAuth: | ||||
|             self.realm = app.config["DIGEST_AUTH_REALM"] | ||||
|  | ||||
|         if hasattr(app, "login_manager"): | ||||
|             from flask_login import LoginManager, login_user | ||||
|             self.__init_login_manager(app) | ||||
|  | ||||
|             login_manager: LoginManager = getattr(app, "login_manager") | ||||
|     def __init_login_manager(self, app: Flask) -> None: | ||||
|         """Initializes the Flask-Login login manager. | ||||
|  | ||||
|             @login_manager.unauthorized_handler | ||||
|             def unauthorized() -> None: | ||||
|                 """Handles when the user is unauthorized. | ||||
|         :param app: The Flask application. | ||||
|         :return: None. | ||||
|         """ | ||||
|         from flask_login import LoginManager, login_user | ||||
|         login_manager: LoginManager = getattr(app, "login_manager") | ||||
|  | ||||
|                 :return: None. | ||||
|                 """ | ||||
|                 state: AuthState = getattr(request, "_digest_auth_state") \ | ||||
|                     if hasattr(request, "_digest_auth_state") \ | ||||
|                     else AuthState() | ||||
|                 response: Response = Response() | ||||
|                 response.status = 401 | ||||
|                 response.headers["WWW-Authenticate"] \ | ||||
|                     = self.__make_response_header(state) | ||||
|                 abort(response) | ||||
|         @login_manager.unauthorized_handler | ||||
|         def unauthorized() -> None: | ||||
|             """Handles when the user is unauthorized. | ||||
|  | ||||
|             @login_manager.request_loader | ||||
|             def load_user_from_request(req: Request) -> Optional[Any]: | ||||
|                 """Loads the user from the request header. | ||||
|             :return: None. | ||||
|             """ | ||||
|             state: AuthState = getattr(request, "_digest_auth_state") \ | ||||
|                 if hasattr(request, "_digest_auth_state") \ | ||||
|                 else AuthState() | ||||
|             response: Response = Response() | ||||
|             response.status = 401 | ||||
|             response.headers["WWW-Authenticate"] \ | ||||
|                 = self.__make_response_header(state) | ||||
|             abort(response) | ||||
|  | ||||
|                 :param req: The request. | ||||
|                 :return: The authenticated user, or None if the | ||||
|                     authentication fails | ||||
|                 """ | ||||
|                 request._digest_auth_state = AuthState() | ||||
|                 authorization: Authorization = req.authorization | ||||
|                 try: | ||||
|                     if authorization is None: | ||||
|                         raise UnauthorizedException | ||||
|                     if authorization.type != "digest": | ||||
|                         raise UnauthorizedException( | ||||
|                             "Not an HTTP digest authorization") | ||||
|                     self.__authenticate(request._digest_auth_state) | ||||
|                     user = login_manager.user_callback(authorization.username) | ||||
|                     login_user(user) | ||||
|                     self.__on_login(user) | ||||
|                     return user | ||||
|                 except UnauthorizedException as e: | ||||
|                     if str(e) != "": | ||||
|                         app.logger.warning(str(e)) | ||||
|                     return None | ||||
|         @login_manager.request_loader | ||||
|         def load_user_from_request(req: Request) -> Optional[Any]: | ||||
|             """Loads the user from the request header. | ||||
|  | ||||
|             :param req: The request. | ||||
|             :return: The authenticated user, or None if the | ||||
|                 authentication fails | ||||
|             """ | ||||
|             request._digest_auth_state = AuthState() | ||||
|             authorization: Authorization = req.authorization | ||||
|             try: | ||||
|                 if authorization is None: | ||||
|                     raise UnauthorizedException | ||||
|                 if authorization.type != "digest": | ||||
|                     raise UnauthorizedException( | ||||
|                         "Not an HTTP digest authorization") | ||||
|                 self.__authenticate(request._digest_auth_state) | ||||
|                 user = login_manager.user_callback(authorization.username) | ||||
|                 login_user(user) | ||||
|                 self.__on_login(user) | ||||
|                 return user | ||||
|             except UnauthorizedException as e: | ||||
|                 if str(e) != "": | ||||
|                     app.logger.warning(str(e)) | ||||
|                 return None | ||||
|  | ||||
|     def logout(self) -> None: | ||||
|         """Logs out the user. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user