diff --git a/changes/11328.fix.md b/changes/11328.fix.md new file mode 100644 index 00000000000..c614d5362b0 --- /dev/null +++ b/changes/11328.fix.md @@ -0,0 +1 @@ +Send `Accept: application/json` from the manager's AppProxy client so endpoint create/delete failures return parseable JSON instead of HTML error pages. diff --git a/changes/11333.fix.md b/changes/11333.fix.md new file mode 100644 index 00000000000..97cde474555 --- /dev/null +++ b/changes/11333.fix.md @@ -0,0 +1 @@ +Surface AppProxy coordinator failures from `AppProxyClient` as `AppProxyConnectionError` / `AppProxyResponseError` instead of silently dropping deletion errors or leaking raw `aiohttp` exceptions, and preserve the upstream error body as `extra_data` for diagnostics. diff --git a/src/ai/backend/manager/clients/appproxy/client.py b/src/ai/backend/manager/clients/appproxy/client.py index 6e2e1306481..de983d64ea7 100644 --- a/src/ai/backend/manager/clients/appproxy/client.py +++ b/src/ai/backend/manager/clients/appproxy/client.py @@ -2,6 +2,8 @@ import json import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from typing import Any from uuid import UUID @@ -97,21 +99,93 @@ async def fetch_status(self) -> AppProxyStatusResponse: extra_msg=f"Invalid response from AppProxy at {self._address}" ) from e + @asynccontextmanager + async def _request( + self, + method: str, + path: str, + *, + operation: str, + json_body: Any = None, + ) -> AsyncIterator[aiohttp.ClientResponse]: + """Issue an authenticated request and translate transport errors. + + Connection failures become ``AppProxyConnectionError``. Non-2xx + responses become ``AppProxyResponseError`` with the upstream body + attached as ``extra_data`` so a structured ``BackendAIError`` + payload returned by the coordinator survives the translation. + """ + try: + async with self._client_session.request( + method, + path, + headers={ + "Accept": "application/json", + "X-BackendAI-Token": self._token, + }, + json=json_body, + ) as resp: + if resp.status >= 400: + text = await resp.text() + try: + error_body: Any = json.loads(text) if text else None + except json.JSONDecodeError: + error_body = text + log.error( + "AppProxy at {} returned {} during {}: {!r}", + self._address, + resp.status, + operation, + error_body, + ) + raise AppProxyResponseError( + extra_msg=(f"AppProxy returned HTTP {resp.status} during {operation}"), + extra_data={"status": resp.status, "body": error_body}, + ) + yield resp + except aiohttp.ClientConnectorError as e: + log.error( + "Failed to connect to AppProxy at {} during {}: {}", + self._address, + operation, + e, + ) + raise AppProxyConnectionError( + extra_msg=f"Failed to connect to AppProxy at {self._address}" + ) from e + + async def _parse_json( + self, + resp: aiohttp.ClientResponse, + *, + operation: str, + ) -> Any: + try: + return await resp.json() + except (aiohttp.ContentTypeError, json.JSONDecodeError) as e: + log.error( + "Failed to parse AppProxy {} response from {}: {}", + operation, + self._address, + e, + ) + raise AppProxyResponseError( + extra_msg=(f"Invalid response from AppProxy at {self._address} during {operation}"), + ) from e + @appproxy_client_resilience.apply() async def create_endpoint( self, endpoint_id: UUID, body: CreateEndpointRequestBody, ) -> dict[str, Any]: - async with self._client_session.post( + async with self._request( + "POST", f"/v2/endpoints/{endpoint_id}", - json=body.model_dump(mode="json"), - headers={ - "X-BackendAI-Token": self._token, - }, + operation="create_endpoint", + json_body=body.model_dump(mode="json"), ) as resp: - resp.raise_for_status() - result: dict[str, Any] = await resp.json() + result: dict[str, Any] = await self._parse_json(resp, operation="create_endpoint") return result @appproxy_client_resilience.apply() @@ -126,15 +200,13 @@ async def create_endpoints_bulk( so this is the preferred way to register many deployments at once (e.g. from the deployment provisioning handler). """ - async with self._client_session.post( + async with self._request( + "POST", "/v2/endpoints/bulk", - json=body.model_dump(mode="json"), - headers={ - "X-BackendAI-Token": self._token, - }, + operation="create_endpoints_bulk", + json_body=body.model_dump(mode="json"), ) as resp: - resp.raise_for_status() - payload = await resp.json() + payload = await self._parse_json(resp, operation="create_endpoints_bulk") return BulkCreateEndpointResponse.model_validate(payload) @appproxy_client_resilience.apply() @@ -142,11 +214,10 @@ async def delete_endpoint( self, endpoint_id: UUID, ) -> None: - async with self._client_session.delete( + async with self._request( + "DELETE", f"/v2/endpoints/{endpoint_id}", - headers={ - "X-BackendAI-Token": self._token, - }, + operation="delete_endpoint", ): pass @@ -161,14 +232,11 @@ async def delete_endpoints_bulk( per-endpoint result in input order, so the caller can decide how to treat partial failures (retry, log, etc.). """ - async with self._client_session.request( + async with self._request( "DELETE", "/v2/endpoints/bulk", - json=body.model_dump(mode="json"), - headers={ - "X-BackendAI-Token": self._token, - }, + operation="delete_endpoints_bulk", + json_body=body.model_dump(mode="json"), ) as resp: - resp.raise_for_status() - payload = await resp.json() + payload = await self._parse_json(resp, operation="delete_endpoints_bulk") return BulkDeleteEndpointResponse.model_validate(payload)