|
10 | 10 | ListQueueClusterBroker, |
11 | 11 | ListQueueSentinelBroker, |
12 | 12 | PubSubBroker, |
| 13 | + PubSubSentinelBroker, |
13 | 14 | ) |
14 | 15 |
|
15 | 16 |
|
@@ -176,6 +177,37 @@ async def test_list_queue_cluster_broker( |
176 | 177 | await broker.shutdown() |
177 | 178 |
|
178 | 179 |
|
| 180 | +@pytest.mark.anyio |
| 181 | +async def test_pub_sub_sentinel_broker( |
| 182 | + valid_broker_message: BrokerMessage, |
| 183 | + redis_sentinels: List[Tuple[str, int]], |
| 184 | + redis_sentinel_master_name: str, |
| 185 | +) -> None: |
| 186 | + """ |
| 187 | + Test that messages are published and read correctly by PubSubBroker. |
| 188 | +
|
| 189 | + We create two workers that listen and send a message to them. |
| 190 | + Expect both workers to receive the same message we sent. |
| 191 | + """ |
| 192 | + broker = PubSubSentinelBroker( |
| 193 | + sentinels=redis_sentinels, |
| 194 | + master_name=redis_sentinel_master_name, |
| 195 | + queue_name=uuid.uuid4().hex, |
| 196 | + ) |
| 197 | + worker1_task = asyncio.create_task(get_message(broker)) |
| 198 | + worker2_task = asyncio.create_task(get_message(broker)) |
| 199 | + await asyncio.sleep(0.3) |
| 200 | + |
| 201 | + await broker.kick(valid_broker_message) |
| 202 | + await asyncio.sleep(0.3) |
| 203 | + |
| 204 | + message1 = worker1_task.result() |
| 205 | + message2 = worker2_task.result() |
| 206 | + assert message1 == valid_broker_message.message |
| 207 | + assert message1 == message2 |
| 208 | + await broker.shutdown() |
| 209 | + |
| 210 | + |
179 | 211 | @pytest.mark.anyio |
180 | 212 | async def test_list_queue_sentinel_broker( |
181 | 213 | valid_broker_message: BrokerMessage, |
|
0 commit comments