@@ -106,18 +106,16 @@ def __init__(self, kafka_configs: KafkaConfig | None = None) -> None:
106106 configs : KafkaConfig = kafka_configs or BaseConfig .global_config ().KAFKA
107107 try :
108108 broker_list_csv = "," .join (configs .BROKERS_LIST )
109- config = {"bootstrap.servers" : broker_list_csv }
109+ config : dict [ str , str | int | float ] = {"bootstrap.servers" : broker_list_csv }
110110 if configs .USERNAME and configs .PASSWORD and configs .SSL_CA_FILE :
111- config |= {
112- "sasl.username" : configs .USERNAME ,
113- "sasl.password" : configs .PASSWORD .get_secret_value (),
114- "security.protocol" : configs .SECURITY_PROTOCOL ,
115- "sasl.mechanism" : configs .SASL_MECHANISM ,
116- "ssl.ca.location" : configs .SSL_CA_FILE ,
117- "ssl.certificate.location" : configs .SSL_CERT_FILE ,
118- "ssl.key.location" : configs .SSL_KEY_FILE ,
119- "ssl.endpoint.identification.algorithm" : "none" ,
120- }
111+ config ["sasl.username" ] = configs .USERNAME
112+ config ["sasl.password" ] = configs .PASSWORD .get_secret_value ()
113+ config ["security.protocol" ] = configs .SECURITY_PROTOCOL
114+ config ["sasl.mechanism" ] = configs .SASL_MECHANISM or ""
115+ config ["ssl.ca.location" ] = configs .SSL_CA_FILE
116+ config ["ssl.certificate.location" ] = configs .SSL_CERT_FILE or ""
117+ config ["ssl.key.location" ] = configs .SSL_KEY_FILE or ""
118+ config ["ssl.endpoint.identification.algorithm" ] = "none"
121119 self .adapter : AdminClient = AdminClient (config )
122120 except Exception as e :
123121 self ._handle_kafka_exception (e , "KafkaAdmin_init" )
@@ -249,7 +247,7 @@ def _get_adapter(cls, group_id: str, configs: KafkaConfig) -> Consumer:
249247 """
250248 try :
251249 broker_list_csv = "," .join (configs .BROKERS_LIST )
252- config = {
250+ config : dict [ str , str | int | float ] = {
253251 "bootstrap.servers" : broker_list_csv ,
254252 "group.id" : group_id ,
255253 "session.timeout.ms" : configs .SESSION_TIMEOUT_MS ,
@@ -264,16 +262,14 @@ def _get_adapter(cls, group_id: str, configs: KafkaConfig) -> Consumer:
264262 "max.partition.fetch.bytes" : configs .MAX_PARTITION_FETCH_BYTES ,
265263 }
266264 if configs .USERNAME and configs .PASSWORD and configs .SSL_CA_FILE :
267- config |= {
268- "sasl.username" : configs .USERNAME ,
269- "sasl.password" : configs .PASSWORD .get_secret_value (),
270- "security.protocol" : configs .SECURITY_PROTOCOL ,
271- "sasl.mechanism" : configs .SASL_MECHANISM ,
272- "ssl.ca.location" : configs .SSL_CA_FILE ,
273- "ssl.certificate.location" : configs .SSL_CERT_FILE ,
274- "ssl.key.location" : configs .SSL_KEY_FILE ,
275- "ssl.endpoint.identification.algorithm" : "none" ,
276- }
265+ config ["sasl.username" ] = configs .USERNAME
266+ config ["sasl.password" ] = configs .PASSWORD .get_secret_value ()
267+ config ["security.protocol" ] = configs .SECURITY_PROTOCOL
268+ config ["sasl.mechanism" ] = configs .SASL_MECHANISM or ""
269+ config ["ssl.ca.location" ] = configs .SSL_CA_FILE
270+ config ["ssl.certificate.location" ] = configs .SSL_CERT_FILE or ""
271+ config ["ssl.key.location" ] = configs .SSL_KEY_FILE or ""
272+ config ["ssl.endpoint.identification.algorithm" ] = "none"
277273 consumer = Consumer (config )
278274 except Exception as e :
279275 cls ._handle_kafka_exception (e , "KafkaConsumer_init" )
@@ -445,7 +441,7 @@ def _get_adapter(cls, configs: KafkaConfig) -> Producer:
445441 """
446442 try :
447443 broker_list_csv = "," .join (configs .BROKERS_LIST )
448- config = {
444+ config : dict [ str , str | int | float ] = {
449445 "bootstrap.servers" : broker_list_csv ,
450446 "linger.ms" : configs .LINGER_MS ,
451447 "batch.size" : configs .BATCH_SIZE ,
@@ -462,16 +458,14 @@ def _get_adapter(cls, configs: KafkaConfig) -> Producer:
462458 if configs .TRANSACTIONAL_ID :
463459 config ["transactional.id" ] = configs .TRANSACTIONAL_ID
464460 if configs .USERNAME and configs .PASSWORD and configs .SSL_CA_FILE :
465- config |= {
466- "sasl.username" : configs .USERNAME ,
467- "sasl.password" : configs .PASSWORD .get_secret_value (),
468- "security.protocol" : configs .SECURITY_PROTOCOL ,
469- "sasl.mechanism" : configs .SASL_MECHANISM ,
470- "ssl.ca.location" : configs .SSL_CA_FILE ,
471- "ssl.certificate.location" : configs .SSL_CERT_FILE ,
472- "ssl.key.location" : configs .SSL_KEY_FILE ,
473- "ssl.endpoint.identification.algorithm" : "none" ,
474- }
461+ config ["sasl.username" ] = configs .USERNAME
462+ config ["sasl.password" ] = configs .PASSWORD .get_secret_value ()
463+ config ["security.protocol" ] = configs .SECURITY_PROTOCOL
464+ config ["sasl.mechanism" ] = configs .SASL_MECHANISM or ""
465+ config ["ssl.ca.location" ] = configs .SSL_CA_FILE
466+ config ["ssl.certificate.location" ] = configs .SSL_CERT_FILE or ""
467+ config ["ssl.key.location" ] = configs .SSL_KEY_FILE or ""
468+ config ["ssl.endpoint.identification.algorithm" ] = "none"
475469 producer = Producer (config )
476470 except Exception as e :
477471 cls ._handle_kafka_exception (e , "KafkaProducer_init" )
0 commit comments