Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11328.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Send `Accept: application/json` from the manager's AppProxy client so endpoint create/delete failures return parseable JSON instead of HTML error pages.
1 change: 1 addition & 0 deletions changes/11333.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Surface AppProxy coordinator failures from `AppProxyClient` as `AppProxyConnectionError` / `AppProxyResponseError` instead of silently dropping deletion errors or leaking raw `aiohttp` exceptions, and preserve the upstream error body as `extra_data` for diagnostics.
118 changes: 93 additions & 25 deletions src/ai/backend/manager/clients/appproxy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick:

Suggested change
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as actxmgr

from typing import Any
from uuid import UUID

Expand Down Expand Up @@ -97,21 +99,93 @@ async def fetch_status(self) -> AppProxyStatusResponse:
extra_msg=f"Invalid response from AppProxy at {self._address}"
) from e

@asynccontextmanager
async def _request(
self,
method: str,
path: str,
*,
operation: str,
json_body: Any = None,
) -> AsyncIterator[aiohttp.ClientResponse]:
"""Issue an authenticated request and translate transport errors.

Connection failures become ``AppProxyConnectionError``. Non-2xx
responses become ``AppProxyResponseError`` with the upstream body
attached as ``extra_data`` so a structured ``BackendAIError``
payload returned by the coordinator survives the translation.
"""
try:
async with self._client_session.request(
method,
path,
headers={
"Accept": "application/json",
"X-BackendAI-Token": self._token,
},
json=json_body,
) as resp:
if resp.status >= 400:
text = await resp.text()
try:
Comment on lines +129 to +130
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await resp.text() can raise (e.g., decoding errors or payload read errors). If that happens, the code will skip raising AppProxyResponseError and instead leak the low-level exception. To keep error translation reliable, handle failures when reading the response body (e.g., use resp.text(errors="replace") and/or catch read/decode exceptions and fall back to a placeholder body).

Suggested change
text = await resp.text()
try:
try:
text = await resp.text(errors="replace")
except Exception as e:
text = f"<failed to read response body: {e}>"
try:

Copilot uses AI. Check for mistakes.
error_body: Any = json.loads(text) if text else None
except json.JSONDecodeError:
error_body = text
log.error(
"AppProxy at {} returned {} during {}: {!r}",
self._address,
resp.status,
operation,
error_body,
Comment on lines +118 to +139
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a structure where we use yield when resp.status // 100 == 2 (or up to 300) and handle errors for the rest be more readable? Also, rather than parsing the JSON again after resp.text, wouldn’t it be better to try resp.json first and fall back to resp.text only if that fails? You might find some useful code patterns by looking at the StorageProxyHTTPClient code.

)
Comment on lines +135 to +140
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the full upstream error_body at error level can leak sensitive details into logs (e.g., user identifiers, request details, internal stack traces, or other coordinator-provided context). Consider logging a truncated/sanitized body, logging only metadata (status/operation/request id), and/or moving the full body to debug-level logging while still attaching it to extra_data for programmatic diagnostics.

Suggested change
"AppProxy at {} returned {} during {}: {!r}",
self._address,
resp.status,
operation,
error_body,
)
"AppProxy at {} returned {} during {}",
self._address,
resp.status,
operation,
)
if text:
preview = text[:512] + ("..." if len(text) > 512 else "")
log.debug(
"AppProxy error body preview at {} during {}: {!r}",
self._address,
operation,
preview,
)

