Skip to content

Commit 375f19f

Browse files
committed
feat: add key parameter to Kafka producer
1 parent 88897fb commit 375f19f

2 files changed

Lines changed: 5 additions & 8 deletions

File tree

archipy/adapters/kafka/adapters.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,12 @@ def _delivery_callback(error: KafkaError | None, message: Message) -> None:
507507
)
508508

509509
@override
510-
def produce(self, message: str | bytes, *args, **kwargs) -> None:
510+
def produce(self, message: str | bytes, key: str | None = None) -> None:
511511
"""Produces a message to the configured topic.
512512
513513
Args:
514514
message (str | bytes): The message to produce.
515-
*args: Additional positional arguments passed to the underlying producer.
516-
**kwargs: Additional keyword arguments passed to the underlying producer.
515+
key (str | None, optional): The key for the message. Defaults to None.
517516
518517
Raises:
519518
NetworkError: If there is a network error producing the message.
@@ -523,11 +522,10 @@ def produce(self, message: str | bytes, *args, **kwargs) -> None:
523522
try:
524523
processed_message = self._pre_process_message(message)
525524
self._adapter.produce(
526-
*args,
527525
topic=self._topic_name,
528526
value=processed_message,
529527
callback=self._delivery_callback,
530-
**kwargs,
528+
key=key,
531529
)
532530
except Exception as e:
533531
self._handle_producer_exception(e, "produce")

archipy/adapters/kafka/ports.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,12 @@ class KafkaProducerPort:
140140
"""
141141

142142
@abstractmethod
143-
def produce(self, message: str | bytes, *args, **kwargs) -> None:
143+
def produce(self, message: str | bytes, key: str | None = None) -> None:
144144
"""Produces a message to the configured topic.
145145
146146
Args:
147147
message (str | bytes): The message to produce.
148-
*args: Additional positional arguments passed to the underlying producer.
149-
**kwargs: Additional keyword arguments passed to the underlying producer.
148+
key (str | None, optional): The key for the message. Defaults to None.
150149
151150
Raises:
152151
NotImplementedError: If the method is not implemented by the concrete class.

0 commit comments

Comments
 (0)