authorization: remove "permitted_paths" attribute

Track authentication requirements in the API Definition.  This
eliminates the need to look up the authentication component
to disable auth on an endpoint.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan
2023-11-26 16:26:22 -05:00
parent eed759e111
commit b3b60757aa
6 changed files with 51 additions and 62 deletions

View File

@@ -168,6 +168,7 @@ class APIDefinition:
request_types: RequestType
transports: TransportType
callback: Callable[[WebRequest], Coroutine]
auth_required: bool
_cache: ClassVar[Dict[str, APIDefinition]] = {}
def request(
@@ -196,6 +197,7 @@ class APIDefinition:
request_types: Union[List[str], RequestType],
callback: Callable[[WebRequest], Coroutine],
transports: Union[List[str], TransportType] = TransportType.all(),
auth_required: bool = True,
is_remote: bool = False
) -> APIDefinition:
if isinstance(request_types, list):
@@ -239,7 +241,7 @@ class APIDefinition:
api_def = cls(
endpoint, http_path, rpc_methods, request_types,
transports, callback
transports, callback, auth_required
)
cls._cache[endpoint] = api_def
return api_def
@@ -335,7 +337,7 @@ class BaseRemoteConnection(APITransport):
def screen_rpc_request(
self, api_def: APIDefinition, req_type: RequestType, args: Dict[str, Any]
) -> None:
self.check_authenticated(api_def.endpoint)
self.check_authenticated(api_def)
async def _process_message(self, message: str) -> None:
try:
@@ -366,16 +368,16 @@ class BaseRemoteConnection(APITransport):
self.user_info = auth.validate_jwt(token)
elif api_key is not None and self.user_info is None:
self.user_info = auth.validate_api_key(api_key)
else:
self.check_authenticated()
elif self._need_auth:
raise self.server.error("Unauthorized", 401)
def check_authenticated(self, path: str = "") -> None:
def check_authenticated(self, api_def: APIDefinition) -> None:
if not self._need_auth:
return
auth: AuthComp = self.server.lookup_component("authorization", None)
if auth is None:
return
if not auth.is_path_permitted(path):
if api_def.auth_required:
raise self.server.error("Unauthorized", 401)
def on_user_logout(self, user: str) -> bool: