Skip to content

Commit b96e8f8

Browse files
GOV-667: Fix review comments
1 parent 6f0535f commit b96e8f8

1 file changed

Lines changed: 86 additions & 124 deletions

File tree

pyatlan/client/transport.py

Lines changed: 86 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,90 @@
1919

2020
logger = logging.getLogger(__name__)
2121

22+
def _find_existing_policy(
23+
client: "AtlanClient", policy_name: str, persona_guid: str
24+
) -> Optional[dict]:
25+
"""Search for an existing AuthPolicy by name and persona GUID."""
26+
try:
27+
from pyatlan.client.constants import INDEX_SEARCH
28+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
29+
30+
query = Bool(
31+
filter=[
32+
Term(field="__typeName.keyword", value="AuthPolicy"),
33+
Term(field="name.keyword", value=policy_name),
34+
Term(field="__persona", value=persona_guid),
35+
]
36+
)
37+
search_request = IndexSearchRequest(
38+
dsl=DSL(query=query, size=1, from_=0),
39+
attributes=["name", "qualifiedName"],
40+
)
41+
raw_json = client._call_api(INDEX_SEARCH, request_obj=search_request)
42+
if raw_json and raw_json.get("entities"):
43+
return raw_json["entities"][0]
44+
return None
45+
except Exception as e:
46+
logger.debug(f"Error searching for existing policy: {e}")
47+
return None
48+
49+
50+
def _create_mock_response(
51+
existing_policy: dict, temp_guid: str = "-1"
52+
) -> httpx.Response:
53+
"""Build a mock bulk-entity response containing an already-created policy."""
54+
response_body = {
55+
"mutatedEntities": {"CREATE": [existing_policy]},
56+
"guidAssignments": {temp_guid: existing_policy.get("guid")},
57+
}
58+
return httpx.Response(
59+
status_code=200,
60+
json=response_body,
61+
request=httpx.Request("POST", "http://mock"),
62+
)
63+
64+
65+
def _check_for_duplicate_policy(
66+
client: "AtlanClient", request: httpx.Request
67+
) -> Optional[httpx.Response]:
68+
"""
69+
Check whether a bulk POST is creating an AuthPolicy that already exists.
70+
Only called during retry attempts, never on the first request.
71+
72+
Returns a mock response with the existing policy if a duplicate is found,
73+
or None to let the retry proceed normally.
74+
"""
75+
try:
76+
if request.method != "POST" or "/api/meta/entity/bulk" not in str(request.url):
77+
return None
78+
if not request.content:
79+
return None
80+
81+
body = json.loads(request.content.decode("utf-8"))
82+
for entity in body.get("entities", []):
83+
if entity.get("typeName") != "AuthPolicy":
84+
continue
85+
policy_name = entity.get("attributes", {}).get("name")
86+
access_control = entity.get("attributes", {}).get("accessControl")
87+
persona_guid = (
88+
access_control.get("guid")
89+
if isinstance(access_control, dict)
90+
else None
91+
)
92+
if not (policy_name and persona_guid):
93+
continue
94+
existing_policy = _find_existing_policy(client, policy_name, persona_guid)
95+
if existing_policy:
96+
logger.info(
97+
f"Found existing policy '{policy_name}' with guid "
98+
f"{existing_policy.get('guid')} during retry check"
99+
)
100+
return _create_mock_response(existing_policy, entity.get("guid", "-1"))
101+
return None
102+
except Exception as e:
103+
logger.debug(f"Duplicate policy check failed (will proceed with retry): {e}")
104+
return None
105+
22106

