Skip to content

Commit 49968b6

Browse files
GOV-667: fix async transport path
1 parent b96e8f8 commit 49968b6

2 files changed

Lines changed: 71 additions & 3 deletions

File tree

pyatlan/client/aio/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def __init__(self, **kwargs):
168168

169169
# Create async session with custom transport that supports retry and proxy
170170
self._async_session = httpx.AsyncClient(
171-
transport=PyatlanAsyncTransport(retry=self.retry, **transport_kwargs),
171+
transport=PyatlanAsyncTransport(retry=self.retry, client=self, **transport_kwargs),
172172
headers={
173173
"x-atlan-agent": "sdk",
174174
"x-atlan-agent-id": "python",
@@ -980,7 +980,7 @@ async def max_retries( # type: ignore[override,misc]
980980
if self.verify is not None:
981981
transport_kwargs["verify"] = self.verify
982982

983-
new_transport = PyatlanAsyncTransport(retry=max_retries, **transport_kwargs)
983+
new_transport = PyatlanAsyncTransport(retry=max_retries, client=self, **transport_kwargs)
984984
session._transport = new_transport
985985

986986
LOGGER.debug(

pyatlan/client/transport.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,72 @@ def _create_mock_response(
6262
)
6363

6464

65+
async def _find_existing_policy_async(
66+
client: Any, policy_name: str, persona_guid: str
67+
) -> Optional[dict]:
68+
"""Async version of _find_existing_policy for use with AsyncAtlanClient."""
69+
try:
70+
from pyatlan.client.constants import INDEX_SEARCH
71+
from pyatlan.model.search import Bool, DSL, IndexSearchRequest, Term
72+
73+
query = Bool(
74+
filter=[
75+
Term(field="__typeName.keyword", value="AuthPolicy"),
76+
Term(field="name.keyword", value=policy_name),
77+
Term(field="__persona", value=persona_guid),
78+
]
79+
)
80+
search_request = IndexSearchRequest(
81+
dsl=DSL(query=query, size=1, from_=0),
82+
attributes=["name", "qualifiedName"],
83+
)
84+
raw_json = await client._call_api(INDEX_SEARCH, request_obj=search_request)
85+
if raw_json and raw_json.get("entities"):
86+
return raw_json["entities"][0]
87+
return None
88+
except Exception as e:
89+
logger.debug(f"Error searching for existing policy (async): {e}")
90+
return None
91+
92+
93+
async def _check_for_duplicate_policy_async(
94+
client: Any, request: httpx.Request
95+
) -> Optional[httpx.Response]:
96+
"""Async version of _check_for_duplicate_policy for use with AsyncAtlanClient."""
97+
try:
98+
if request.method != "POST" or "/api/meta/entity/bulk" not in str(request.url):
99+
return None
100+
if not request.content:
101+
return None
102+
103+
body = json.loads(request.content.decode("utf-8"))
104+
for entity in body.get("entities", []):
105+
if entity.get("typeName") != "AuthPolicy":
106+
continue
107+
policy_name = entity.get("attributes", {}).get("name")
108+
access_control = entity.get("attributes", {}).get("accessControl")
109+
persona_guid = (
110+
access_control.get("guid")
111+
if isinstance(access_control, dict)
112+
else None
113+
)
114+
if not (policy_name and persona_guid):
115+
continue
116+
existing_policy = await _find_existing_policy_async(
117+
client, policy_name, persona_guid
118+
)
119+
if existing_policy:
120+
logger.info(
121+
f"Found existing policy '{policy_name}' with guid "
122+
f"{existing_policy.get('guid')} during retry check"
123+
)
124+
return _create_mock_response(existing_policy, entity.get("guid", "-1"))
125+
return None
126+
except Exception as e:
127+
logger.debug(f"Duplicate policy check failed (will proceed with retry): {e}")
128+
return None
129+
130+
65131
def _check_for_duplicate_policy(
66132
client: "AtlanClient", request: httpx.Request
67133
) -> Optional[httpx.Response]:
@@ -307,7 +373,9 @@ async def _retry_operation_async(
307373

308374
# ONLY during retry: check if this is a policy creation and if duplicate exists
309375
if self._client:
310-
duplicate_response = _check_for_duplicate_policy(self._client, request)
376+
duplicate_response = await _check_for_duplicate_policy_async(
377+
self._client, request
378+
)
311379
if duplicate_response:
312380
logger.warning(
313381
"RETRY PREVENTED: Policy already exists (likely from previous "

0 commit comments

Comments
 (0)