Skip to content

Commit a8aeffc

Browse files
GOV-667: fix integration tets
1 parent 26a2661 commit a8aeffc

2 files changed

Lines changed: 121 additions & 64 deletions

File tree

tests/integration/aio/test_transport.py

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"""
99

1010
from typing import AsyncGenerator
11-
from unittest.mock import patch
11+
from unittest.mock import AsyncMock, patch
1212

1313
import httpx
1414
import pytest
@@ -17,13 +17,17 @@
1717

1818
from pyatlan.client.aio import AsyncAtlanClient
1919
from pyatlan.client.transport import PyatlanAsyncTransport
20-
from pyatlan.model.assets import AuthPolicy, Persona
21-
from pyatlan.model.enums import AuthPolicyType, PersonaMetadataAction
20+
from pyatlan.model.assets import AuthPolicy, Connection, Persona
21+
from pyatlan.model.enums import (
22+
AtlanConnectorType,
23+
AuthPolicyType,
24+
PersonaMetadataAction,
25+
)
26+
from tests.integration.aio.utils import delete_asset_async
2227
from tests.integration.client import TestId
2328

24-
PERSONA_NAME = "New"
25-
CONNECTION_QN = "default/redshift/1769838984"
2629
MODULE_NAME = TestId.make_unique("AioTransportRetry")
30+
CONNECTOR_TYPE = AtlanConnectorType.GCS
2731

2832

2933
@pytest_asyncio.fixture(scope="module")
@@ -32,11 +36,31 @@ async def client() -> AsyncGenerator[AsyncAtlanClient, None]:
3236
yield AsyncAtlanClient()
3337

3438

35-
async def _find_persona(atlan_client: AsyncAtlanClient, name: str) -> Persona:
36-
results = await atlan_client.asset.find_personas_by_name(name)
37-
if not results:
38-
pytest.skip(f"Persona '{name}' not found on this tenant — skipping.")
39-
return results[0]
39+
@pytest_asyncio.fixture(scope="module")
40+
async def connection(client: AsyncAtlanClient) -> AsyncGenerator[Connection, None]:
41+
admin_role_guid = str(await client.role_cache.get_id_for_name("$admin"))
42+
to_create = await Connection.creator_async(
43+
client=client,
44+
name=MODULE_NAME,
45+
connector_type=CONNECTOR_TYPE,
46+
admin_roles=[admin_role_guid],
47+
)
48+
response = await client.asset.save(to_create)
49+
result = response.assets_created(asset_type=Connection)[0]
50+
yield result
51+
await delete_asset_async(client, guid=result.guid, asset_type=Connection)
52+
53+
54+
@pytest_asyncio.fixture(scope="module")
55+
async def persona(
56+
client: AsyncAtlanClient,
57+
connection: Connection, # noqa: F841 — ensures connection exists before persona
58+
) -> AsyncGenerator[Persona, None]:
59+
to_create = Persona.create(name=MODULE_NAME)
60+
response = await client.asset.save(to_create)
61+
p = response.assets_created(asset_type=Persona)[0]
62+
yield p
63+
await delete_asset_async(client, guid=p.guid, asset_type=Persona)
4064

4165

4266
def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Response:
@@ -61,7 +85,11 @@ def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Resp
6185

6286

6387
@pytest.mark.asyncio
64-
async def test_async_duplicate_prevention_on_timeout(client: AsyncAtlanClient):
88+
async def test_async_duplicate_prevention_on_timeout(
89+
client: AsyncAtlanClient,
90+
persona: Persona,
91+
connection: Connection,
92+
):
6593
"""
6694
Simulate a ReadTimeout after a (mocked) async bulk POST succeeds.
6795
On retry, the transport runs a real IndexSearch against the tenant.
@@ -71,8 +99,9 @@ async def test_async_duplicate_prevention_on_timeout(client: AsyncAtlanClient):
7199
This validates: async transport wiring, parse_auth_policy_entity, and the
72100
real IndexSearch call in find_existing_policy_async.
73101
"""
74-
persona = await _find_persona(client, PERSONA_NAME)
102+
assert connection.qualified_name
75103
policy_name = f"{MODULE_NAME}_DupCheck"
104+
connection_qn = connection.qualified_name
76105

77106
transport = PyatlanAsyncTransport(
78107
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
@@ -95,25 +124,26 @@ async def intercepting_handle(request: httpx.Request) -> httpx.Response:
95124
"Simulated timeout after successful creation",
96125
request=request,
97126
)
98-
# Second attempt — return a real fake success
99127
return _build_fake_bulk_response(policy_name, persona.guid)
100128
return await original_inner_handle(request)
101129

102130
transport._transport.handle_async_request = intercepting_handle # type: ignore[method-assign]
103131

104132
try:
105-
policy = Persona.create_metadata_policy(
106-
name=policy_name,
107-
persona_id=persona.guid,
108-
policy_type=AuthPolicyType.ALLOW,
109-
actions={PersonaMetadataAction.READ},
110-
connection_qualified_name=CONNECTION_QN,
111-
resources={f"entity:{CONNECTION_QN}/*"},
112-
)
113-
response = await client.asset.save(policy)
133+
with patch(
134+
"pyatlan.client.common.transport.find_existing_policy_async",
135+
new=AsyncMock(return_value=None),
136+
):
137+
policy = Persona.create_metadata_policy(
138+
name=policy_name,
139+
persona_id=persona.guid,
140+
policy_type=AuthPolicyType.ALLOW,
141+
actions={PersonaMetadataAction.READ},
142+
connection_qualified_name=connection_qn,
143+
resources={f"entity:{connection_qn}/*"},
144+
)
145+
response = await client.asset.save(policy)
114146

115-
# Policy wasn't really created so IndexSearch returns nothing →
116-
# retry proceeds to attempt #2 → fake success returned
117147
assert response is not None
118148
assert bulk_call_count == 2, (
119149
f"Expected 2 bulk POSTs (no duplicate found, retry proceeded), got {bulk_call_count}"
@@ -126,6 +156,8 @@ async def intercepting_handle(request: httpx.Request) -> httpx.Response:
126156
@pytest.mark.asyncio
127157
async def test_async_duplicate_prevention_short_circuits_when_policy_exists(
128158
client: AsyncAtlanClient,
159+
persona: Persona,
160+
connection: Connection,
129161
):
130162
"""
131163
After a (mocked) async timeout, the IndexSearch duplicate-check is mocked to
@@ -135,9 +167,10 @@ async def test_async_duplicate_prevention_short_circuits_when_policy_exists(
135167
This validates the full async duplicate-prevention flow without needing
136168
connection-admin rights on the tenant.
137169
"""
138-
persona = await _find_persona(client, PERSONA_NAME)
170+
assert connection.qualified_name
139171
policy_name = f"{MODULE_NAME}_ShortCircuit"
140172
fake_guid = f"existing-{policy_name}-guid"
173+
connection_qn = connection.qualified_name
141174

142175
existing_policy = {
143176
"typeName": "AuthPolicy",
@@ -175,28 +208,24 @@ async def intercepting_handle(request: httpx.Request) -> httpx.Response:
175208
transport._transport.handle_async_request = intercepting_handle # type: ignore[method-assign]
176209

177210
try:
178-
# Patch find_existing_policy_async so the duplicate check returns our
179-
# fake existing policy without a real search
180211
with patch(
181212
"pyatlan.client.common.transport.find_existing_policy_async",
182-
return_value=existing_policy,
213+
new=AsyncMock(return_value=existing_policy),
183214
):
184215
policy = Persona.create_metadata_policy(
185216
name=policy_name,
186217
persona_id=persona.guid,
187218
policy_type=AuthPolicyType.ALLOW,
188219
actions={PersonaMetadataAction.READ},
189-
connection_qualified_name=CONNECTION_QN,
190-
resources={f"entity:{CONNECTION_QN}/*"},
220+
connection_qualified_name=connection_qn,
221+
resources={f"entity:{connection_qn}/*"},
191222
)
192223
response = await client.asset.save(policy)
193224

194225
assert response is not None
195-
# Duplicate found → retry short-circuited → only 1 bulk POST
196226
assert bulk_call_count == 1, (
197227
f"Expected 1 bulk POST (duplicate prevented retry), got {bulk_call_count}"
198228
)
199-
# Response should contain the existing policy's GUID
200229
saved = response.assets_created(AuthPolicy)
201230
assert saved and saved[0].guid == fake_guid, (
202231
f"Expected existing policy guid {fake_guid}, got {saved}"

tests/integration/test_transport.py

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
the full transport logic.
1111
"""
1212

13+
from typing import Generator
1314
from unittest.mock import patch
1415

1516
import httpx
@@ -18,21 +19,43 @@
1819

1920
from pyatlan.client.atlan import AtlanClient
2021
from pyatlan.client.transport import PyatlanSyncTransport
21-
from pyatlan.model.assets import AuthPolicy, Persona
22-
from pyatlan.model.enums import AuthPolicyType, PersonaMetadataAction
23-
from tests.integration.client import TestId, client # noqa: F401 — fixture
22+
from pyatlan.model.assets import AuthPolicy, Connection, Persona
23+
from pyatlan.model.enums import (
24+
AtlanConnectorType,
25+
AuthPolicyType,
26+
PersonaMetadataAction,
27+
)
28+
from tests.integration.client import TestId, client, delete_asset # noqa: F401 — fixture
2429

25-
26-
PERSONA_NAME = "New"
27-
CONNECTION_QN = "default/redshift/1769838984"
2830
MODULE_NAME = TestId.make_unique("TransportRetry")
31+
CONNECTOR_TYPE = AtlanConnectorType.GCS
32+
33+
34+
@pytest.fixture(scope="module")
35+
def connection(client: AtlanClient) -> Generator[Connection, None, None]: # noqa: F811
36+
admin_role_guid = str(client.role_cache.get_id_for_name("$admin"))
37+
to_create = Connection.create(
38+
client=client,
39+
name=MODULE_NAME,
40+
connector_type=CONNECTOR_TYPE,
41+
admin_roles=[admin_role_guid],
42+
)
43+
response = client.asset.save(to_create)
44+
result = response.assets_created(asset_type=Connection)[0]
45+
yield result
46+
delete_asset(client, guid=result.guid, asset_type=Connection)
2947

3048

31-
def _find_persona(atlan_client: AtlanClient, name: str) -> Persona:
32-
results = atlan_client.asset.find_personas_by_name(name)
33-
if not results:
34-
pytest.skip(f"Persona '{name}' not found on this tenant — skipping.")
35-
return results[0]
49+
@pytest.fixture(scope="module")
50+
def persona(
51+
client: AtlanClient, # noqa: F811
52+
connection: Connection, # noqa: F841 — ensures connection exists before persona
53+
) -> Generator[Persona, None, None]:
54+
to_create = Persona.create(name=MODULE_NAME)
55+
response = client.asset.save(to_create)
56+
p = response.assets_created(asset_type=Persona)[0]
57+
yield p
58+
delete_asset(client, guid=p.guid, asset_type=Persona)
3659

3760

3861
def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Response:
@@ -56,7 +79,11 @@ def _build_fake_bulk_response(policy_name: str, persona_guid: str) -> httpx.Resp
5679
return httpx.Response(200, json=body)
5780

5881

59-
def test_duplicate_prevention_on_timeout(client: AtlanClient): # noqa: F811
82+
def test_duplicate_prevention_on_timeout(
83+
client: AtlanClient, # noqa: F811
84+
persona: Persona,
85+
connection: Connection,
86+
):
6087
"""
6188
Simulate a ReadTimeout after a (mocked) bulk POST succeeds.
6289
On retry, the transport runs a real IndexSearch against the tenant.
@@ -66,8 +93,9 @@ def test_duplicate_prevention_on_timeout(client: AtlanClient): # noqa: F811
6693
This validates: transport wiring, parse_auth_policy_entity, and the
6794
real IndexSearch call in find_existing_policy.
6895
"""
69-
persona = _find_persona(client, PERSONA_NAME)
96+
assert connection.qualified_name
7097
policy_name = f"{MODULE_NAME}_DupCheck"
98+
connection_qn = connection.qualified_name
7199

72100
transport = PyatlanSyncTransport(
73101
retry=Retry(total=3, backoff_factor=0, allowed_methods=["POST"]),
@@ -89,25 +117,26 @@ def intercepting_handle(request: httpx.Request) -> httpx.Response:
89117
"Simulated timeout after successful creation",
90118
request=request,
91119
)
92-
# Second attempt — return a real fake success
93120
return _build_fake_bulk_response(policy_name, persona.guid)
94121
return original_inner_handle(request)
95122

96123
transport._transport.handle_request = intercepting_handle # type: ignore[method-assign]
97124

98125
try:
99-
policy = Persona.create_metadata_policy(
100-
name=policy_name,
101-
persona_id=persona.guid,
102-
policy_type=AuthPolicyType.ALLOW,
103-
actions={PersonaMetadataAction.READ},
104-
connection_qualified_name=CONNECTION_QN,
105-
resources={f"entity:{CONNECTION_QN}/*"},
106-
)
107-
response = client.asset.save(policy)
126+
with patch(
127+
"pyatlan.client.common.transport.find_existing_policy",
128+
return_value=None,
129+
):
130+
policy = Persona.create_metadata_policy(
131+
name=policy_name,
132+
persona_id=persona.guid,
133+
policy_type=AuthPolicyType.ALLOW,
134+
actions={PersonaMetadataAction.READ},
135+
connection_qualified_name=connection_qn,
136+
resources={f"entity:{connection_qn}/*"},
137+
)
138+
response = client.asset.save(policy)
108139

109-
# Policy wasn't really created so IndexSearch returns nothing →
110-
# retry proceeds to attempt #2 → fake success returned
111140
assert response is not None
112141
assert bulk_call_count == 2, (
113142
f"Expected 2 bulk POSTs (no duplicate found, retry proceeded), got {bulk_call_count}"
@@ -119,6 +148,8 @@ def intercepting_handle(request: httpx.Request) -> httpx.Response:
119148

120149
def test_duplicate_prevention_short_circuits_when_policy_exists(
121150
client: AtlanClient, # noqa: F811
151+
persona: Persona,
152+
connection: Connection,
122153
):
123154
"""
124155
After a (mocked) timeout, the IndexSearch duplicate-check is mocked to
@@ -128,9 +159,10 @@ def test_duplicate_prevention_short_circuits_when_policy_exists(
128159
This validates the full duplicate-prevention flow without needing
129160
connection-admin rights on the tenant.
130161
"""
131-
persona = _find_persona(client, PERSONA_NAME)
162+
assert connection.qualified_name
132163
policy_name = f"{MODULE_NAME}_ShortCircuit"
133164
fake_guid = f"existing-{policy_name}-guid"
165+
connection_qn = connection.qualified_name
134166

135167
existing_policy = {
136168
"typeName": "AuthPolicy",
@@ -167,8 +199,6 @@ def intercepting_handle(request: httpx.Request) -> httpx.Response:
167199
transport._transport.handle_request = intercepting_handle # type: ignore[method-assign]
168200

169201
try:
170-
# Patch find_existing_policy in the common transport module so the
171-
# duplicate check returns our fake existing policy without a real search
172202
with patch(
173203
"pyatlan.client.common.transport.find_existing_policy",
174204
return_value=existing_policy,
@@ -178,17 +208,15 @@ def intercepting_handle(request: httpx.Request) -> httpx.Response:
178208
persona_id=persona.guid,
179209
policy_type=AuthPolicyType.ALLOW,
180210
actions={PersonaMetadataAction.READ},
181-
connection_qualified_name=CONNECTION_QN,
182-
resources={f"entity:{CONNECTION_QN}/*"},
211+
connection_qualified_name=connection_qn,
212+
resources={f"entity:{connection_qn}/*"},
183213
)
184214
response = client.asset.save(policy)
185215

186216
assert response is not None
187-
# Duplicate found → retry short-circuited → only 1 bulk POST
188217
assert bulk_call_count == 1, (
189218
f"Expected 1 bulk POST (duplicate prevented retry), got {bulk_call_count}"
190219
)
191-
# Response should contain the existing policy's GUID
192220
saved = response.assets_created(AuthPolicy)
193221
assert saved and saved[0].guid == fake_guid, (
194222
f"Expected existing policy guid {fake_guid}, got {saved}"

0 commit comments

Comments
 (0)