Skip to content

Commit 2f35fa5

Browse files
Merge pull request #826 from atlanhq/GOV-667
fix(transport): prevent duplicate AuthPolicy creation on retry after timeout
2 parents e39ea14 + a8aeffc commit 2f35fa5

8 files changed

Lines changed: 1150 additions & 10 deletions

File tree

pyatlan/client/aio/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,9 @@ def __init__(self, **kwargs):
168168

169169
# Create async session with custom transport that supports retry and proxy
170170
self._async_session = httpx.AsyncClient(
171-
transport=PyatlanAsyncTransport(retry=self.retry, **transport_kwargs),
171+
transport=PyatlanAsyncTransport(
172+
retry=self.retry, client=self, **transport_kwargs
173+
),
172174
headers={
173175
"x-atlan-agent": "sdk",
174176
"x-atlan-agent-id": "python",
@@ -980,7 +982,9 @@ async def max_retries( # type: ignore[override,misc]
980982
if self.verify is not None:
981983
transport_kwargs["verify"] = self.verify
982984

983-
new_transport = PyatlanAsyncTransport(retry=max_retries, **transport_kwargs)
985+
new_transport = PyatlanAsyncTransport(
986+
retry=max_retries, client=self, **transport_kwargs
987+
)
984988
session._transport = new_transport
985989

986990
LOGGER.debug(

pyatlan/client/atlan.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,11 @@ def __init__(self, **data):
211211
# Configure httpx client with custom transport that supports retry and proxy
212212
# Note: We pass proxy/SSL config to the transport, not the client,
213213
# so that retry logic properly respects these settings
214+
# Pass self reference to transport for duplicate checking during retries
214215
self._session = httpx.Client(
215-
transport=PyatlanSyncTransport(retry=self.retry, **transport_kwargs),
216+
transport=PyatlanSyncTransport(
217+
retry=self.retry, client=self, **transport_kwargs
218+
),
216219
headers={
217220
"x-atlan-agent": "sdk",
218221
"x-atlan-agent-id": "python",
@@ -2007,7 +2010,9 @@ def max_retries( # type: ignore[misc]
20072010
if self.verify is not None:
20082011
transport_kwargs["verify"] = self.verify
20092012

2010-
new_transport = PyatlanSyncTransport(retry=max_retries, **transport_kwargs)
2013+
new_transport = PyatlanSyncTransport(
2014+
retry=max_retries, client=self, **transport_kwargs
2015+
)
20112016
self._session._transport = new_transport
20122017

20132018
LOGGER.debug(

pyatlan/client/common/transport.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
"""
4+
Shared transport utilities for sync and async Atlan HTTP transports.
5+
6+
Provides duplicate AuthPolicy detection logic used by both
7+
PyatlanSyncTransport and PyatlanAsyncTransport.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import json
13+
import logging
14+
from typing import Any, Optional
15+
16+
import httpx
17+
18+
from pyatlan.client.constants import BULK_UPDATE, INDEX_SEARCH
19+
from pyatlan.errors import ErrorCode
20+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
def build_policy_search_request(
26+
policy_name: str, persona_guid: str
27+
) -> IndexSearchRequest:
28+
"""Build an IndexSearchRequest to find an existing AuthPolicy by name and persona."""
29+
query = Bool(
30+
filter=[
31+
Term(field="__typeName.keyword", value="AuthPolicy"),
32+
Term(field="name.keyword", value=policy_name),
33+
Term(field="__persona", value=persona_guid),
34+
]
35+
)
36+
return IndexSearchRequest(
37+
dsl=DSL(query=query, size=1, from_=0),
38+
attributes=["name", "qualifiedName"],
39+
)
40+
41+
42+
def create_mock_response(
43+
existing_policy: dict, temp_guid: str = "-1"
44+
) -> httpx.Response:
45+
"""Build a mock bulk-entity response containing an already-created policy."""
46+
response_body = {
47+
"mutatedEntities": {"CREATE": [existing_policy]},
48+
"guidAssignments": {temp_guid: existing_policy.get("guid")},
49+
}
50+
return httpx.Response(
51+
status_code=200,
52+
json=response_body,
53+
request=httpx.Request("POST", f"http://mock/{BULK_UPDATE.path}"),
54+
)
55+
56+
57+
def parse_auth_policy_entity(request: httpx.Request) -> Optional[tuple[str, str, str]]:
58+
"""
59+
Parse the request body and return (policy_name, persona_guid, temp_guid)
60+
if the request is a bulk POST containing an AuthPolicy, else None.
61+
"""
62+
if request.method != "POST" or BULK_UPDATE.path not in str(request.url):
63+
return None
64+
if not request.content:
65+
return None
66+
67+
try:
68+
body = json.loads(request.content.decode("utf-8"))
69+
except (json.JSONDecodeError, UnicodeDecodeError):
70+
logger.debug(
71+
"parse_auth_policy_entity: failed to decode request body, skipping duplicate check"
72+
)
73+
return None
74+
75+
for entity in body.get("entities", []):
76+
if entity.get("typeName") != "AuthPolicy":
77+
continue
78+
policy_name = entity.get("attributes", {}).get("name")
79+
access_control = entity.get("attributes", {}).get("accessControl")
80+
persona_guid = (
81+
access_control.get("guid") if isinstance(access_control, dict) else None
82+
)
83+
if policy_name and persona_guid:
84+
return policy_name, persona_guid, entity.get("guid", "-1")
85+
return None
86+
87+
88+
def find_existing_policy(
89+
client: Any, policy_name: str, persona_guid: str
90+
) -> Optional[dict]:
91+
"""
92+
Search for an existing AuthPolicy by name and persona GUID (synchronous).
93+
94+
Raises:
95+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
96+
"""
97+
try:
98+
search_request = build_policy_search_request(policy_name, persona_guid)
99+
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
100+
if raw_json and raw_json.get("entities"):
101+
return raw_json["entities"][0]
102+
return None
103+
except Exception as e:
104+
raise ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY.exception_with_parameters(
105+
policy_name, persona_guid, str(e)
106+
) from e
107+
108+
109+
async def find_existing_policy_async(
110+
client: Any, policy_name: str, persona_guid: str
111+
) -> Optional[dict]:
112+
"""
113+
Search for an existing AuthPolicy by name and persona GUID (asynchronous).
114+
115+
Raises:
116+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
117+
"""
118+
try:
119+
search_request = build_policy_search_request(policy_name, persona_guid)
120+
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
121+
if raw_json and raw_json.get("entities"):
122+
return raw_json["entities"][0]
123+
return None
124+
except Exception as e:
125+
raise ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY.exception_with_parameters(
126+
policy_name, persona_guid, str(e)
127+
) from e
128+
129+
130+
def check_for_duplicate_policy(
131+
client: Any, request: httpx.Request
132+
) -> Optional[httpx.Response]:
133+
"""
134+
Check whether a bulk POST is creating an AuthPolicy that already exists (synchronous).
135+
Only called during retry attempts, never on the first request.
136+
137+
Returns a mock response with the existing policy if a duplicate is found,
138+
or None to let the retry proceed normally.
139+
140+
Raises:
141+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
142+
"""
143+
parsed = parse_auth_policy_entity(request)
144+
if not parsed:
145+
return None
146+
147+
policy_name, persona_guid, temp_guid = parsed
148+
existing_policy = find_existing_policy(client, policy_name, persona_guid)
149+
if existing_policy:
150+
logger.info(
151+
f"Found existing policy '{policy_name}' with guid "
152+
f"{existing_policy.get('guid')} during retry check"
153+
)
154+
return create_mock_response(existing_policy, temp_guid)
155+
return None
156+
157+
158+
async def check_for_duplicate_policy_async(
159+
client: Any, request: httpx.Request
160+
) -> Optional[httpx.Response]:
161+
"""
162+
Check whether a bulk POST is creating an AuthPolicy that already exists (asynchronous).
163+
Only called during retry attempts, never on the first request.
164+
165+
Returns a mock response with the existing policy if a duplicate is found,
166+
or None to let the retry proceed normally.
167+
168+
Raises:
169+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
170+
"""
171+
parsed = parse_auth_policy_entity(request)
172+
if not parsed:
173+
return None
174+
175+
policy_name, persona_guid, temp_guid = parsed
176+
existing_policy = await find_existing_policy_async(
177+
client, policy_name, persona_guid
178+
)
179+
if existing_policy:
180+
logger.info(
181+
f"Found existing policy '{policy_name}' with guid "
182+
f"{existing_policy.get('guid')} during retry check"
183+
)
184+
return create_mock_response(existing_policy, temp_guid)
185+
return None

pyatlan/client/transport.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
# type: ignore
21
"""
32
Custom HTTP transport with retry support for Atlan Python SDK.
43
@@ -8,11 +7,19 @@
87

98
import logging
109
from functools import partial
11-
from typing import Any, Optional, Union
10+
from typing import TYPE_CHECKING, Any, Optional, Union, cast
1211

1312
import httpx
1413
from httpx_retries import Retry
1514

15+
from pyatlan.client.common.transport import (
16+
check_for_duplicate_policy,
17+
check_for_duplicate_policy_async,
18+
)
19+
20+
if TYPE_CHECKING:
21+
from pyatlan.client.atlan import AtlanClient
22+
1623
logger = logging.getLogger(__name__)
1724

1825

@@ -43,9 +50,11 @@ class PyatlanSyncTransport(httpx.BaseTransport):
4350
def __init__(
4451
self,
4552
retry: Optional[Retry] = None,
53+
client: Optional["AtlanClient"] = None,
4654
**kwargs: Any,
4755
) -> None:
4856
self.retry = retry or Retry()
57+
self._client = client # Reference to AtlanClient for duplicate checking
4958
# Ensure trust_env is True by default to respect environment variables
5059
# unless explicitly overridden
5160
if "trust_env" not in kwargs:
@@ -97,6 +106,19 @@ def _retry_operation(
97106
logger.debug(
98107
"_retry_operation retrying response=%s retry=%s", response, retry
99108
)
109+
110+
# ONLY during retry: check if this is a policy creation and if duplicate exists
111+
if self._client:
112+
duplicate_response = check_for_duplicate_policy(
113+
self._client, request
114+
)
115+
if duplicate_response:
116+
logger.warning(
117+
"RETRY PREVENTED: Policy already exists (likely from previous "
118+
"request that timed out but succeeded). Returning existing policy."
119+
)
120+
return duplicate_response
121+
100122
retry = retry.increment()
101123
retry.sleep(response)
102124

@@ -109,9 +131,9 @@ def _retry_operation(
109131
continue
110132

111133
if retry.is_exhausted() or not retry.is_retryable_status_code(
112-
response.status_code
134+
cast(httpx.Response, response).status_code
113135
):
114-
return response
136+
return cast(httpx.Response, response)
115137

116138
def close(self) -> None:
117139
"""Close the underlying transport."""
@@ -145,9 +167,11 @@ class PyatlanAsyncTransport(httpx.AsyncBaseTransport):
145167
def __init__(
146168
self,
147169
retry: Optional[Retry] = None,
170+
client: Optional["AtlanClient"] = None,
148171
**kwargs: Any,
149172
) -> None:
150173
self.retry = retry or Retry()
174+
self._client = client # Reference to AtlanClient for duplicate checking
151175
# Ensure trust_env is True by default to respect environment variables
152176
# unless explicitly overridden
153177
if "trust_env" not in kwargs:
@@ -201,6 +225,19 @@ async def _retry_operation_async(
201225
response,
202226
retry,
203227
)
228+
229+
# ONLY during retry: check if this is a policy creation and if duplicate exists
230+
if self._client:
231+
duplicate_response = await check_for_duplicate_policy_async(
232+
self._client, request
233+
)
234+
if duplicate_response:
235+
logger.warning(
236+
"RETRY PREVENTED: Policy already exists (likely from previous "
237+
"request that timed out but succeeded). Returning existing policy."
238+
)
239+
return duplicate_response
240+
204241
retry = retry.increment()
205242
await retry.asleep(response)
206243

@@ -213,9 +250,9 @@ async def _retry_operation_async(
213250
continue
214251

215252
if retry.is_exhausted() or not retry.is_retryable_status_code(
216-
response.status_code
253+
cast(httpx.Response, response).status_code
217254
):
218-
return response
255+
return cast(httpx.Response, response)
219256

220257
async def aclose(self) -> None:
221258
"""Close the underlying transport."""

pyatlan/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,13 @@ class ErrorCode(Enum):
10541054
"your intention to fail after a maximum number of retries was reached.",
10551055
ApiError,
10561056
)
1057+
UNABLE_TO_SEARCH_EXISTING_POLICY = (
1058+
500,
1059+
"ATLAN-PYTHON-500-007",
1060+
"Unable to search for an existing policy '{0}' for persona '{1}': {2}",
1061+
"Check your backend connectivity and ensure the Atlan search service is accessible.",
1062+
ApiError,
1063+
)
10571064

10581065
def __init__(
10591066
self,

0 commit comments

Comments
 (0)