Skip to content

Commit 4d026b0

Browse files
authored
Merge pull request #878 from atlanhq/GOV-667
GOV-667 | feat: Prevent duplicate AuthPolicy creation on timeout retries
2 parents bf946b2 + 04a951a commit 4d026b0

6 files changed

Lines changed: 437 additions & 118 deletions

File tree

pyatlan/client/common/transport.py

Lines changed: 152 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,26 @@
1717

1818
from pyatlan.client.constants import BULK_UPDATE, INDEX_SEARCH
1919
from pyatlan.errors import ErrorCode
20-
from pyatlan.model.search import DSL, Bool, IndexSearchRequest, Term
20+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Prefix, Term
2121

2222
logger = logging.getLogger(__name__)
2323

2424

2525
def build_policy_search_request(
26-
policy_name: str, persona_guid: str
26+
policy_name: str, persona_qualified_name: str
2727
) -> IndexSearchRequest:
28-
"""Build an IndexSearchRequest to find an existing AuthPolicy by name and persona."""
28+
"""
29+
Build an IndexSearchRequest to find an existing AuthPolicy by name and persona.
30+
Using persona GUID directly returns associated assets, not policies.
31+
Policies require a hierarchical (prefix) match to be correctly retrieved.
32+
"""
2933
query = Bool(
3034
filter=[
35+
Term(field="__state", value="ACTIVE"),
3136
Term(field="__typeName.keyword", value="AuthPolicy"),
37+
Term(field="policyCategory", value="persona"),
3238
Term(field="name.keyword", value=policy_name),
33-
Term(field="__persona", value=persona_guid),
39+
Prefix(field="qualifiedName", value=persona_qualified_name),
3440
]
3541
)
3642
return IndexSearchRequest(
@@ -57,7 +63,10 @@ def create_mock_response(
5763
def parse_auth_policy_entity(request: httpx.Request) -> Optional[tuple[str, str, str]]:
5864
"""
5965
Parse the request body and return (policy_name, persona_guid, temp_guid)
60-
if the request is a bulk POST containing an AuthPolicy, else None.
66+
if the request is a bulk POST containing a NEW AuthPolicy creation, else None.
67+
68+
Only matches policy CREATES (temp GUIDs starting with "-"), not UPDATES
69+
(real GUIDs), to prevent suppressing legitimate policy modifications.
6170
"""
6271
if request.method != "POST" or BULK_UPDATE.path not in str(request.url):
6372
return None
@@ -75,27 +84,107 @@ def parse_auth_policy_entity(request: httpx.Request) -> Optional[tuple[str, str,
7584
for entity in body.get("entities", []):
7685
if entity.get("typeName") != "AuthPolicy":
7786
continue
87+
88+
entity_guid = entity.get("guid", "-1")
89+
# Only match policy CREATES (temp GUIDs like "-1", "-2", etc.)
90+
# Skip policy UPDATES (real GUIDs) to avoid suppressing modifications
91+
if not isinstance(entity_guid, str) or not entity_guid.startswith("-"):
92+
logger.debug(
93+
"parse_auth_policy_entity: skipping duplicate check for policy with GUID %s (likely an update or invalid type)",
94+
entity_guid,
95+
)
96+
continue
97+
7898
policy_name = entity.get("attributes", {}).get("name")
7999
access_control = entity.get("attributes", {}).get("accessControl")
80100
persona_guid = (
81101
access_control.get("guid") if isinstance(access_control, dict) else None
82102
)
83103
if policy_name and persona_guid:
84-
return policy_name, persona_guid, entity.get("guid", "-1")
104+
return policy_name, persona_guid, entity_guid
85105
return None
86106

87107

108+
def get_persona_qualified_name(client: Any, persona_guid: str) -> Optional[str]:
109+
"""
110+
Fetch the qualifiedName of a Persona by its GUID via IndexSearch (synchronous).
111+
"""
112+
try:
113+
query = Bool(
114+
filter=[
115+
Term(field="__typeName.keyword", value="Persona"),
116+
Term(field="__guid", value=persona_guid),
117+
]
118+
)
119+
search = IndexSearchRequest(
120+
dsl=DSL(query=query, size=1, from_=0),
121+
attributes=["qualifiedName"],
122+
)
123+
raw_json = client._call_api(INDEX_SEARCH, request_obj=search)
124+
if raw_json and raw_json.get("entities"):
125+
return raw_json["entities"][0].get("attributes", {}).get("qualifiedName")
126+
return None
127+
except Exception as e:
128+
logger.debug(
129+
"get_persona_qualified_name: could not fetch qualifiedName for persona %s: %s",
130+
persona_guid,
131+
e,
132+
)
133+
return None
134+
135+
136+
async def get_persona_qualified_name_async(
137+
client: Any, persona_guid: str
138+
) -> Optional[str]:
139+
"""
140+
Fetch the qualifiedName of a Persona by its GUID via IndexSearch (asynchronous).
141+
"""
142+
try:
143+
query = Bool(
144+
filter=[
145+
Term(field="__typeName.keyword", value="Persona"),
146+
Term(field="__guid", value=persona_guid),
147+
]
148+
)
149+
search = IndexSearchRequest(
150+
dsl=DSL(query=query, size=1, from_=0),
151+
attributes=["qualifiedName"],
152+
)
153+
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search)
154+
if raw_json and raw_json.get("entities"):
155+
return raw_json["entities"][0].get("attributes", {}).get("qualifiedName")
156+
return None
157+
except Exception as e:
158+
logger.debug(
159+
"get_persona_qualified_name_async: could not fetch qualifiedName for persona %s: %s",
160+
persona_guid,
161+
e,
162+
)
163+
return None
164+
165+
88166
def find_existing_policy(
89167
client: Any, policy_name: str, persona_guid: str
90168
) -> Optional[dict]:
91169
"""
92170
Search for an existing AuthPolicy by name and persona GUID (synchronous).
93171
172+
First resolves the persona GUID to its qualifiedName, then uses a qualifiedName
173+
prefix query to scope the search to that persona.
174+
94175
Raises:
95-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
176+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the policy search call fails.
96177
"""
178+
persona_qualified_name = get_persona_qualified_name(client, persona_guid)
179+
if not persona_qualified_name:
180+
raise ErrorCode.UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME.exception_with_parameters(
181+
persona_guid
182+
)
183+
97184
try:
98-
search_request = build_policy_search_request(policy_name, persona_guid)
185+
search_request = build_policy_search_request(
186+
policy_name, persona_qualified_name
187+
)
99188
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
100189
if raw_json and raw_json.get("entities"):
101190
return raw_json["entities"][0]
@@ -112,11 +201,24 @@ async def find_existing_policy_async(
112201
"""
113202
Search for an existing AuthPolicy by name and persona GUID (asynchronous).
114203
204+
First resolves the persona GUID to its qualifiedName, then uses a qualifiedName
205+
prefix query to scope the search to that persona.
206+
115207
Raises:
116-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
208+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the policy search call fails.
117209
"""
210+
persona_qualified_name = await get_persona_qualified_name_async(
211+
client, persona_guid
212+
)
213+
if not persona_qualified_name:
214+
raise ErrorCode.UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME.exception_with_parameters(
215+
persona_guid
216+
)
217+
118218
try:
119-
search_request = build_policy_search_request(policy_name, persona_guid)
219+
search_request = build_policy_search_request(
220+
policy_name, persona_qualified_name
221+
)
120222
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
121223
if raw_json and raw_json.get("entities"):
122224
return raw_json["entities"][0]
@@ -132,24 +234,36 @@ def check_for_duplicate_policy(
132234
) -> Optional[httpx.Response]:
133235
"""
134236
Check whether a bulk POST is creating an AuthPolicy that already exists (synchronous).
135-
Only called during retry attempts, never on the first request.
237+
Called before every attempt — including the first — so that repeated automation
238+
runs that don't pre-check for an existing policy are handled transparently.
136239
137240
Returns a mock response with the existing policy if a duplicate is found,
138-
or None to let the retry proceed normally.
241+
or None to let the request proceed normally.
139242
140-
Raises:
141-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
243+
Never raises: search failures are logged and treated as "not found" so that
244+
a degraded index cannot block policy creation entirely.
142245
"""
143246
parsed = parse_auth_policy_entity(request)
144247
if not parsed:
145248
return None
146249

147250
policy_name, persona_guid, temp_guid = parsed
148-
existing_policy = find_existing_policy(client, policy_name, persona_guid)
251+
try:
252+
existing_policy = find_existing_policy(client, policy_name, persona_guid)
253+
except Exception as e:
254+
logger.warning(
255+
"Duplicate policy search failed for '%s' (persona %s): %s. "
256+
"Proceeding with request.",
257+
policy_name,
258+
persona_guid,
259+
str(e),
260+
)
261+
return None
149262
if existing_policy:
150263
logger.info(
151-
f"Found existing policy '{policy_name}' with guid "
152-
f"{existing_policy.get('guid')} during retry check"
264+
"Found existing policy '%s' with guid %s — returning it instead of creating a duplicate.",
265+
policy_name,
266+
existing_policy.get("guid"),
153267
)
154268
return create_mock_response(existing_policy, temp_guid)
155269
return None
@@ -160,26 +274,38 @@ async def check_for_duplicate_policy_async(
160274
) -> Optional[httpx.Response]:
161275
"""
162276
Check whether a bulk POST is creating an AuthPolicy that already exists (asynchronous).
163-
Only called during retry attempts, never on the first request.
277+
Called before every attempt — including the first — so that repeated automation
278+
runs that don't pre-check for an existing policy are handled transparently.
164279
165280
Returns a mock response with the existing policy if a duplicate is found,
166-
or None to let the retry proceed normally.
281+
or None to let the request proceed normally.
167282
168-
Raises:
169-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
283+
Never raises: search failures are logged and treated as "not found" so that
284+
a degraded index cannot block policy creation entirely.
170285
"""
171286
parsed = parse_auth_policy_entity(request)
172287
if not parsed:
173288
return None
174289

175290
policy_name, persona_guid, temp_guid = parsed
176-
existing_policy = await find_existing_policy_async(
177-
client, policy_name, persona_guid
178-
)
291+
try:
292+
existing_policy = await find_existing_policy_async(
293+
client, policy_name, persona_guid
294+
)
295+
except Exception as e:
296+
logger.warning(
297+
"Duplicate policy search failed for '%s' (persona %s): %s. "
298+
"Proceeding with request.",
299+
policy_name,
300+
persona_guid,
301+
str(e),
302+
)
303+
return None
179304
if existing_policy:
180305
logger.info(
181-
f"Found existing policy '{policy_name}' with guid "
182-
f"{existing_policy.get('guid')} during retry check"
306+
"Found existing policy '%s' with guid %s — returning it instead of creating a duplicate.",
307+
policy_name,
308+
existing_policy.get("guid"),
183309
)
184310
return create_mock_response(existing_policy, temp_guid)
185311
return None

pyatlan/client/transport.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,23 @@ def _retry_operation(
106106
logger.debug(
107107
"_retry_operation retrying response=%s retry=%s", response, retry
108108
)
109-
110-
# ONLY during retry: check if this is a policy creation and if duplicate exists
111-
if self._client:
112-
duplicate_response = check_for_duplicate_policy(
113-
self._client, request
114-
)
115-
if duplicate_response:
116-
logger.warning(
117-
"RETRY PREVENTED: Policy already exists (likely from previous "
118-
"request that timed out but succeeded). Returning existing policy."
119-
)
120-
return duplicate_response
121-
122109
retry = retry.increment()
123110
retry.sleep(response)
124111

112+
# Check before EVERY attempt (first + retries).
113+
# On the first attempt this catches automation re-runs where the policy
114+
# was already created by a previous run.
115+
# On retries, the preceding sleep gives the index time to propagate an
116+
# entity that was committed server-side before a gateway timeout.
117+
if self._client:
118+
duplicate_response = check_for_duplicate_policy(self._client, request)
119+
if duplicate_response:
120+
logger.warning(
121+
"DUPLICATE PREVENTED: Policy already exists. "
122+
"Returning existing policy instead of creating a duplicate."
123+
)
124+
return duplicate_response
125+
125126
try:
126127
response = send_method(request)
127128
except httpx.HTTPError as e:
@@ -225,22 +226,25 @@ async def _retry_operation_async(
225226
response,
226227
retry,
227228
)
228-
229-
# ONLY during retry: check if this is a policy creation and if duplicate exists
230-
if self._client:
231-
duplicate_response = await check_for_duplicate_policy_async(
232-
self._client, request
233-
)
234-
if duplicate_response:
235-
logger.warning(
236-
"RETRY PREVENTED: Policy already exists (likely from previous "
237-
"request that timed out but succeeded). Returning existing policy."
238-
)
239-
return duplicate_response
240-
241229
retry = retry.increment()
242230
await retry.asleep(response)
243231

232+
# Check before EVERY attempt (first + retries).
233+
# On the first attempt this catches automation re-runs where the policy
234+
# was already created by a previous run.
235+
# On retries, the preceding sleep gives the index time to propagate an
236+
# entity that was committed server-side before a gateway timeout.
237+
if self._client:
238+
duplicate_response = await check_for_duplicate_policy_async(
239+
self._client, request
240+
)
241+
if duplicate_response:
242+
logger.warning(
243+
"DUPLICATE PREVENTED: Policy already exists. "
244+
"Returning existing policy instead of creating a duplicate."
245+
)
246+
return duplicate_response
247+
244248
try:
245249
response = await send_method(request)
246250
except httpx.HTTPError as e:

pyatlan/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,13 @@ class ErrorCode(Enum):
10611061
"Check your backend connectivity and ensure the Atlan search service is accessible.",
10621062
ApiError,
10631063
)
1064+
UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME = (
1065+
500,
1066+
"ATLAN-PYTHON-500-008",
1067+
"Unable to resolve the qualifiedName for persona with GUID '{0}'.",
1068+
"Verify the persona exists and the Atlan search service is accessible.",
1069+
ApiError,
1070+
)
10641071

10651072
def __init__(
10661073
self,

0 commit comments

Comments
 (0)