Skip to content

Commit 6f0535f

Browse files
GOV-667: Check if policy was created during retry
1 parent de441ba commit 6f0535f

2 files changed

Lines changed: 160 additions & 3 deletions

File tree

pyatlan/client/atlan.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,11 @@ def __init__(self, **data):
211211
# Configure httpx client with custom transport that supports retry and proxy
212212
# Note: We pass proxy/SSL config to the transport, not the client,
213213
# so that retry logic properly respects these settings
214+
# Pass self reference to transport for duplicate checking during retries
214215
self._session = httpx.Client(
215-
transport=PyatlanSyncTransport(retry=self.retry, **transport_kwargs),
216+
transport=PyatlanSyncTransport(
217+
retry=self.retry, client=self, **transport_kwargs
218+
),
216219
headers={
217220
"x-atlan-agent": "sdk",
218221
"x-atlan-agent-id": "python",
@@ -2005,7 +2008,9 @@ def max_retries( # type: ignore[misc]
20052008
if self.verify is not None:
20062009
transport_kwargs["verify"] = self.verify
20072010

2008-
new_transport = PyatlanSyncTransport(retry=max_retries, **transport_kwargs)
2011+
new_transport = PyatlanSyncTransport(
2012+
retry=max_retries, client=self, **transport_kwargs
2013+
)
20092014
self._session._transport = new_transport
20102015

20112016
LOGGER.debug(

pyatlan/client/transport.py

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@
66
with httpx's HTTPTransport while respecting proxy and SSL configurations.
77
"""
88

9+
import json
910
import logging
1011
from functools import partial
11-
from typing import Any, Optional, Union
12+
from typing import TYPE_CHECKING, Any, Optional, Union
1213

1314
import httpx
1415
from httpx_retries import Retry
1516

17+
if TYPE_CHECKING:
18+
from pyatlan.client.atlan import AtlanClient
19+
1620
logger = logging.getLogger(__name__)
1721

1822

@@ -43,9 +47,11 @@ class PyatlanSyncTransport(httpx.BaseTransport):
4347
def __init__(
4448
self,
4549
retry: Optional[Retry] = None,
50+
client: Optional["AtlanClient"] = None,
4651
**kwargs: Any,
4752
) -> None:
4853
self.retry = retry or Retry()
54+
self._client = client # Reference to AtlanClient for duplicate checking
4955
# Ensure trust_env is True by default to respect environment variables
5056
# unless explicitly overridden
5157
if "trust_env" not in kwargs:
@@ -97,6 +103,17 @@ def _retry_operation(
97103
logger.debug(
98104
"_retry_operation retrying response=%s retry=%s", response, retry
99105
)
106+
107+
# ONLY during retry: check if this is a policy creation and if duplicate exists
108+
if self._client:
109+
duplicate_response = self._check_for_duplicate_policy(request)
110+
if duplicate_response:
111+
logger.warning(
112+
"RETRY PREVENTED: Policy already exists (likely from previous "
113+
"request that timed out but succeeded). Returning existing policy."
114+
)
115+
return duplicate_response
116+
100117
retry = retry.increment()
101118
retry.sleep(response)
102119

@@ -113,6 +130,128 @@ def _retry_operation(
113130
):
114131
return response
115132

133+
def _check_for_duplicate_policy(
134+
self, request: httpx.Request
135+
) -> Optional[httpx.Response]:
136+
"""
137+
Check if request is creating an AuthPolicy that already exists.
138+
This is ONLY called during retry attempts, not on first request.
139+
140+
Returns:
141+
A mock response with existing policy if duplicate found, None otherwise.
142+
"""
143+
try:
144+
# Only check POST requests to entity/bulk endpoint
145+
if request.method != "POST" or "/api/meta/entity/bulk" not in str(
146+
request.url
147+
):
148+
return None
149+
150+
if not request.content:
151+
return None
152+
153+
body = json.loads(request.content.decode("utf-8"))
154+
entities = body.get("entities", [])
155+
156+
# Check if any entity is an AuthPolicy
157+
for entity in entities:
158+
if entity.get("typeName") == "AuthPolicy":
159+
policy_name = entity.get("attributes", {}).get("name")
160+
access_control = entity.get("attributes", {}).get("accessControl")
161+
162+
# Extract persona GUID from accessControl relationship
163+
persona_guid = None
164+
if isinstance(access_control, dict):
165+
persona_guid = access_control.get("guid")
166+
167+
if policy_name and persona_guid:
168+
# Check if policy already exists
169+
existing_policy = self._find_existing_policy(
170+
policy_name, persona_guid
171+
)
172+
if existing_policy:
173+
logger.info(
174+
f"Found existing policy '{policy_name}' with guid "
175+
f"{existing_policy.get('guid')} during retry check"
176+
)
177+
# Create mock response with existing policy
178+
return self._create_mock_response(existing_policy)
179+
180+
return None
181+
182+
except Exception as e:
183+
# If duplicate check fails, log and continue with retry
184+
logger.debug(
185+
f"Duplicate policy check failed (will proceed with retry): {e}"
186+
)
187+
return None
188+
189+
def _find_existing_policy(
190+
self, policy_name: str, persona_guid: str
191+
) -> Optional[dict]:
192+
"""
193+
Search for existing policy by name and persona.
194+
195+
Returns:
196+
Policy data if found, None otherwise.
197+
"""
198+
try:
199+
# Import here to avoid circular dependency
200+
from pyatlan.client.constants import INDEX_SEARCH
201+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
202+
203+
# Build search request to find policy by name and persona
204+
query = Bool(
205+
filter=[
206+
Term(field="__typeName.keyword", value="AuthPolicy"),
207+
Term(field="name.keyword", value=policy_name),
208+
Term(field="__persona", value=persona_guid),
209+
]
210+
)
211+
212+
search_request = IndexSearchRequest(
213+
dsl=DSL(query=query, size=1, from_=0),
214+
attributes=["name", "qualifiedName"],
215+
)
216+
217+
# Execute search directly using INDEX_SEARCH endpoint
218+
raw_json = self._client._call_api(
219+
INDEX_SEARCH,
220+
request_obj=search_request,
221+
)
222+
223+
# Check if policy found
224+
if (
225+
raw_json
226+
and raw_json.get("entities")
227+
and len(raw_json["entities"]) > 0
228+
):
229+
return raw_json["entities"][0]
230+
231+
return None
232+
233+
except Exception as e:
234+
logger.debug(f"Error searching for existing policy: {e}")
235+
return None
236+
237+
def _create_mock_response(self, existing_policy: dict) -> httpx.Response:
238+
"""
239+
Create a mock HTTP response containing the existing policy.
240+
241+
This response mimics what the bulk entity creation endpoint would return.
242+
"""
243+
# Create response body matching AssetMutationResponse format
244+
response_body = {
245+
"mutatedEntities": {"CREATE": [existing_policy]},
246+
"guidAssignments": {"-1": existing_policy.get("guid")},
247+
}
248+
249+
return httpx.Response(
250+
status_code=200,
251+
json=response_body,
252+
request=httpx.Request("POST", "http://mock"),
253+
)
254+
116255
def close(self) -> None:
117256
"""Close the underlying transport."""
118257
self._transport.close()
@@ -145,9 +284,11 @@ class PyatlanAsyncTransport(httpx.AsyncBaseTransport):
145284
def __init__(
146285
self,
147286
retry: Optional[Retry] = None,
287+
client: Optional["AtlanClient"] = None,
148288
**kwargs: Any,
149289
) -> None:
150290
self.retry = retry or Retry()
291+
self._client = client # Reference to AtlanClient for duplicate checking
151292
# Ensure trust_env is True by default to respect environment variables
152293
# unless explicitly overridden
153294
if "trust_env" not in kwargs:
@@ -201,6 +342,17 @@ async def _retry_operation_async(
201342
response,
202343
retry,
203344
)
345+
346+
# ONLY during retry: check if this is a policy creation and if duplicate exists
347+
if self._client:
348+
duplicate_response = self._check_for_duplicate_policy(request)
349+
if duplicate_response:
350+
logger.warning(
351+
"RETRY PREVENTED: Policy already exists (likely from previous "
352+
"request that timed out but succeeded). Returning existing policy."
353+
)
354+
return duplicate_response
355+
204356
retry = retry.increment()
205357
await retry.asleep(response)
206358

0 commit comments

Comments
 (0)