23107
class PyatlanSyncTransport(httpx.BaseTransport):
24108
"""
@@ -106,7 +190,7 @@ def _retry_operation(
106190

107191
# ONLY during retry: check if this is a policy creation and if duplicate exists
108192
if self._client:
109-
duplicate_response = self._check_for_duplicate_policy(request)
193+
duplicate_response = _check_for_duplicate_policy(self._client, request)
110194
if duplicate_response:
111195
logger.warning(
112196
"RETRY PREVENTED: Policy already exists (likely from previous "
@@ -130,128 +214,6 @@ def _retry_operation(
130214
):
131215
return response
132216

133-
def _check_for_duplicate_policy(
134-
self, request: httpx.Request
135-
) -> Optional[httpx.Response]:
136-
"""
137-
Check if request is creating an AuthPolicy that already exists.
138-
This is ONLY called during retry attempts, not on first request.
139-
140-
Returns:
141-
A mock response with existing policy if duplicate found, None otherwise.
142-
"""
143-
try:
144-
# Only check POST requests to entity/bulk endpoint
145-
if request.method != "POST" or "/api/meta/entity/bulk" not in str(
146-
request.url
147-
):
148-
return None
149-
150-
if not request.content:
151-
return None
152-
153-
body = json.loads(request.content.decode("utf-8"))
154-
entities = body.get("entities", [])
155-
156-
# Check if any entity is an AuthPolicy
157-
for entity in entities:
158-
if entity.get("typeName") == "AuthPolicy":
159-
policy_name = entity.get("attributes", {}).get("name")
160-
access_control = entity.get("attributes", {}).get("accessControl")
161-
162-
# Extract persona GUID from accessControl relationship
163-
persona_guid = None
164-
if isinstance(access_control, dict):
165-
persona_guid = access_control.get("guid")
166-
167-
if policy_name and persona_guid:
168-
# Check if policy already exists
169-
existing_policy = self._find_existing_policy(
170-
policy_name, persona_guid
171-
)
172-
if existing_policy:
173-
logger.info(
174-
f"Found existing policy '{policy_name}' with guid "
175-
f"{existing_policy.get('guid')} during retry check"
176-
)
177-
# Create mock response with existing policy
178-
return self._create_mock_response(existing_policy)
179-
180-
return None
181-
182-
except Exception as e:
183-
# If duplicate check fails, log and continue with retry
184-
logger.debug(
185-
f"Duplicate policy check failed (will proceed with retry): {e}"
186-
)
187-
return None
188-
189-
def _find_existing_policy(
190-
self, policy_name: str, persona_guid: str
191-
) -> Optional[dict]:
192-
"""
193-
Search for existing policy by name and persona.
194-
195-
Returns:
196-
Policy data if found, None otherwise.
197-
"""
198-
try:
199-
# Import here to avoid circular dependency
200-
from pyatlan.client.constants import INDEX_SEARCH
201-
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
202-
203-
# Build search request to find policy by name and persona
204-
query = Bool(
205-
filter=[
206-
Term(field="__typeName.keyword", value="AuthPolicy"),
207-
Term(field="name.keyword", value=policy_name),
208-
Term(field="__persona", value=persona_guid),
209-
]
210-
)
211-
212-
search_request = IndexSearchRequest(
213-
dsl=DSL(query=query, size=1, from_=0),
214-
attributes=["name", "qualifiedName"],
215-
)
216-
217-
# Execute search directly using INDEX_SEARCH endpoint
218-
raw_json = self._client._call_api(
219-
INDEX_SEARCH,
220-
request_obj=search_request,
221-
)
222-
223-
# Check if policy found
224-
if (
225-
raw_json
226-
and raw_json.get("entities")
227-
and len(raw_json["entities"]) > 0
228-
):
229-
return raw_json["entities"][0]
230-
231-
return None
232-
233-
except Exception as e:
234-
logger.debug(f"Error searching for existing policy: {e}")
235-
return None
236-
237-
def _create_mock_response(self, existing_policy: dict) -> httpx.Response:
238-
"""
239-
Create a mock HTTP response containing the existing policy.
240-
241-
This response mimics what the bulk entity creation endpoint would return.
242-
"""
243-
# Create response body matching AssetMutationResponse format
244-
response_body = {
245-
"mutatedEntities": {"CREATE": [existing_policy]},
246-
"guidAssignments": {"-1": existing_policy.get("guid")},
247-
}
248-
249-
return httpx.Response(
250-
status_code=200,
251-
json=response_body,
252-
request=httpx.Request("POST", "http://mock"),
253-
)
254-
255217
def close(self) -> None:
256218
"""Close the underlying transport."""
257219
self._transport.close()
@@ -345,7 +307,7 @@ async def _retry_operation_async(
345307

346308
# ONLY during retry: check if this is a policy creation and if duplicate exists
347309
if self._client:
348-
duplicate_response = self._check_for_duplicate_policy(request)
310+
duplicate_response = _check_for_duplicate_policy(self._client, request)
349311
if duplicate_response:
350312
logger.warning(
351313
"RETRY PREVENTED: Policy already exists (likely from previous "

0 commit comments

Comments
 (0)