|
1 | 1 | # features/steps/kafka_steps.py |
| 2 | +import time |
2 | 3 | from behave import given, then, when |
3 | 4 | from confluent_kafka import TopicPartition |
4 | 5 | from features.test_helpers import get_current_scenario_context |
@@ -57,6 +58,21 @@ def get_kafka_consumer_adapter(context, topic_name, group_id): |
57 | 58 | setattr(scenario_context, consumer_key, consumer) |
58 | 59 | return getattr(scenario_context, consumer_key) |
59 | 60 |
|
| 61 | +def wait_for_topic_condition(adapter, condition_func, topic_name, max_retries=5, initial_delay=0.5): |
| 62 | + """Helper function to wait for a topic condition with retries.""" |
| 63 | + delay = initial_delay |
| 64 | + for attempt in range(max_retries): |
| 65 | + try: |
| 66 | + topics = adapter.list_topics(timeout=2).topics |
| 67 | + if condition_func(topic_name, topics): |
| 68 | + return True |
| 69 | + except Exception as e: |
| 70 | + if attempt == max_retries - 1: |
| 71 | + raise e |
| 72 | + if attempt < max_retries - 1: |
| 73 | + time.sleep(delay) |
| 74 | + delay *= 1.5 |
| 75 | + return False |
60 | 76 |
|
61 | 77 | # Given steps |
62 | 78 | @given("a configured Kafka admin adapter") |
@@ -174,37 +190,28 @@ def step_delete_topic(context, topic_name): |
174 | 190 | @then('the topic "{topic_name}" should exist') |
175 | 191 | def step_topic_should_exist(context, topic_name): |
176 | 192 | adapter = get_kafka_admin_adapter(context) |
177 | | - try: |
178 | | - topics = adapter.list_topics(timeout=1).topics |
179 | | - assert topic_name in topics, f"Topic '{topic_name}' does not exist" |
| 193 | + if wait_for_topic_condition(adapter, lambda name, topics: name in topics, topic_name): |
180 | 194 | context.logger.info(f"Verified topic '{topic_name}' exists") |
181 | | - except Exception as e: |
182 | | - context.logger.exception(f"Failed to verify topic existence: {str(e)}") |
183 | | - raise |
| 195 | + else: |
| 196 | + raise AssertionError(f"Topic '{topic_name}' does not exist after retries") |
184 | 197 |
|
185 | 198 |
|
186 | 199 | @then('the topic "{topic_name}" should not exist') |
187 | 200 | def step_topic_should_not_exist(context, topic_name): |
188 | 201 | adapter = get_kafka_admin_adapter(context) |
189 | | - try: |
190 | | - topics = adapter.list_topics(timeout=1).topics |
191 | | - assert topic_name not in topics, f"Topic '{topic_name}' still exists" |
| 202 | + if wait_for_topic_condition(adapter, lambda name, topics: name not in topics, topic_name): |
192 | 203 | context.logger.info(f"Verified topic '{topic_name}' does not exist") |
193 | | - except Exception as e: |
194 | | - context.logger.exception(f"Failed to verify topic non-existence: {str(e)}") |
195 | | - raise |
| 204 | + else: |
| 205 | + raise AssertionError(f"Topic '{topic_name}' still exists after retries") |
196 | 206 |
|
197 | 207 |
|
198 | 208 | @then('the topic list should include "{topic_name}"') |
199 | 209 | def step_topic_list_includes(context, topic_name): |
200 | 210 | adapter = get_kafka_admin_adapter(context) |
201 | | - try: |
202 | | - topics = adapter.list_topics(timeout=1).topics |
203 | | - assert topic_name in topics, f"Topic '{topic_name}' not in topic list" |
| 211 | + if wait_for_topic_condition(adapter, lambda name, topics: name in topics, topic_name): |
204 | 212 | context.logger.info(f"Verified '{topic_name}' in topic list") |
205 | | - except Exception as e: |
206 | | - context.logger.exception(f"Failed to verify topic list: {str(e)}") |
207 | | - raise |
| 213 | + else: |
| 214 | + raise AssertionError(f"Topic '{topic_name}' not in topic list after retries") |
208 | 215 |
|
209 | 216 |
|
210 | 217 | @then('the consumer should receive message "{expected_message}" from topic "{topic_name}"') |
|
0 commit comments