2929from enum import Enum
3030from typing import Any , Callable
3131import textwrap
32-
32+ import threading
3333import serial
3434
3535from .schedular import Schedular
@@ -135,6 +135,7 @@ class Sim767x:
135135 _message_send_id = - 1
136136 _last_command = "" # ??? is this being used anymore
137137 _messages_dict = {}
138+ _messages_dict_lock = threading .Lock ()
138139
139140 _username = ""
140141 _password = ""
@@ -168,7 +169,7 @@ def __init__(self, serial_port: str, network: str, apn: str, baud_rate: int):
168169 self .start_serial ()
169170 self ._serial_at .flush ()
170171 self ._sched = Schedular ("LTE Connection Schedular" )
171- self ._sched .add_timer (0.001 , 0.0 , self ._run_processing )
172+ self ._sched .add_timer (0.01 , 0.0 , self ._run_processing )
172173 self ._sched .add_timer (1.0 , 0.25 , self ._run_one_second_module_tasks )
173174 self ._sched .add_timer (1.0 , 0.5 , self ._run_one_second_mqtt_tasks )
174175 self ._sched .add_timer (1.0 , 0.75 , self ._run_one_second_gnss_tasks )
@@ -280,10 +281,11 @@ def subscribe(self, topic: str):
280281
281282 def publish_message (self , topic : str , message : str ):
282283 """Publis a message"""
283- if topic not in self ._messages_dict :
284- self ._messages_dict [topic ] = message
285- else :
286- self ._messages_dict [topic ] += message
284+ with self ._messages_dict_lock :
285+ if topic not in self ._messages_dict :
286+ self ._messages_dict [topic ] = message
287+ else :
288+ self ._messages_dict [topic ] += message
287289
288290 def _protected_at_cmd (self , cmd : str , on_ok_callback : Callable [[], None ], on_enter_callback : Callable [[], None ] | None , timeout_s : int = 10 ):
289291 if not self ._run_at_callbacks :
@@ -325,9 +327,11 @@ def _run_processing(self, cookie: Any):
325327 if not self ._mqtt_is_subscribing and not self ._mqtt_is_publishing and not self ._incoming_message : # Wait for any received message being downloaded.
326328 if self ._sub_topic :
327329 self ._mqtt_req_subscribe ()
328- elif self ._messages_dict :
329- d_topic = list (self ._messages_dict .keys ())[0 ]
330- self ._mqtt_request_publish (d_topic , self ._messages_dict [d_topic ])
330+ if self ._messages_dict :
331+ with self ._messages_dict_lock :
332+ d_topic = list (self ._messages_dict .keys ())[0 ]
333+ self ._mqtt_request_publish (d_topic , self ._messages_dict [d_topic ])
334+ del self ._messages_dict [d_topic ]
331335 return True
332336
333337 def _process_at_commands (self ):
@@ -482,8 +486,9 @@ def _process_at_commands(self):
482486 if len (result_arr ) >= 2 :
483487 error = int (result_arr [1 ])
484488 if error == 0 :
485- if self ._pub_topic in self ._messages_dict :
486- del self ._messages_dict [self ._pub_topic ]
489+ with self ._messages_dict_lock :
490+ if self ._pub_topic in self ._messages_dict :
491+ del self ._messages_dict [self ._pub_topic ]
487492 if self .mqtt_is_finished ():
488493 self ._pub_topic = ""
489494 self ._tx_message = ""
@@ -759,7 +764,6 @@ def _mqtt_request_publish(self, topic: str, message: str):
759764 else :
760765 self ._tx_message = message
761766
762- self ._tx_message = message
763767 if self ._tx_message :
764768 if self ._lte_state == LteState .MODULE_SHUTTING_DOWN :
765769 self ._shut_down_timer_s = 0 # Reset shutdown timer as there is a message to send
@@ -785,10 +789,9 @@ def _mqtt_publish(self):
785789 def mqtt_is_finished (self ):
786790 if len (self ._tx_lines ) > 0 :
787791 self .tx_message = self ._tx_lines .pop (0 )
788- self ._protected_at_cmd (f"CMQTTTOPIC=0,{ str (len (self ._pub_topic ))} " , lambda : self ._mqtt_request_payload (), lambda : self ._mqtt_enter_pub_topic () ) # clientIndex = 0
792+ self ._protected_at_cmd (f"CMQTTTOPIC=0,{ str (len (self ._pub_topic ))} " , self ._mqtt_request_payload , self ._mqtt_enter_pub_topic ) # clientIndex = 0
789793 return False
790- else :
791- return True
794+ return True
792795
793796# --------- GNSS ----------
794797 def gnss_start (self , interval : int , one_shot : bool ):
0 commit comments