@@ -71,6 +71,29 @@ async def test_pub_sub_broker(
7171 await broker .shutdown ()
7272
7373
74+ @pytest .mark .anyio
75+ async def test_pub_sub_broker_max_connections (
76+ valid_broker_message : BrokerMessage ,
77+ redis_url : str ,
78+ ) -> None :
79+ """Test PubSubBroker with connection limit set."""
80+ broker = PubSubBroker (
81+ url = redis_url ,
82+ queue_name = uuid .uuid4 ().hex ,
83+ max_connection_pool_size = 4 ,
84+ timeout = 1 ,
85+ )
86+ worker_tasks = [asyncio .create_task (get_message (broker )) for _ in range (3 )]
87+ await asyncio .sleep (0.3 )
88+
89+ await asyncio .gather (* [broker .kick (valid_broker_message ) for _ in range (50 )])
90+ await asyncio .sleep (0.3 )
91+
92+ for worker in worker_tasks :
93+ worker .cancel ()
94+ await broker .shutdown ()
95+
96+
7497@pytest .mark .anyio
7598async def test_list_queue_broker (
7699 valid_broker_message : BrokerMessage ,
@@ -98,6 +121,29 @@ async def test_list_queue_broker(
98121 await broker .shutdown ()
99122
100123
124+ @pytest .mark .anyio
125+ async def test_list_queue_broker_max_connections (
126+ valid_broker_message : BrokerMessage ,
127+ redis_url : str ,
128+ ) -> None :
129+ """Test ListQueueBroker with connection limit set."""
130+ broker = ListQueueBroker (
131+ url = redis_url ,
132+ queue_name = uuid .uuid4 ().hex ,
133+ max_connection_pool_size = 4 ,
134+ timeout = 1 ,
135+ )
136+ worker_tasks = [asyncio .create_task (get_message (broker )) for _ in range (3 )]
137+ await asyncio .sleep (0.3 )
138+
139+ await asyncio .gather (* [broker .kick (valid_broker_message ) for _ in range (50 )])
140+ await asyncio .sleep (0.3 )
141+
142+ for worker in worker_tasks :
143+ worker .cancel ()
144+ await broker .shutdown ()
145+
146+
101147@pytest .mark .anyio
102148async def test_list_queue_cluster_broker (
103149 valid_broker_message : BrokerMessage ,
0 commit comments