Skip to content

Commit 889df11

Browse files
GOV-667: Resolved reviews and added tests
1 parent 49968b6 commit 889df11

5 files changed

Lines changed: 831 additions & 163 deletions

File tree

pyatlan/client/common/transport.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
"""
4+
Shared transport utilities for sync and async Atlan HTTP transports.
5+
6+
Provides duplicate AuthPolicy detection logic used by both
7+
PyatlanSyncTransport and PyatlanAsyncTransport.
8+
"""
9+
from __future__ import annotations
10+
11+
import json
12+
import logging
13+
from typing import Any, Optional
14+
15+
import httpx
16+
17+
from pyatlan.client.constants import BULK_UPDATE, INDEX_SEARCH
18+
from pyatlan.errors import ErrorCode
19+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def build_policy_search_request(policy_name: str, persona_guid: str) -> IndexSearchRequest:
25+
"""Build an IndexSearchRequest to find an existing AuthPolicy by name and persona."""
26+
query = Bool(
27+
filter=[
28+
Term(field="__typeName.keyword", value="AuthPolicy"),
29+
Term(field="name.keyword", value=policy_name),
30+
Term(field="__persona", value=persona_guid),
31+
]
32+
)
33+
return IndexSearchRequest(
34+
dsl=DSL(query=query, size=1, from_=0),
35+
attributes=["name", "qualifiedName"],
36+
)
37+
38+
39+
def create_mock_response(existing_policy: dict, temp_guid: str = "-1") -> httpx.Response:
40+
"""Build a mock bulk-entity response containing an already-created policy."""
41+
response_body = {
42+
"mutatedEntities": {"CREATE": [existing_policy]},
43+
"guidAssignments": {temp_guid: existing_policy.get("guid")},
44+
}
45+
return httpx.Response(
46+
status_code=200,
47+
json=response_body,
48+
request=httpx.Request("POST", f"http://mock/{BULK_UPDATE.path}"),
49+
)
50+
51+
52+
def parse_auth_policy_entity(request: httpx.Request) -> Optional[tuple[str, str, str]]:
53+
"""
54+
Parse the request body and return (policy_name, persona_guid, temp_guid)
55+
if the request is a bulk POST containing an AuthPolicy, else None.
56+
"""
57+
if request.method != "POST" or BULK_UPDATE.path not in str(request.url):
58+
return None
59+
if not request.content:
60+
return None
61+
62+
body = json.loads(request.content.decode("utf-8"))
63+
for entity in body.get("entities", []):
64+
if entity.get("typeName") != "AuthPolicy":
65+
continue
66+
policy_name = entity.get("attributes", {}).get("name")
67+
access_control = entity.get("attributes", {}).get("accessControl")
68+
persona_guid = (
69+
access_control.get("guid") if isinstance(access_control, dict) else None
70+
)
71+
if policy_name and persona_guid:
72+
return policy_name, persona_guid, entity.get("guid", "-1")
73+
return None
74+
75+
76+
def find_existing_policy(client: Any, policy_name: str, persona_guid: str) -> Optional[dict]:
77+
"""
78+
Search for an existing AuthPolicy by name and persona GUID (synchronous).
79+
80+
Raises:
81+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
82+
"""
83+
try:
84+
search_request = build_policy_search_request(policy_name, persona_guid)
85+
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
86+
if raw_json and raw_json.get("entities"):
87+
return raw_json["entities"][0]
88+
return None
89+
except Exception as e:
90+
raise ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY.exception_with_parameters(
91+
policy_name, persona_guid, str(e)
92+
)
93+
94+
95+
async def find_existing_policy_async(
96+
client: Any, policy_name: str, persona_guid: str
97+
) -> Optional[dict]:
98+
"""
99+
Search for an existing AuthPolicy by name and persona GUID (asynchronous).
100+
101+
Raises:
102+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the search call fails.
103+
"""
104+
try:
105+
search_request = build_policy_search_request(policy_name, persona_guid)
106+
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
107+
if raw_json and raw_json.get("entities"):
108+
return raw_json["entities"][0]
109+
return None
110+
except Exception as e:
111+
raise ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY.exception_with_parameters(
112+
policy_name, persona_guid, str(e)
113+
)
114+
115+
116+
def check_for_duplicate_policy(
117+
client: Any, request: httpx.Request
118+
) -> Optional[httpx.Response]:
119+
"""
120+
Check whether a bulk POST is creating an AuthPolicy that already exists (synchronous).
121+
Only called during retry attempts, never on the first request.
122+
123+
Returns a mock response with the existing policy if a duplicate is found,
124+
or None to let the retry proceed normally.
125+
126+
Raises:
127+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
128+
"""
129+
parsed = parse_auth_policy_entity(request)
130+
if not parsed:
131+
return None
132+
133+
policy_name, persona_guid, temp_guid = parsed
134+
existing_policy = find_existing_policy(client, policy_name, persona_guid)
135+
if existing_policy:
136+
logger.info(
137+
f"Found existing policy '{policy_name}' with guid "
138+
f"{existing_policy.get('guid')} during retry check"
139+
)
140+
return create_mock_response(existing_policy, temp_guid)
141+
return None
142+
143+
144+
async def check_for_duplicate_policy_async(
145+
client: Any, request: httpx.Request
146+
) -> Optional[httpx.Response]:
147+
"""
148+
Check whether a bulk POST is creating an AuthPolicy that already exists (asynchronous).
149+
Only called during retry attempts, never on the first request.
150+
151+
Returns a mock response with the existing policy if a duplicate is found,
152+
or None to let the retry proceed normally.
153+
154+
Raises:
155+
ErrorCode.UNABLE_TO_SEARCH_EXISTING_POLICY: if the duplicate search fails.
156+
"""
157+
parsed = parse_auth_policy_entity(request)
158+
if not parsed:
159+
return None
160+
161+
policy_name, persona_guid, temp_guid = parsed
162+
existing_policy = await find_existing_policy_async(client, policy_name, persona_guid)
163+
if existing_policy:
164+
logger.info(
165+
f"Found existing policy '{policy_name}' with guid "
166+
f"{existing_policy.get('guid')} during retry check"
167+
)
168+
return create_mock_response(existing_policy, temp_guid)
169+
return None

