Skip to content

Commit 968f7f2

Browse files
GOV-667: Revamp policy duplication check
1 parent 2b48e48 commit 968f7f2

6 files changed

Lines changed: 369 additions & 115 deletions

File tree

pyatlan/client/common/transport.py

Lines changed: 125 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,22 @@
1717

1818
from pyatlan.client.constants import BULK_UPDATE, INDEX_SEARCH
1919
from pyatlan.errors import ErrorCode
20-
from pyatlan.model.search import DSL, Bool, IndexSearchRequest, Term
20+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Prefix, Term
2121

2222
logger = logging.getLogger(__name__)
2323

2424

2525
def build_policy_search_request(
26-
policy_name: str, persona_guid: str
26+
policy_name: str, persona_qualified_name: str
2727
) -> IndexSearchRequest:
2828
"""Build an IndexSearchRequest to find an existing AuthPolicy by name and persona."""
2929
query = Bool(
3030
filter=[
31+
Term(field="__state", value="ACTIVE"),
3132
Term(field="__typeName.keyword", value="AuthPolicy"),
33+
Term(field="policyCategory", value="persona"),
3234
Term(field="name.keyword", value=policy_name),
33-
Term(field="__persona", value=persona_guid),
35+
Prefix(field="qualifiedName", value=persona_qualified_name),
3436
]
3537
)
3638
return IndexSearchRequest(
@@ -85,17 +87,84 @@ def parse_auth_policy_entity(request: httpx.Request) -> Optional[tuple[str, str,
8587
return None
8688

8789

90+
def get_persona_qualified_name(client: Any, persona_guid: str) -> Optional[str]:
    """
    Fetch the qualifiedName of a Persona by its GUID via IndexSearch (synchronous).

    This is a best-effort lookup: any exception raised while building or
    running the search, or while reading the response, is logged at debug
    level and reported as ``None`` instead of being propagated.

    Args:
        client: object exposing ``_call_api(api, request_obj=...)``.
        persona_guid: GUID of the Persona whose qualifiedName is wanted.

    Returns:
        The ``qualifiedName`` attribute of the first matching entity, or
        ``None`` when the GUID is empty, nothing matched, the attribute is
        missing, or the lookup failed.
    """
    if not persona_guid:
        # Nothing to match on: skip the search call entirely.
        return None

    try:
        query = Bool(
            filter=[
                Term(field="__typeName.keyword", value="Persona"),
                Term(field="__guid", value=persona_guid),
            ]
        )
        search = IndexSearchRequest(
            dsl=DSL(query=query, size=1, from_=0),
            attributes=["qualifiedName"],
        )
        raw_json = client._call_api(INDEX_SEARCH, request_obj=search)
        if raw_json and raw_json.get("entities"):
            # "attributes" may be absent or explicitly null on the entity;
            # treat both as "no attributes" rather than as a failure.
            attributes = raw_json["entities"][0].get("attributes") or {}
            return attributes.get("qualifiedName")
        return None
    except Exception as e:
        logger.debug(
            "get_persona_qualified_name: could not fetch qualifiedName for persona %s: %s",
            persona_guid,
            e,
        )
        return None
116+
117+
118+
async def get_persona_qualified_name_async(
    client: Any, persona_guid: str
) -> Optional[str]:
    """
    Fetch the qualifiedName of a Persona by its GUID via IndexSearch (asynchronous).

    This is a best-effort lookup: any exception raised while building or
    running the search, or while reading the response, is logged at debug
    level and reported as ``None`` instead of being propagated.

    Args:
        client: object exposing an awaitable ``_call_api(api, request_obj=...)``.
        persona_guid: GUID of the Persona whose qualifiedName is wanted.

    Returns:
        The ``qualifiedName`` attribute of the first matching entity, or
        ``None`` when the GUID is empty, nothing matched, the attribute is
        missing, or the lookup failed.
    """
    if not persona_guid:
        # Nothing to match on: skip the search call entirely.
        return None

    try:
        query = Bool(
            filter=[
                Term(field="__typeName.keyword", value="Persona"),
                Term(field="__guid", value=persona_guid),
            ]
        )
        search = IndexSearchRequest(
            dsl=DSL(query=query, size=1, from_=0),
            attributes=["qualifiedName"],
        )
        raw_json = await client._call_api(INDEX_SEARCH, request_obj=search)
        if raw_json and raw_json.get("entities"):
            # "attributes" may be absent or explicitly null on the entity;
            # treat both as "no attributes" rather than as a failure.
            attributes = raw_json["entities"][0].get("attributes") or {}
            return attributes.get("qualifiedName")
        return None
    except Exception as e:
        logger.debug(
            "get_persona_qualified_name_async: could not fetch qualifiedName for persona %s: %s",
            persona_guid,
            e,
        )
        return None
146+
147+
88148
def find_existing_policy(
89149
client: Any, policy_name: str, persona_guid: str
90150
) -> Optional[dict]:
91151
"""
92152
Search for an existing AuthPolicy by name and persona GUID (synchronous).
93153
154+
First resolves the persona GUID to its qualifiedName, then uses a qualifiedName
155+
prefix query to scope the search to that persona.
156+
94157
Raises:
95-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
158+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the policy search call fails.
96159
"""
160+
persona_qualified_name = get_persona_qualified_name(client, persona_guid)
161+
if not persona_qualified_name:
162+
raise ErrorCode.UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME.exception_with_parameters(
163+
persona_guid
164+
)
165+
97166
try:
98-
search_request = build_policy_search_request(policy_name, persona_guid)
167+
search_request = build_policy_search_request(policy_name, persona_qualified_name)
99168
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
100169
if raw_json and raw_json.get("entities"):
101170
return raw_json["entities"][0]
@@ -112,11 +181,20 @@ async def find_existing_policy_async(
112181
"""
113182
Search for an existing AuthPolicy by name and persona GUID (asynchronous).
114183
184+
First resolves the persona GUID to its qualifiedName, then uses a qualifiedName
185+
prefix query to scope the search to that persona.
186+
115187
Raises:
116-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
188+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the policy search call fails.
117189
"""
190+
persona_qualified_name = await get_persona_qualified_name_async(client, persona_guid)
191+
if not persona_qualified_name:
192+
raise ErrorCode.UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME.exception_with_parameters(
193+
persona_guid
194+
)
195+
118196
try:
119-
search_request = build_policy_search_request(policy_name, persona_guid)
197+
search_request = build_policy_search_request(policy_name, persona_qualified_name)
120198
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
121199
if raw_json and raw_json.get("entities"):
122200
return raw_json["entities"][0]
@@ -132,24 +210,36 @@ def check_for_duplicate_policy(
132210
) -> Optional[httpx.Response]:
133211
"""
134212
Check whether a bulk POST is creating an AuthPolicy that already exists (synchronous).
135-
Only called during retry attempts, never on the first request.
213+
Called before every attempt — including the first — so that repeated automation
214+
runs that don't pre-check for an existing policy are handled transparently.
136215
137216
Returns a mock response with the existing policy if a duplicate is found,
138-
or None to let the retry proceed normally.
217+
or None to let the request proceed normally.
139218
140-
Raises:
141-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
219+
Never raises: search failures are logged and treated as "not found" so that
220+
a degraded index cannot block policy creation entirely.
142221
"""
143222
parsed = parse_auth_policy_entity(request)
144223
if not parsed:
145224
return None
146225

147226
policy_name, persona_guid, temp_guid = parsed
148-
existing_policy = find_existing_policy(client, policy_name, persona_guid)
227+
try:
228+
existing_policy = find_existing_policy(client, policy_name, persona_guid)
229+
except Exception as e:
230+
logger.warning(
231+
"Duplicate policy search failed for '%s' (persona %s): %s. "
232+
"Proceeding with request.",
233+
policy_name,
234+
persona_guid,
235+
str(e),
236+
)
237+
return None
149238
if existing_policy:
150239
logger.info(
151-
f"Found existing policy '{policy_name}' with guid "
152-
f"{existing_policy.get('guid')} during retry check"
240+
"Found existing policy '%s' with guid %s — returning it instead of creating a duplicate.",
241+
policy_name,
242+
existing_policy.get("guid"),
153243
)
154244
return create_mock_response(existing_policy, temp_guid)
155245
return None
@@ -160,26 +250,38 @@ async def check_for_duplicate_policy_async(
160250
) -> Optional[httpx.Response]:
161251
"""
162252
Check whether a bulk POST is creating an AuthPolicy that already exists (asynchronous).
163-
Only called during retry attempts, never on the first request.
253+
Called before every attempt — including the first — so that repeated automation
254+
runs that don't pre-check for an existing policy are handled transparently.
164255
165256
Returns a mock response with the existing policy if a duplicate is found,
166-
or None to let the retry proceed normally.
257+
or None to let the request proceed normally.
167258
168-
Raises:
169-
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
259+
Never raises: search failures are logged and treated as "not found" so that
260+
a degraded index cannot block policy creation entirely.
170261
"""
171262
parsed = parse_auth_policy_entity(request)
172263
if not parsed:
173264
return None
174265

175266
policy_name, persona_guid, temp_guid = parsed
176-
existing_policy = await find_existing_policy_async(
177-
client, policy_name, persona_guid
178-
)
267+
try:
268+
existing_policy = await find_existing_policy_async(
269+
client, policy_name, persona_guid
270+
)
271+
except Exception as e:
272+
logger.warning(
273+
"Duplicate policy search failed for '%s' (persona %s): %s. "
274+
"Proceeding with request.",
275+
policy_name,
276+
persona_guid,
277+
str(e),
278+
)
279+
return None
179280
if existing_policy:
180281
logger.info(
181-
f"Found existing policy '{policy_name}' with guid "
182-
f"{existing_policy.get('guid')} during retry check"
282+
"Found existing policy '%s' with guid %s — returning it instead of creating a duplicate.",
283+
policy_name,
284+
existing_policy.get("guid"),
183285
)
184286
return create_mock_response(existing_policy, temp_guid)
185287
return None

pyatlan/client/transport.py

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,25 @@ def _retry_operation(
106106
logger.debug(
107107
"_retry_operation retrying response=%s retry=%s", response, retry
108108
)
109-
110-
# ONLY during retry: check if this is a policy creation and if duplicate exists
111-
if self._client:
112-
duplicate_response = check_for_duplicate_policy(
113-
self._client, request
114-
)
115-
if duplicate_response:
116-
logger.warning(
117-
"RETRY PREVENTED: Policy already exists (likely from previous "
118-
"request that timed out but succeeded). Returning existing policy."
119-
)
120-
return duplicate_response
121-
122109
retry = retry.increment()
123110
retry.sleep(response)
124111

112+
# Check before EVERY attempt (first + retries).
113+
# On the first attempt this catches automation re-runs where the policy
114+
# was already created by a previous run.
115+
# On retries, the preceding sleep gives the index time to propagate an
116+
# entity that was committed server-side before a gateway timeout.
117+
if self._client:
118+
duplicate_response = check_for_duplicate_policy(
119+
self._client, request
120+
)
121+
if duplicate_response:
122+
logger.warning(
123+
"DUPLICATE PREVENTED: Policy already exists. "
124+
"Returning existing policy instead of creating a duplicate."
125+
)
126+
return duplicate_response
127+
125128
try:
126129
response = send_method(request)
127130
except httpx.HTTPError as e:
@@ -225,22 +228,25 @@ async def _retry_operation_async(
225228
response,
226229
retry,
227230
)
228-
229-
# ONLY during retry: check if this is a policy creation and if duplicate exists
230-
if self._client:
231-
duplicate_response = await check_for_duplicate_policy_async(
232-
self._client, request
233-
)
234-
if duplicate_response:
235-
logger.warning(
236-
"RETRY PREVENTED: Policy already exists (likely from previous "
237-
"request that timed out but succeeded). Returning existing policy."
238-
)
239-
return duplicate_response
240-
241231
retry = retry.increment()
242232
await retry.asleep(response)
243233

234+
# Check before EVERY attempt (first + retries).
235+
# On the first attempt this catches automation re-runs where the policy
236+
# was already created by a previous run.
237+
# On retries, the preceding sleep gives the index time to propagate an
238+
# entity that was committed server-side before a gateway timeout.
239+
if self._client:
240+
duplicate_response = await check_for_duplicate_policy_async(
241+
self._client, request
242+
)
243+
if duplicate_response:
244+
logger.warning(
245+
"DUPLICATE PREVENTED: Policy already exists. "
246+
"Returning existing policy instead of creating a duplicate."
247+
)
248+
return duplicate_response
249+
244250
try:
245251
response = await send_method(request)
246252
except httpx.HTTPError as e:

pyatlan/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,13 @@ class ErrorCode(Enum):
10611061
"Check your backend connectivity and ensure the Atlan search service is accessible.",
10621062
ApiError,
10631063
)
1064+
UNABLE_TO_RESOLVE_PERSONA_QUALIFIED_NAME = (
1065+
500,
1066+
"ATLAN-PYTHON-500-008",
1067+
"Unable to resolve the qualifiedName for persona with GUID '{0}'.",
1068+
"Verify the persona exists and the Atlan search service is accessible.",
1069+
ApiError,
1070+
)
10641071

10651072
def __init__(
10661073
self,

tests/integration/aio/test_transport.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ async def test_async_duplicate_prevention_on_timeout(
105105

106106
transport = PyatlanAsyncTransport(
107107
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
108-
client=client,
109108
trust_env=True,
110109
)
110+
# Set client reference after construction to avoid validation issues
111+
transport._client = client
111112
assert client._async_session is not None
112113
original_transport = client._async_session._transport
113114
client._async_session._transport = transport
@@ -183,9 +184,10 @@ async def test_async_duplicate_prevention_short_circuits_when_policy_exists(
183184

184185
transport = PyatlanAsyncTransport(
185186
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
186-
client=client,
187187
trust_env=True,
188188
)
189+
# Set client reference after construction to avoid validation issues
190+
transport._client = client
189191
assert client._async_session is not None
190192
original_transport = client._async_session._transport
191193
client._async_session._transport = transport
@@ -207,10 +209,23 @@ async def intercepting_handle(request: httpx.Request) -> httpx.Response:
207209

208210
transport._transport.handle_async_request = intercepting_handle # type: ignore[method-assign]
209211

212+
duplicate_check_count = 0
213+
214+
async def mock_find_existing_policy_async(*args, **kwargs):
215+
"""Return None on first check, existing_policy on retry check."""
216+
nonlocal duplicate_check_count
217+
duplicate_check_count += 1
218+
if duplicate_check_count == 1:
219+
# First check (before first attempt): no duplicate yet
220+
return None
221+
else:
222+
# Second check (before retry): duplicate exists now
223+
return existing_policy
224+
210225
try:
211226
with patch(
212227
"pyatlan.client.common.transport.find_existing_policy_async",
213-
new=AsyncMock(return_value=existing_policy),
228+
new=AsyncMock(side_effect=mock_find_existing_policy_async),
214229
):
215230
policy = Persona.create_metadata_policy(
216231
name=policy_name,

0 commit comments

Comments
 (0)