Skip to content

Commit 92cf89d

Browse files
author
s.kazemi
committed
fix: kafka connection issues
1 parent 80ea2fa commit 92cf89d

2 files changed

Lines changed: 25 additions & 10 deletions

File tree

features/steps/kafka_adapter_steps.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@ def get_kafka_admin_adapter(context):
1111
"""Get or initialize the Kafka admin adapter."""
1212
scenario_context = get_current_scenario_context(context)
1313
if not hasattr(scenario_context, "admin_adapter") or scenario_context.admin_adapter is None:
14-
test_config = scenario_context.get("test_config")
15-
context.logger.info("Initializing Kafka admin adapter")
16-
scenario_context.admin_adapter = KafkaAdminAdapter(test_config.KAFKA)
14+
# Get the updated configuration from the running container
15+
test_containers = scenario_context.get("test_containers")
16+
kafka_container = test_containers.get_container("kafka")
17+
18+
# Use the configuration from the running container
19+
kafka_config = kafka_container.config
20+
21+
scenario_context.admin_adapter = KafkaAdminAdapter(kafka_config)
1722
return scenario_context.admin_adapter
1823

1924

@@ -24,9 +29,14 @@ def get_kafka_producer_adapter(context, topic_name):
2429
not hasattr(scenario_context, f"producer_{topic_name}")
2530
or getattr(scenario_context, f"producer_{topic_name}") is None
2631
):
27-
test_config = scenario_context.get("test_config")
28-
context.logger.info(f"Initializing Kafka producer for topic: {topic_name}")
29-
producer = KafkaProducerAdapter(topic_name, kafka_configs=test_config.KAFKA)
32+
# Get the updated configuration from the running container
33+
test_containers = scenario_context.get("test_containers")
34+
kafka_container = test_containers.get_container("kafka")
35+
36+
# Use the configuration from the running container
37+
kafka_config = kafka_container.config
38+
39+
producer = KafkaProducerAdapter(topic_name, kafka_configs=kafka_config)
3040
setattr(scenario_context, f"producer_{topic_name}", producer)
3141
return getattr(scenario_context, f"producer_{topic_name}")
3242

@@ -36,9 +46,14 @@ def get_kafka_consumer_adapter(context, topic_name, group_id):
3646
scenario_context = get_current_scenario_context(context)
3747
consumer_key = f"consumer_{topic_name}_{group_id}"
3848
if not hasattr(scenario_context, consumer_key) or getattr(scenario_context, consumer_key) is None:
39-
test_config = scenario_context.get("test_config")
40-
context.logger.info(f"Initializing Kafka consumer for topic: {topic_name}, group: {group_id}")
41-
consumer = KafkaConsumerAdapter(group_id=group_id, topic_list=[topic_name], kafka_configs=test_config.KAFKA)
49+
# Get the updated configuration from the running container
50+
test_containers = scenario_context.get("test_containers")
51+
kafka_container = test_containers.get_container("kafka")
52+
53+
# Use the configuration from the running container
54+
kafka_config = kafka_container.config
55+
56+
consumer = KafkaConsumerAdapter(group_id=group_id, topic_list=[topic_name], kafka_configs=kafka_config)
4257
setattr(scenario_context, consumer_key, consumer)
4358
return getattr(scenario_context, consumer_key)
4459

features/test_containers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ def start(self) -> KafkaContainer:
356356
# Set container properties from running container
357357
self.host = self._container.get_container_host_ip()
358358
self.bootstrap_servers = self._container.get_bootstrap_server()
359-
self.config.BROKERS_LIST = self.bootstrap_servers
359+
self.config.BROKERS_LIST = [self.bootstrap_servers]
360360

361361
logger.info("Kafka container started on %s:%s", self.host, self.port)
362362
logger.info("Bootstrap servers: %s", self.bootstrap_servers)

0 commit comments

Comments
 (0)