Skip to content

Commit 26a2661

Browse files
GOV-667: Resolve comments
1 parent badc0e6 commit 26a2661

3 files changed

Lines changed: 211 additions & 17 deletions

File tree

pyatlan/client/transport.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import logging
99
from functools import partial
10-
from typing import TYPE_CHECKING, Any, Optional, Union
10+
from typing import TYPE_CHECKING, Any, Optional, Union, cast
1111

1212
import httpx
1313
from httpx_retries import Retry
@@ -130,11 +130,10 @@ def _retry_operation(
130130
response = e
131131
continue
132132

133-
assert isinstance(response, httpx.Response)
134133
if retry.is_exhausted() or not retry.is_retryable_status_code(
135-
response.status_code
134+
cast(httpx.Response, response).status_code
136135
):
137-
return response
136+
return cast(httpx.Response, response)
138137

139138
def close(self) -> None:
140139
"""Close the underlying transport."""
@@ -250,11 +249,10 @@ async def _retry_operation_async(
250249
response = e
251250
continue
252251

253-
assert isinstance(response, httpx.Response)
254252
if retry.is_exhausted() or not retry.is_retryable_status_code(
255-
response.status_code
253+
cast(httpx.Response, response).status_code
256254
):
257-
return response
255+
return cast(httpx.Response, response)
258256

259257
async def aclose(self) -> None:
260258
"""Close the underlying transport."""
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
"""
4+
Async integration tests for transport-layer duplicate AuthPolicy prevention.
5+
6+
These tests connect to a live Atlan tenant to verify the retry +
7+
duplicate-prevention mechanism end-to-end using PyatlanAsyncTransport.
8+
"""
9+
10+
from typing import AsyncGenerator
11+
from unittest.mock import patch
12+
13+
import httpx
14+
import pytest
15+
import pytest_asyncio
16+
from httpx_retries import Retry
17+
18+
from pyatlan.client.aio import AsyncAtlanClient
19+
from pyatlan.client.transport import PyatlanAsyncTransport
20+
from pyatlan.model.assets import AuthPolicy, Persona
21+
from pyatlan.model.enums import AuthPolicyType, PersonaMetadataAction
22+
from tests.integration.client import TestId
23+
24+
PERSONA_NAME = "New"
25+
CONNECTION_QN = "default/redshift/1769838984"
26+
MODULE_NAME = TestId.make_unique("AioTransportRetry")
27+
28+
29+
@pytest_asyncio.fixture(scope="module")
30+
async def client() -> AsyncGenerator[AsyncAtlanClient, None]:
31+
"""Async Atlan client fixture."""
32+
yield AsyncAtlanClient()
33+
34+
35+
async def _find_persona(atlan_client: AsyncAtlanClient, name: str) -> Persona:
36+
results = await atlan_client.asset.find_personas_by_name(name)
37+
if not results:
38+
pytest.skip(f"Persona '{name}' not found on this tenant — skipping.")
39+
return results[0]
40+
41+
42+
def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Response:
43+
"""Return a fake 200 bulk POST response as if the policy was created."""
44+
fake_guid = f"fake-{policy_name}-guid"
45+
body = {
46+
"mutatedEntities": {
47+
"CREATE": [
48+
{
49+
"typeName": "AuthPolicy",
50+
"guid": fake_guid,
51+
"attributes": {
52+
"name": policy_name,
53+
"accessControl": {"guid": persona_guid},
54+
},
55+
}
56+
]
57+
},
58+
"guidAssignments": {"-1": fake_guid},
59+
}
60+
return httpx.Response(200, json=body)
61+
62+
63+
@pytest.mark.asyncio
64+
async def test_async_duplicate_prevention_on_timeout(client: AsyncAtlanClient):
65+
"""
66+
Simulate a ReadTimeout after a (mocked) async bulk POST succeeds.
67+
On retry, the transport runs a real IndexSearch against the tenant.
68+
Since the policy was never actually created, IndexSearch returns nothing
69+
and the retry proceeds — confirming the async duplicate-check path runs correctly.
70+
71+
This validates: async transport wiring, parse_auth_policy_entity, and the
72+
real IndexSearch call in find_existing_policy_async.
73+
"""
74+
persona = await _find_persona(client, PERSONA_NAME)
75+
policy_name = f"{MODULE_NAME}_DupCheck"
76+
77+
transport = PyatlanAsyncTransport(
78+
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
79+
client=client,
80+
trust_env=True,
81+
)
82+
assert client._async_session is not None
83+
original_transport = client._async_session._transport
84+
client._async_session._transport = transport
85+
86+
bulk_call_count = 0
87+
original_inner_handle = transport._transport.handle_async_request
88+
89+
async def intercepting_handle(request: httpx.Request) -> httpx.Response:
90+
nonlocal bulk_call_count
91+
if request.method == "POST" and "/api/meta/entity/bulk" in str(request.url):
92+
bulk_call_count += 1
93+
if bulk_call_count == 1:
94+
raise httpx.ReadTimeout(
95+
"Simulated timeout after successful creation",
96+
request=request,
97+
)
98+
# Second attempt — return a real fake success
99+
return _build_fake_bulk_response(policy_name, persona.guid)
100+
return await original_inner_handle(request)
101+
102+
transport._transport.handle_async_request = intercepting_handle # type: ignore[method-assign]
103+
104+
try:
105+
policy = Persona.create_metadata_policy(
106+
name=policy_name,
107+
persona_id=persona.guid,
108+
policy_type=AuthPolicyType.ALLOW,
109+
actions={PersonaMetadataAction.READ},
110+
connection_qualified_name=CONNECTION_QN,
111+
resources={f"entity:{CONNECTION_QN}/*"},
112+
)
113+
response = await client.asset.save(policy)
114+
115+
# Policy wasn't really created so IndexSearch returns nothing →
116+
# retry proceeds to attempt #2 → fake success returned
117+
assert response is not None
118+
assert bulk_call_count == 2, (
119+
f"Expected 2 bulk POSTs (no duplicate found, retry proceeded), got {bulk_call_count}"
120+
)
121+
finally:
122+
client._async_session._transport = original_transport
123+
transport._transport.handle_async_request = original_inner_handle # type: ignore[method-assign]
124+
125+
126+
@pytest.mark.asyncio
127+
async def test_async_duplicate_prevention_short_circuits_when_policy_exists(
128+
client: AsyncAtlanClient,
129+
):
130+
"""
131+
After a (mocked) async timeout, the IndexSearch duplicate-check is mocked to
132+
return an existing policy. The async transport should short-circuit and NOT
133+
send a second bulk POST.
134+
135+
This validates the full async duplicate-prevention flow without needing
136+
connection-admin rights on the tenant.
137+
"""
138+
persona = await _find_persona(client, PERSONA_NAME)
139+
policy_name = f"{MODULE_NAME}_ShortCircuit"
140+
fake_guid = f"existing-{policy_name}-guid"
141+
142+
existing_policy = {
143+
"typeName": "AuthPolicy",
144+
"guid": fake_guid,
145+
"attributes": {
146+
"name": policy_name,
147+
"accessControl": {"guid": persona.guid},
148+
},
149+
}
150+
151+
transport = PyatlanAsyncTransport(
152+
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
153+
client=client,
154+
trust_env=True,
155+
)
156+
assert client._async_session is not None
157+
original_transport = client._async_session._transport
158+
client._async_session._transport = transport
159+
160+
bulk_call_count = 0
161+
original_inner_handle = transport._transport.handle_async_request
162+
163+
async def intercepting_handle(request: httpx.Request) -> httpx.Response:
164+
nonlocal bulk_call_count
165+
if request.method == "POST" and "/api/meta/entity/bulk" in str(request.url):
166+
bulk_call_count += 1
167+
if bulk_call_count == 1:
168+
raise httpx.ReadTimeout(
169+
"Simulated timeout after successful creation",
170+
request=request,
171+
)
172+
return _build_fake_bulk_response(policy_name, persona.guid)
173+
return await original_inner_handle(request)
174+
175+
transport._transport.handle_async_request = intercepting_handle # type: ignore[method-assign]
176+
177+
try:
178+
# Patch find_existing_policy_async so the duplicate check returns our
179+
# fake existing policy without a real search
180+
with patch(
181+
"pyatlan.client.common.transport.find_existing_policy_async",
182+
return_value=existing_policy,
183+
):
184+
policy = Persona.create_metadata_policy(
185+
name=policy_name,
186+
persona_id=persona.guid,
187+
policy_type=AuthPolicyType.ALLOW,
188+
actions={PersonaMetadataAction.READ},
189+
connection_qualified_name=CONNECTION_QN,
190+
resources={f"entity:{CONNECTION_QN}/*"},
191+
)
192+
response = await client.asset.save(policy)
193+
194+
assert response is not None
195+
# Duplicate found → retry short-circuited → only 1 bulk POST
196+
assert bulk_call_count == 1, (
197+
f"Expected 1 bulk POST (duplicate prevented retry), got {bulk_call_count}"
198+
)
199+
# Response should contain the existing policy's GUID
200+
saved = response.assets_created(AuthPolicy)
201+
assert saved and saved[0].guid == fake_guid, (
202+
f"Expected existing policy guid {fake_guid}, got {saved}"
203+
)
204+
finally:
205+
client._async_session._transport = original_transport
206+
transport._transport.handle_async_request = original_inner_handle # type: ignore[method-assign]
Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@
2828
MODULE_NAME = TestId.make_unique("TransportRetry")
2929

3030

31-
# ---------------------------------------------------------------------------
32-
# Helpers
33-
# ---------------------------------------------------------------------------
34-
35-
3631
def _find_persona(atlan_client: AtlanClient, name: str) -> Persona:
3732
results = atlan_client.asset.find_personas_by_name(name)
3833
if not results:
@@ -61,11 +56,6 @@ def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Resp
6156
return httpx.Response(200, json=body)
6257

6358

64-
# ---------------------------------------------------------------------------
65-
# Tests
66-
# ---------------------------------------------------------------------------
67-
68-
6959
def test_duplicate_prevention_on_timeout(client: AtlanClient): # noqa: F811
7060
"""
7161
Simulate a ReadTimeout after a (mocked) bulk POST succeeds.

0 commit comments

Comments
 (0)