Skip to content

Commit dd57d3f

Browse files
Merge pull request #92 from miladmahmoodi/features/kafka-producer-args-support
feat: add *args, **kwargs to support to Kafka producer
2 parents 2811e2d + 375f19f commit dd57d3f

4 files changed

Lines changed: 23 additions & 2 deletions

File tree

archipy/adapters/kafka/adapters.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,12 @@ def _delivery_callback(error: KafkaError | None, message: Message) -> None:
507507
)
508508

509509
@override
510-
def produce(self, message: str | bytes) -> None:
510+
def produce(self, message: str | bytes, key: str | None = None) -> None:
511511
"""Produces a message to the configured topic.
512512
513513
Args:
514514
message (str | bytes): The message to produce.
515+
key (str | None, optional): The key for the message. Defaults to None.
515516
516517
Raises:
517518
NetworkError: If there is a network error producing the message.
@@ -524,6 +525,7 @@ def produce(self, message: str | bytes) -> None:
524525
topic=self._topic_name,
525526
value=processed_message,
526527
callback=self._delivery_callback,
528+
key=key,
527529
)
528530
except Exception as e:
529531
self._handle_producer_exception(e, "produce")

archipy/adapters/kafka/ports.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,12 @@ class KafkaProducerPort:
140140
"""
141141

142142
@abstractmethod
143-
def produce(self, message: str | bytes) -> None:
143+
def produce(self, message: str | bytes, key: str | None = None) -> None:
144144
"""Produces a message to the configured topic.
145145
146146
Args:
147147
message (str | bytes): The message to produce.
148+
key (str | None, optional): The key for the message. Defaults to None.
148149
149150
Raises:
150151
NotImplementedError: If the method is not implemented by the concrete class.

features/kafka_adapters.feature

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ Feature: Kafka Adapter Operations Testing
2424
When I validate the producer health
2525
Then the producer health check should pass
2626

27+
Scenario: Produce message with additional parameters
28+
Given a Kafka producer for topic "test-topic"
29+
And a Kafka consumer subscribed to topic "test-topic" with group "test-group"
30+
When I produce one message "Hello Kafka with key" with key "test-key" to topic "test-topic"
31+
Then the consumer should receive message "Hello Kafka with key" from topic "test-topic"
32+
2733
Scenario: Delete a topic
2834
Given a topic named "test-topic-deletable" exists
2935
When I delete the topic "test-topic-deletable"

features/steps/kafka_adapter_steps.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,18 @@ def step_produce_message(context, message, topic_name):
160160
raise e
161161

162162

163+
@when('I produce one message "{message}" with key "{key}" to topic "{topic_name}"')
164+
def step_produce_message_with_key(context, message, key, topic_name):
165+
adapter = get_kafka_producer_adapter(context, topic_name)
166+
try:
167+
adapter.produce(message, key=key)
168+
adapter.flush(timeout=1)
169+
context.logger.info(f"Produced message '{message}' to '{topic_name}' with key '{key}'")
170+
except Exception as e:
171+
context.logger.exception(f"Failed to produce message with key: {str(e)}")
172+
raise e
173+
174+
163175
@when("I validate the producer health")
164176
def step_validate_health(context):
165177
scenario_context = get_current_scenario_context(context)

0 commit comments

Comments
 (0)