From a23187b4afa632da8a90cb9eae83f0f6a5eec903 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 20 Jan 2024 06:21:36 -0500 Subject: [PATCH] authorization: fix access.refresh regression Allow expired JWTs for HTTP endpoints that do not require authentication. This is technically an error by the client, as it should not provide invalid JWTs for an endpoint, however Moonraker previously allowed this as the token was not verified on unathenticated endpoints. Signed-off-by: Eric Callahan --- moonraker/components/authorization.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 53ac2e5..69c2ee3 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -579,7 +579,7 @@ class Authorization: return b".".join([jwt_msg, jwt_sig]).decode() def decode_jwt( - self, token: str, token_type: str = "access" + self, token: str, token_type: str = "access", check_exp: bool = True ) -> Dict[str, Any]: message, sig = token.rsplit('.', maxsplit=1) enc_header, enc_payload = message.split('.') @@ -607,7 +607,7 @@ class Authorization: raise self.server.error("Invalid JWT Issuer", 401) if payload['aud'] != "Moonraker": raise self.server.error("Invalid JWT Audience", 401) - if payload['exp'] < int(time.time()): + if check_exp and payload['exp'] < int(time.time()): raise self.server.error("JWT Expired", 401) # get user @@ -687,23 +687,23 @@ class Authorization: self.oneshot_tokens[token] = (ip_addr, user, hdl) return token - def _check_json_web_token(self, - request: HTTPServerRequest - ) -> Optional[Dict[str, Any]]: + def _check_json_web_token( + self, request: HTTPServerRequest, required: bool = True + ) -> Optional[Dict[str, Any]]: auth_token: Optional[str] = request.headers.get("Authorization") if auth_token is None: auth_token = request.headers.get("X-Access-Token") if auth_token is None: qtoken = request.query_arguments.get('access_token', None) if qtoken is not None: - auth_token = qtoken[-1].decode() + auth_token = qtoken[-1].decode(errors="ignore") elif auth_token.startswith("Bearer "): auth_token = auth_token[7:] else: return None if auth_token: try: - return self.decode_jwt(auth_token) + return self.decode_jwt(auth_token, check_exp=required) except Exception: logging.exception(f"JWT Decode Error {auth_token}") raise HTTPError(401, "JWT Decode Error") @@ -768,7 +768,7 @@ class Authorization: return None # Check JSON Web Token - jwt_user = self._check_json_web_token(request) + jwt_user = self._check_json_web_token(request, auth_required) if jwt_user is not None: return jwt_user