Skip to content

Commit 88897fb

Browse files
committed
feat: add *args, **kwargs to support to Kafka producer
1 parent 6a7d82b commit 88897fb

4 files changed

Lines changed: 26 additions & 2 deletions

File tree

archipy/adapters/kafka/adapters.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,13 @@ def _delivery_callback(error: KafkaError | None, message: Message) -> None:
507507
)
508508

509509
@override
510-
def produce(self, message: str | bytes) -> None:
510+
def produce(self, message: str | bytes, *args, **kwargs) -> None:
511511
"""Produces a message to the configured topic.
512512
513513
Args:
514514
message (str | bytes): The message to produce.
515+
*args: Additional positional arguments passed to the underlying producer.
516+
**kwargs: Additional keyword arguments passed to the underlying producer.
515517
516518
Raises:
517519
NetworkError: If there is a network error producing the message.
@@ -521,9 +523,11 @@ def produce(self, message: str | bytes) -> None:
521523
try:
522524
processed_message = self._pre_process_message(message)
523525
self._adapter.produce(
526+
*args,
524527
topic=self._topic_name,
525528
value=processed_message,
526529
callback=self._delivery_callback,
530+
**kwargs,
527531
)
528532
except Exception as e:
529533
self._handle_producer_exception(e, "produce")

archipy/adapters/kafka/ports.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,13 @@ class KafkaProducerPort:
140140
"""
141141

142142
@abstractmethod
143-
def produce(self, message: str | bytes) -> None:
143+
def produce(self, message: str | bytes, *args, **kwargs) -> None:
144144
"""Produces a message to the configured topic.
145145
146146
Args:
147147
message (str | bytes): The message to produce.
148+
*args: Additional positional arguments passed to the underlying producer.
149+
**kwargs: Additional keyword arguments passed to the underlying producer.
148150
149151
Raises:
150152
NotImplementedError: If the method is not implemented by the concrete class.

features/kafka_adapters.feature

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ Feature: Kafka Adapter Operations Testing
2424
When I validate the producer health
2525
Then the producer health check should pass
2626

27+
Scenario: Produce message with additional parameters
28+
Given a Kafka producer for topic "test-topic"
29+
And a Kafka consumer subscribed to topic "test-topic" with group "test-group"
30+
When I produce one message "Hello Kafka with key" with key "test-key" to topic "test-topic"
31+
Then the consumer should receive message "Hello Kafka with key" from topic "test-topic"
32+
2733
Scenario: Delete a topic
2834
Given a topic named "test-topic-deletable" exists
2935
When I delete the topic "test-topic-deletable"

features/steps/kafka_adapter_steps.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,18 @@ def step_produce_message(context, message, topic_name):
160160
raise e
161161

162162

163+
@when('I produce one message "{message}" with key "{key}" to topic "{topic_name}"')
164+
def step_produce_message_with_key(context, message, key, topic_name):
165+
adapter = get_kafka_producer_adapter(context, topic_name)
166+
try:
167+
adapter.produce(message, key=key)
168+
adapter.flush(timeout=1)
169+
context.logger.info(f"Produced message '{message}' to '{topic_name}' with key '{key}'")
170+
except Exception as e:
171+
context.logger.exception(f"Failed to produce message with key: {str(e)}")
172+
raise e
173+
174+
163175
@when("I validate the producer health")
164176
def step_validate_health(context):
165177
scenario_context = get_current_scenario_context(context)

0 commit comments

Comments
 (0)