Skip to content

Commit cca466b

Browse files
GOVFOUN-408: Fix connection pool issue
1 parent 12e8227 commit cca466b

3 files changed

Lines changed: 372 additions & 3 deletions

File tree

pyatlan/client/atlan.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,14 @@ def __init__(self, **data):
214214
# Pass self reference to transport for duplicate checking during retries
215215
self._session = httpx.Client(
216216
transport=PyatlanSyncTransport(
217-
retry=self.retry, client=self, **transport_kwargs
217+
retry=self.retry,
218+
client=self,
219+
limits=httpx.Limits(
220+
max_connections=50,
221+
max_keepalive_connections=10,
222+
keepalive_expiry=30.0,
223+
),
224+
**transport_kwargs,
218225
),
219226
headers={
220227
"x-atlan-agent": "sdk",
@@ -554,7 +561,10 @@ def _call_api_internal(
554561
try:
555562
params["headers"]["X-Atlan-Request-Id"] = request_id_var.get()
556563
timeout = httpx.Timeout(
557-
None, connect=self.connect_timeout, read=self.read_timeout
564+
None,
565+
connect=self.connect_timeout,
566+
read=self.read_timeout,
567+
pool=30.0,
558568
)
559569
if binary_data:
560570
response = self._session.request(
@@ -2014,7 +2024,14 @@ def max_retries( # type: ignore[misc]
20142024
transport_kwargs["verify"] = self.verify
20152025

20162026
new_transport = PyatlanSyncTransport(
2017-
retry=max_retries, client=self, **transport_kwargs
2027+
retry=max_retries,
2028+
client=self,
2029+
limits=httpx.Limits(
2030+
max_connections=50,
2031+
max_keepalive_connections=10,
2032+
keepalive_expiry=30.0,
2033+
),
2034+
**transport_kwargs,
20182035
)
20192036
self._session._transport = new_transport
20202037

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Atlan Pte. Ltd.
"""
Integration tests for GOVFOUN-408: httpcore connection pool deadlock fix.

These tests run against a live Atlan tenant and verify:
1. Pool limits and timeout config survive the full client init and real HTTP
   connections — not just object construction.
2. Concurrent requests complete without hanging under the configured pool size.
3. httpx.PoolTimeout propagates through the full SDK stack instead of being
   swallowed or causing an indefinite hang.
4. The max_retries context manager correctly installs and restores limits on a
   client that is actively connected.
"""
15+
16+
import time
17+
from concurrent.futures import ThreadPoolExecutor, wait
18+
from unittest.mock import patch
19+
20+
import httpx
21+
import pytest
22+
from httpx_retries import Retry
23+
24+
from pyatlan.client.atlan import AtlanClient
25+
from pyatlan.client.transport import PyatlanSyncTransport
26+
27+
28+
def _get_httpcore_pool(client: AtlanClient):
    """Reach through the client's httpx transport to the underlying httpcore pool."""
    outer_transport = client._session._transport
    return outer_transport._transport._pool
30+
31+
32+
# ---------------------------------------------------------------------------
33+
# Pool limits — config survives real HTTP connections
34+
# ---------------------------------------------------------------------------
35+
36+
37+
def test_pool_limits_on_live_client(client: AtlanClient):
    """
    Pool limits must be configured at construction time on an AtlanClient
    built from the integration environment (real base_url, real API key,
    real retry config) and must survive unchanged.
    """
    pool = _get_httpcore_pool(client)
    observed = (
        pool._max_connections,
        pool._max_keepalive_connections,
        pool._keepalive_expiry,
    )
    assert observed == (50, 10, 30.0)
47+
48+
49+
# ---------------------------------------------------------------------------
50+
# Concurrent requests — happy path
51+
# ---------------------------------------------------------------------------
52+
53+
54+
def test_concurrent_requests_complete_without_deadlock(client: AtlanClient):
    """
    Several concurrent requests (far below max_connections=50) must all finish.

    Under the old config (pool=None, max_connections=100), CLOSE_WAIT sockets
    could fill every slot, and threads waiting for a connection then blocked
    forever on threading.Event.wait(timeout=None). With pool=30.0 and
    max_connections=50, each thread either succeeds or raises PoolTimeout
    within 30s — it never hangs indefinitely.
    """
    n_threads = 5
    timeout_seconds = 60

    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        pending = [
            executor.submit(client.user.get_current) for _ in range(n_threads)
        ]
        _, not_done = wait(pending, timeout=timeout_seconds)

    # Only a hang is a failure here — individual futures may legitimately
    # raise (auth errors, etc.).
    assert len(not_done) == 0, (
        f"{len(not_done)} of {n_threads} requests still pending after "
        f"{timeout_seconds}s — possible connection pool deadlock"
    )
80+
81+
82+
# ---------------------------------------------------------------------------
83+
# PoolTimeout propagation
84+
# ---------------------------------------------------------------------------
85+
86+
87+
def test_pool_timeout_propagates_through_sdk_stack(client: AtlanClient):
    """
    httpx.PoolTimeout injected at the transport layer must propagate up
    through the SDK without being swallowed or stalling.

    Before the fix, pool=None meant threads blocked indefinitely on
    threading.Event.wait(timeout=None). With pool=30.0 the SDK raises an
    exception quickly instead.
    """
    original_transport = client._session._transport

    test_transport = PyatlanSyncTransport(
        retry=Retry(total=0),
        client=client,
        limits=httpx.Limits(
            max_connections=50,
            max_keepalive_connections=10,
            keepalive_expiry=30.0,
        ),
    )

    def always_pool_timeout(request: httpx.Request) -> httpx.Response:
        raise httpx.PoolTimeout("simulated pool exhaustion", request=request)

    test_transport._transport.handle_request = always_pool_timeout  # type: ignore[method-assign]
    client._session._transport = test_transport

    try:
        start = time.monotonic()
        # Broad `Exception` keeps the test agnostic to whether the SDK
        # re-wraps the PoolTimeout in its own error type.
        with pytest.raises(Exception):
            client.user.get_current()
        elapsed = time.monotonic() - start

        assert elapsed < 5.0, (
            f"PoolTimeout took {elapsed:.1f}s to propagate — "
            "it may be getting swallowed or retried unexpectedly"
        )
    finally:
        client._session._transport = original_transport
        # Fix: the original test leaked test_transport's own connection pool.
        # Close the inner transport if it exposes close() — guarded with
        # getattr since the wrapped transport type comes from the project.
        inner = getattr(test_transport, "_transport", None)
        close = getattr(inner, "close", None)
        if callable(close):
            close()
126+
127+
128+
# ---------------------------------------------------------------------------
129+
# max_retries context manager
130+
# ---------------------------------------------------------------------------
131+
132+
133+
def test_max_retries_transport_limits_on_live_client(client: AtlanClient):
    """
    The max_retries context manager must build its replacement transport with
    the same pool limits on the real integration client (env-configured
    base_url, retry policy).
    """
    seen: dict = {}
    real_init = PyatlanSyncTransport.__init__

    def spy_init(inst, retry=None, client=None, **kwargs):  # noqa: ARG001
        # Record the limits keyword, then delegate to the real __init__.
        if "limits" in kwargs:
            seen["limits"] = kwargs["limits"]
        real_init(inst, retry=retry, client=client, **kwargs)

    with patch.object(PyatlanSyncTransport, "__init__", spy_init), client.max_retries():
        pass

    limits = seen.get("limits")
    assert limits is not None, "max_retries did not pass limits to PyatlanSyncTransport"
    assert limits.max_connections == 50
    assert limits.keepalive_expiry == 30.0
    assert limits.max_keepalive_connections == 10
155+
156+
157+
def test_max_retries_restores_original_transport(client: AtlanClient):
    """
    On context exit, max_retries must put back the exact transport object that
    was installed before entry — even on the real integration client
    (env-configured base_url, retry policy).
    """
    before = client._session._transport
    with client.max_retries():
        pass
    after = client._session._transport
    assert after is before
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Atlan Pte. Ltd.
"""
Regression tests for GOVFOUN-408: httpcore connection pool deadlock fix.

Root cause: csa-metadata-completeness hung indefinitely on masterc-prd because
two missing configs caused all 100 httpcore connection slots to fill with
CLOSE_WAIT zombies, after which 109 worker threads blocked forever in
wait_for_connection(timeout=None).

These tests verify the two SDK-side fixes:
1. httpx.Timeout has pool=30.0 — threads raise PoolTimeout after 30s instead
   of waiting forever.
2. httpx.Limits has keepalive_expiry=30.0 — the client retires idle connections
   before nginx's keepalive_timeout=75s FIN, preventing CLOSE_WAIT accumulation.
"""
17+
18+
from unittest.mock import Mock, patch
19+
20+
import httpx
21+
import pytest
22+
23+
from pyatlan.client.atlan import AtlanClient
24+
from pyatlan.client.transport import PyatlanSyncTransport
25+
from pyatlan.model.assets import AtlasGlossary
26+
27+
28+
@pytest.fixture(autouse=True)
def set_env(monkeypatch):
    """Point the SDK at a dummy tenant so AtlanClient() can be constructed offline."""
    monkeypatch.setenv("ATLAN_BASE_URL", "https://test.atlan.com")
    monkeypatch.setenv("ATLAN_API_KEY", "test-api-key")


@pytest.fixture()
def client():
    """A fresh AtlanClient per test, built from the env vars set by set_env."""
    return AtlanClient()
37+
38+
39+
def _error_response():
40+
r = Mock()
41+
r.status_code = 500
42+
r.text = "internal server error"
43+
return r
44+
45+
46+
def _trigger_api_call(client: AtlanClient) -> None:
    """Issue one outbound request so it flows through _call_api_internal.

    The outcome of the call is irrelevant — only the request construction
    matters — so any exception is deliberately swallowed.
    """
    try:
        client.asset.save(AtlasGlossary.creator(name="t"))
    except Exception:  # best-effort trigger only
        pass
52+
53+
54+
def _get_httpcore_pool(client: AtlanClient):
    """Return the underlying httpcore pool behind the client's transport."""
    outer = client._session._transport
    assert isinstance(outer, PyatlanSyncTransport)
    return outer._transport._pool
58+
59+
60+
# ---------------------------------------------------------------------------
61+
# Pool timeout
62+
# ---------------------------------------------------------------------------
63+
64+
65+
@patch.object(AtlanClient, "_session")
def test_pool_timeout_is_30_seconds(mock_session, client):
    """The per-request httpx.Timeout must carry pool=30.0 — the live deadlock
    showed that pool=None lets threads block on a connection slot forever."""
    mock_session.request.return_value = _error_response()
    _trigger_api_call(client)
    assert mock_session.request.called, "no HTTP request was made"
    sent = mock_session.request.call_args.kwargs["timeout"]
    assert sent.pool == 30.0
73+
74+
75+
@patch.object(AtlanClient, "_session")
def test_pool_timeout_is_not_none(mock_session, client):
    """A None pool timeout means threading.Event.wait(timeout=None) — i.e.
    blocking forever — so the configured value must never be None."""
    mock_session.request.return_value = _error_response()
    _trigger_api_call(client)
    assert mock_session.request.called
    sent = mock_session.request.call_args.kwargs["timeout"]
    assert sent.pool is not None
83+
84+
85+
@patch.object(AtlanClient, "_session")
def test_connect_timeout_unchanged(mock_session, client):
    """The pool-timeout fix must not disturb connect: it still equals
    client.connect_timeout (default 30s)."""
    mock_session.request.return_value = _error_response()
    _trigger_api_call(client)
    assert mock_session.request.called
    sent = mock_session.request.call_args.kwargs["timeout"]
    assert sent.connect == client.connect_timeout
93+
94+
95+
@patch.object(AtlanClient, "_session")
def test_read_timeout_unchanged(mock_session, client):
    """The pool-timeout fix must not disturb read: it still equals
    client.read_timeout (default 900s)."""
    mock_session.request.return_value = _error_response()
    _trigger_api_call(client)
    assert mock_session.request.called
    sent = mock_session.request.call_args.kwargs["timeout"]
    assert sent.read == client.read_timeout
103+
104+
105+
@patch.object(AtlanClient, "_session")
def test_pool_timeout_propagates_not_hangs(mock_session, client):
    """A PoolTimeout raised by the session must surface to the caller —
    if the SDK swallowed it, the workflow would still hang in production."""
    simulated = httpx.PoolTimeout("connection pool exhausted", request=None)
    mock_session.request.side_effect = simulated
    with pytest.raises(Exception):
        client.asset.save(AtlasGlossary.creator(name="t"))
113+
114+
115+
# ---------------------------------------------------------------------------
116+
# Transport limits
117+
# ---------------------------------------------------------------------------
118+
119+
120+
def test_transport_max_connections_is_50(client):
    """max_connections=50 reduces blast radius when CLOSE_WAIT sockets accumulate."""
    pool = _get_httpcore_pool(client)
    assert pool._max_connections == 50


def test_transport_keepalive_expiry_is_30_seconds(client):
    """keepalive_expiry=30.0 — the client retires idle connections before
    nginx's 75s keepalive FIN can turn them into CLOSE_WAIT zombies."""
    pool = _get_httpcore_pool(client)
    assert pool._keepalive_expiry == 30.0


def test_transport_max_keepalive_connections_is_10(client):
    """max_keepalive_connections=10 bounds the idle connections kept in the pool."""
    pool = _get_httpcore_pool(client)
    assert pool._max_keepalive_connections == 10
133+
134+
135+
# ---------------------------------------------------------------------------
136+
# max_retries context manager
137+
# ---------------------------------------------------------------------------
138+
139+
140+
def _capture_transport_limits(client: AtlanClient) -> httpx.Limits:
    """Enter max_retries and return the httpx.Limits handed to the transport it builds."""
    seen: dict = {}
    real_init = PyatlanSyncTransport.__init__

    def spy_init(inst, retry=None, client=None, **kwargs):  # noqa: ARG001
        # Record the limits keyword, then delegate to the real __init__.
        if "limits" in kwargs:
            seen["limits"] = kwargs["limits"]
        real_init(inst, retry=retry, client=client, **kwargs)

    with patch.object(PyatlanSyncTransport, "__init__", spy_init), client.max_retries():
        pass

    assert "limits" in seen, "max_retries did not pass limits to PyatlanSyncTransport"
    return seen["limits"]
156+
157+
158+
def test_max_retries_transport_max_connections_is_50(client):
    """The max_retries replacement transport must keep max_connections=50."""
    limits = _capture_transport_limits(client)
    assert limits.max_connections == 50


def test_max_retries_transport_keepalive_expiry_is_30_seconds(client):
    """The max_retries replacement transport must keep keepalive_expiry=30.0."""
    limits = _capture_transport_limits(client)
    assert limits.keepalive_expiry == 30.0


def test_max_retries_transport_max_keepalive_connections_is_10(client):
    """The max_retries replacement transport must keep max_keepalive_connections=10."""
    limits = _capture_transport_limits(client)
    assert limits.max_keepalive_connections == 10
171+
172+
173+
def test_max_retries_replaces_transport_inside_context(client):
    """Inside the context, max_retries must have swapped in a fresh transport."""
    before = client._session._transport
    with client.max_retries():
        assert client._session._transport is not before


def test_max_retries_restores_original_transport_on_exit(client):
    """After the context exits, the original transport object must be back."""
    before = client._session._transport
    with client.max_retries():
        pass
    assert client._session._transport is before

0 commit comments

Comments
 (0)