pyatlan/client/transport.py

Lines changed: 13 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -6,169 +6,23 @@
66
with httpx's HTTPTransport while respecting proxy and SSL configurations.
77
"""
88

9-
import json
109
import logging
1110
from functools import partial
1211
from typing import TYPE_CHECKING, Any, Optional, Union
1312

1413
import httpx
1514
from httpx_retries import Retry
1615

16+
from pyatlan.client.common.transport import (
17+
check_for_duplicate_policy,
18+
check_for_duplicate_policy_async,
19+
)
20+
1721
if TYPE_CHECKING:
1822
from pyatlan.client.atlan import AtlanClient
1923

2024
logger = logging.getLogger(__name__)
2125

22-
def _find_existing_policy(
23-
client: "AtlanClient", policy_name: str, persona_guid: str
24-
) -> Optional[dict]:
25-
"""Search for an existing AuthPolicy by name and persona GUID."""
26-
try:
27-
from pyatlan.client.constants import INDEX_SEARCH
28-
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
29-
30-
query = Bool(
31-
filter=[
32-
Term(field="__typeName.keyword", value="AuthPolicy"),
33-
Term(field="name.keyword", value=policy_name),
34-
Term(field="__persona", value=persona_guid),
35-
]
36-
)
37-
search_request = IndexSearchRequest(
38-
dsl=DSL(query=query, size=1, from_=0),
39-
attributes=["name", "qualifiedName"],
40-
)
41-
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
42-
if raw_json and raw_json.get("entities"):
43-
return raw_json["entities"][0]
44-
return None
45-
except Exception as e:
46-
logger.debug(f"Error searching for existing policy: {e}")
47-
return None
48-
49-
50-
def _create_mock_response(
51-
existing_policy: dict, temp_guid: str = "-1"
52-
) -> httpx.Response:
53-
"""Build a mock bulk-entity response containing an already-created policy."""
54-
response_body = {
55-
"mutatedEntities": {"CREATE": [existing_policy]},
56-
"guidAssignments": {temp_guid: existing_policy.get("guid")},
57-
}
58-
return httpx.Response(
59-
status_code=200,
60-
json=response_body,
61-
request=httpx.Request("POST", "http://mock"),
62-
)
63-
64-
65-
async def _find_existing_policy_async(
66-
client: Any, policy_name: str, persona_guid: str
67-
) -> Optional[dict]:
68-
"""Async version of _find_existing_policy for use with AsyncAtlanClient."""
69-
try:
70-
from pyatlan.client.constants import INDEX_SEARCH
71-
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
72-
73-
query = Bool(
74-
filter=[
75-
Term(field="__typeName.keyword", value="AuthPolicy"),
76-
Term(field="name.keyword", value=policy_name),
77-
Term(field="__persona", value=persona_guid),
78-
]
79-
)
80-
search_request = IndexSearchRequest(
81-
dsl=DSL(query=query, size=1, from_=0),
82-
attributes=["name", "qualifiedName"],
83-
)
84-
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
85-
if raw_json and raw_json.get("entities"):
86-
return raw_json["entities"][0]
87-
return None
88-
except Exception as e:
89-
logger.debug(f"Error searching for existing policy (async): {e}")
90-
return None
91-
92-
93-
async def _check_for_duplicate_policy_async(
94-
client: Any, request: httpx.Request
95-
) -> Optional[httpx.Response]:
96-
"""Async version of _check_for_duplicate_policy for use with AsyncAtlanClient."""
97-
try:
98-
if request.method != "POST" or "/api/meta/entity/bulk" not in str(request.url):
99-
return None
100-
if not request.content:
101-
return None
102-
103-
body = json.loads(request.content.decode("utf-8"))
104-
for entity in body.get("entities", []):
105-
if entity.get("typeName") != "AuthPolicy":
106-
continue
107-
policy_name = entity.get("attributes", {}).get("name")
108-
access_control = entity.get("attributes", {}).get("accessControl")
109-
persona_guid = (
110-
access_control.get("guid")
111-
if isinstance(access_control, dict)
112-
else None
113-
)
114-
if not (policy_name and persona_guid):
115-
continue
116-
existing_policy = await _find_existing_policy_async(
117-
client, policy_name, persona_guid
118-
)
119-
if existing_policy:
120-
logger.info(
121-
f"Found existing policy '{policy_name}' with guid "
122-
f"{existing_policy.get('guid')} during retry check"
123-
)
124-
return _create_mock_response(existing_policy, entity.get("guid", "-1"))
125-
return None
126-
except Exception as e:
127-
logger.debug(f"Duplicate policy check failed (will proceed with retry): {e}")
128-
return None
129-
130-
131-
def _check_for_duplicate_policy(
132-
client: "AtlanClient", request: httpx.Request
133-
) -> Optional[httpx.Response]:
134-
"""
135-
Check whether a bulk POST is creating an AuthPolicy that already exists.
136-
Only called during retry attempts, never on the first request.
137-
138-
Returns a mock response with the existing policy if a duplicate is found,
139-
or None to let the retry proceed normally.
140-
"""
141-
try:
142-
if request.method != "POST" or "/api/meta/entity/bulk" not in str(request.url):
143-
return None
144-
if not request.content:
145-
return None
146-
147-
body = json.loads(request.content.decode("utf-8"))
148-
for entity in body.get("entities", []):
149-
if entity.get("typeName") != "AuthPolicy":
150-
continue
151-
policy_name = entity.get("attributes", {}).get("name")
152-
access_control = entity.get("attributes", {}).get("accessControl")
153-
persona_guid = (
154-
access_control.get("guid")
155-
if isinstance(access_control, dict)
156-
else None
157-
)
158-
if not (policy_name and persona_guid):
159-
continue
160-
existing_policy = _find_existing_policy(client, policy_name, persona_guid)
161-
if existing_policy:
162-
logger.info(
163-
f"Found existing policy '{policy_name}' with guid "
164-
f"{existing_policy.get('guid')} during retry check"
165-
)
166-
return _create_mock_response(existing_policy, entity.get("guid", "-1"))
167-
return None
168-
except Exception as e:
169-
logger.debug(f"Duplicate policy check failed (will proceed with retry): {e}")
170-
return None
171-
17226

17327
class PyatlanSyncTransport(httpx.BaseTransport):
17428
"""
@@ -228,11 +82,8 @@ def handle_request(self, request: httpx.Request) -> httpx.Response:
22882
"""
22983
logger.debug("handle_request started request=%s", request)
23084

231-
if self.retry.is_retryable_method(request.method):
232-
send_method = partial(self._transport.handle_request)
233-
response = self._retry_operation(request, send_method)
234-
else:
235-
response = self._transport.handle_request(request)
85+
send_method = partial(self._transport.handle_request)
86+
response = self._retry_operation(request, send_method)
23687

23788
logger.debug(
23889
"handle_request finished request=%s response=%s", request, response
@@ -256,7 +107,9 @@ def _retry_operation(
256107

257108
# ONLY during retry: check if this is a policy creation and if duplicate exists
258109
if self._client:
259-
duplicate_response = _check_for_duplicate_policy(self._client, request)
110+
duplicate_response = check_for_duplicate_policy(
111+
self._client, request
112+
)
260113
if duplicate_response:
261114
logger.warning(
262115
"RETRY PREVENTED: Policy already exists (likely from previous "
@@ -343,11 +196,8 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
343196
"""
344197
logger.debug("handle_async_request started request=%s", request)
345198

346-
if self.retry.is_retryable_method(request.method):
347-
send_method = partial(self._transport.handle_async_request)
348-
response = await self._retry_operation_async(request, send_method)
349-
else:
350-
response = await self._transport.handle_async_request(request)
199+
send_method = partial(self._transport.handle_async_request)
200+
response = await self._retry_operation_async(request, send_method)
351201

352202
logger.debug(
353203
"handle_async_request finished request=%s response=%s", request, response
@@ -373,7 +223,7 @@ async def _retry_operation_async(
373223

374224
# ONLY during retry: check if this is a policy creation and if duplicate exists
375225
if self._client:
376-
duplicate_response = await _check_for_duplicate_policy_async(
226+
duplicate_response = await check_for_duplicate_policy_async(
377227
self._client, request
378228
)
379229
if duplicate_response:

0 commit comments

Comments
 (0)