Copilot uses AI. Check for mistakes.
raise AppProxyResponseError(
extra_msg=(f"AppProxy returned HTTP {resp.status} during {operation}"),
extra_data={"status": resp.status, "body": error_body},
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra_data currently omits request metadata such as HTTP method and request path, which can make diagnosing bulk operations harder. Consider including method and path (and possibly operation) in extra_data so downstream logs/telemetry can attribute failures without relying on parsing the error message string.

Suggested change
extra_data={"status": resp.status, "body": error_body},
extra_data={
"status": resp.status,
"body": error_body,
"method": method,
"path": path,
"operation": operation,
},

Copilot uses AI. Check for mistakes.
)
yield resp
except aiohttp.ClientConnectorError as e:
log.error(
"Failed to connect to AppProxy at {} during {}: {}",
self._address,
operation,
e,
)
raise AppProxyConnectionError(
extra_msg=f"Failed to connect to AppProxy at {self._address}"
) from e
Comment on lines +111 to +155
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring says “translate transport errors”, but the implementation only maps aiohttp.ClientConnectorError. Timeouts (e.g., asyncio.TimeoutError) and other aiohttp.ClientError subclasses can still leak out as non-domain exceptions, which undermines the goal of preserving AppProxy domain context. Consider either (a) broadening the exception mapping to include timeouts and relevant aiohttp.ClientError connection/payload exceptions, or (b) narrowing the docstring to match the actual behavior.

Copilot uses AI. Check for mistakes.

async def _parse_json(
self,
resp: aiohttp.ClientResponse,
*,
operation: str,
) -> Any:
try:
return await resp.json()
except (aiohttp.ContentTypeError, json.JSONDecodeError) as e:
Comment on lines +163 to +165
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aiohttp.ClientResponse.json() raises ContentTypeError when the response is valid JSON but the Content-Type header is not application/json. If you want to be tolerant of upstream mislabeling (which is common with proxies and some error handlers), consider parsing JSON regardless of content type (while still treating non-JSON bodies as invalid via JSONDecodeError). This reduces false-negative “Invalid response” errors when the payload is actually JSON.

Copilot uses AI. Check for mistakes.
log.error(
"Failed to parse AppProxy {} response from {}: {}",
operation,
self._address,
e,
)
raise AppProxyResponseError(
extra_msg=(f"Invalid response from AppProxy at {self._address} during {operation}"),
) from e

@appproxy_client_resilience.apply()
async def create_endpoint(
self,
endpoint_id: UUID,
body: CreateEndpointRequestBody,
) -> dict[str, Any]:
async with self._client_session.post(
async with self._request(
"POST",
f"/v2/endpoints/{endpoint_id}",
json=body.model_dump(mode="json"),
headers={
"X-BackendAI-Token": self._token,
},
operation="create_endpoint",
json_body=body.model_dump(mode="json"),
) as resp:
resp.raise_for_status()
result: dict[str, Any] = await resp.json()
result: dict[str, Any] = await self._parse_json(resp, operation="create_endpoint")
return result

@appproxy_client_resilience.apply()
Expand All @@ -126,27 +200,24 @@ async def create_endpoints_bulk(
so this is the preferred way to register many deployments at
once (e.g. from the deployment provisioning handler).
"""
async with self._client_session.post(
async with self._request(
"POST",
"/v2/endpoints/bulk",
json=body.model_dump(mode="json"),
headers={
"X-BackendAI-Token": self._token,
},
operation="create_endpoints_bulk",
json_body=body.model_dump(mode="json"),
) as resp:
resp.raise_for_status()
payload = await resp.json()
payload = await self._parse_json(resp, operation="create_endpoints_bulk")
return BulkCreateEndpointResponse.model_validate(payload)

@appproxy_client_resilience.apply()
async def delete_endpoint(
self,
endpoint_id: UUID,
) -> None:
async with self._client_session.delete(
async with self._request(
"DELETE",
f"/v2/endpoints/{endpoint_id}",
headers={
"X-BackendAI-Token": self._token,
},
operation="delete_endpoint",
):
pass

Expand All @@ -161,14 +232,11 @@ async def delete_endpoints_bulk(
per-endpoint result in input order, so the caller can decide how
to treat partial failures (retry, log, etc.).
"""
async with self._client_session.request(
async with self._request(
"DELETE",
"/v2/endpoints/bulk",
json=body.model_dump(mode="json"),
headers={
"X-BackendAI-Token": self._token,
},
operation="delete_endpoints_bulk",
json_body=body.model_dump(mode="json"),
) as resp:
resp.raise_for_status()
payload = await resp.json()
payload = await self._parse_json(resp, operation="delete_endpoints_bulk")
return BulkDeleteEndpointResponse.model_validate(payload)
Loading