22from dataclasses import dataclass
33from enum import Enum
44import glob
5+ import logging
56from typing import Union
67from defs .common import strtoint
78import itertools
@@ -233,7 +234,18 @@ class protocol_settings:
233234 settings : dict [str , str ]
234235 ''' default settings provided by protocol json '''
235236
237+ byteorder : str = "big"
238+
239+ _log : logging .Logger = None
240+
241+
236242 def __init__ (self , protocol : str , settings_dir : str = 'protocols' ):
243+
244+ #apply log level to logger
245+ self ._log_level = getattr (logging , logging .getLevelName (logging .getLogger ().getEffectiveLevel ()), logging .INFO )
246+ self ._log : logging .Logger = logging .getLogger (__name__ )
247+ self ._log .setLevel (self ._log_level )
248+
237249 self .protocol = protocol
238250 self .settings_dir = settings_dir
239251
@@ -266,6 +278,8 @@ def __init__(self, protocol : str, settings_dir : str = 'protocols'):
266278 else :
267279 self .transport = "modbus_rtu"
268280
281+ if "byteorder" in self .settings : #handle byte order for ints n stuff
282+ self .byteorder = self .settings ["byteorder" ]
269283
270284 for registry_type in Registry_Type :
271285 self .load_registry_map (registry_type )
@@ -305,7 +319,7 @@ def load__json(self, file : str = '', settings_dir : str = ''):
305319
306320 #if path does not exist; nothing to load. skip.
307321 if not path :
308- print ("ERROR: '" + file + "' not found" )
322+ self . _log . error ("ERROR: '" + file + "' not found" )
309323 return
310324
311325 with open (path ) as f :
@@ -321,13 +335,13 @@ def load__json(self, file : str = '', settings_dir : str = ''):
321335
322336 def load__registry (self , path , registry_type : Registry_Type = Registry_Type .INPUT ) -> list [registry_map_entry ]:
323337 registry_map : list [registry_map_entry ] = []
324- register_regex = re .compile (r'(?P<register>(?:0?x[\dA-Z ]+|[\d]+))\.(b(?P<bit>x?\d{1,2})|(?P<byte>x?\d{1,2}))' )
338+ register_regex = re .compile (r'(?P<register>(?:0?x[\da-z ]+|[\d]+))\.(b(?P<bit>x?\d{1,2})|(?P<byte>x?\d{1,2}))' )
325339
326340 data_type_regex = re .compile (r'(?P<datatype>\w+)\.(?P<length>\d+)' )
327341
328- range_regex = re .compile (r'(?P<reverse>r|)(?P<start>(?:0?x[\dA-Z ]+|[\d]+))[\-~](?P<end>(?:0?x[\dA-Z ]+|[\d]+))' )
342+ range_regex = re .compile (r'(?P<reverse>r|)(?P<start>(?:0?x[\da-z ]+|[\d]+))[\-~](?P<end>(?:0?x[\da-z ]+|[\d]+))' )
329343 ascii_value_regex = re .compile (r'(?P<regex>^\[.+\]$)' )
330- list_regex = re .compile (r'\s*(?:(?P<range_start>(?:0?x[\dA-Z ]+|[\d]+))-(?P<range_end>(?:0?x[\dA-Z ]+|[\d]+))|(?P<element>[^,\s][^,]*?))\s*(?:,|$)' )
344+ list_regex = re .compile (r'\s*(?:(?P<range_start>(?:0?x[\da-z ]+|[\d]+))-(?P<range_end>(?:0?x[\da-z ]+|[\d]+))|(?P<element>[^,\s][^,]*?))\s*(?:,|$)' )
331345
332346
333347 if not os .path .exists (path ): #return empty is file doesnt exist.
@@ -386,7 +400,7 @@ def determine_delimiter(first_row) -> str:
386400 variable_name = variable_name = variable_name .strip ().lower ().replace (' ' , '_' ).replace ('__' , '_' ) #clean name
387401
388402 if re .search (r"[^a-zA-Z0-9\_]" , variable_name ) :
389- print ( "WARNING Invalid Name : " + str (variable_name ) + " reg: " + str (row ['register' ]) + " doc name: " + str (row ['documented name' ]) + " path: " + str (path ))
403+ self . _log . warning ( " Invalid Name : " + str (variable_name ) + " reg: " + str (row ['register' ]) + " doc name: " + str (row ['documented name' ]) + " path: " + str (path ))
390404
391405 #convert to float
392406 try :
@@ -402,7 +416,7 @@ def determine_delimiter(first_row) -> str:
402416
403417 if 'values' not in row :
404418 row ['values' ] = ""
405- print ( "WARNING No Value Column : path: " + str (path ))
419+ self . _log . warning ( " No Value Column : path: " + str (path ))
406420
407421 data_type_len : int = - 1
408422 #optional row, only needed for non-default data types
@@ -471,6 +485,7 @@ def determine_delimiter(first_row) -> str:
471485 register : int = - 1
472486 register_bit : int = 0
473487 register_byte : int = - 1
488+ row ['register' ] = row ['register' ].lower () #ensure is all lower case
474489 match = register_regex .search (row ['register' ])
475490 if match :
476491 register = strtoint (match .group ('register' ))
@@ -673,21 +688,25 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = '', sett
673688 def process_register_bytes (self , registry : dict [int ,bytes ], entry : registry_map_entry ):
674689 ''' process bytes into data'''
675690
676- register = registry [entry .register ]
691+ if isinstance (registry [entry .register ], tuple ):
692+ register = registry [entry .register ][0 ] #can bus uses tuple for timestamp
693+ else :
694+ register = registry [entry .register ]
695+
677696 if entry .register_byte > 0 :
678697 register = register [entry .register_byte :]
679698
680699 if entry .data_type_size > 0 :
681700 register = register [:entry .data_type_size ]
682701
683702 if entry .data_type == Data_Type .UINT :
684- value = int .from_bytes (register [:4 ], byteorder = 'big' , signed = False )
703+ value = int .from_bytes (register [:4 ], byteorder = self . byteorder , signed = False )
685704 elif entry .data_type == Data_Type .INT :
686- value = int .from_bytes (register [:4 ], byteorder = 'big' , signed = True )
705+ value = int .from_bytes (register [:4 ], byteorder = self . byteorder , signed = True )
687706 elif entry .data_type == Data_Type .USHORT :
688- value = int .from_bytes (register [:2 ], byteorder = 'big' , signed = False )
707+ value = int .from_bytes (register [:2 ], byteorder = self . byteorder , signed = False )
689708 elif entry .data_type == Data_Type .SHORT :
690- value = int .from_bytes (register [:2 ], byteorder = 'big' , signed = True )
709+ value = int .from_bytes (register [:2 ], byteorder = self . byteorder , signed = True )
691710 elif entry .data_type == Data_Type ._16BIT_FLAGS or entry .data_type == Data_Type ._8BIT_FLAGS or entry .data_type == Data_Type ._32BIT_FLAGS :
692711 #16 bit flags
693712 start_bit : int = 0
@@ -700,18 +719,31 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma
700719 #handle custom sizes, less than 1 register
701720 end_bit = flag_size + start_bit
702721
703- if entry .documented_name + '_codes' in self .protocolSettings .codes :
722+ if entry .documented_name + '_codes' in self .codes :
723+ code_key : str = entry .documented_name + '_codes'
704724 flags : list [str ] = []
725+ flag_indexes : list [str ] = []
705726 for i in range (start_bit , end_bit ): # Iterate over each bit position (0 to 15)
706727 byte = i // 8
707728 bit = i % 8
708729 val = register [byte ]
709730 # Check if the i-th bit is set
710731 if (val >> bit ) & 1 :
711732 flag_index = "b" + str (i )
712- if flag_index in self .protocolSettings .codes [entry .documented_name + '_codes' ]:
713- flags .append (self .protocolSettings .codes [entry .documented_name + '_codes' ][flag_index ])
714-
733+ flag_indexes .append (flag_index )
734+ if flag_index in self .codes [code_key ]:
735+ flags .append (self .codes [code_key ][flag_index ])
736+
737+ #check multibit flags
738+ multibit_flags = [key for key in self .codes if '&' in key ]
739+
740+ if multibit_flags : #if multibit flags are found
741+ flag_indexes_set : set [str ] = set (flag_indexes )
742+ for multibit_flag in multibit_flags :
743+ bits = multibit_flag .split ('&' ) # Split key into 'bits'
744+ if all (bit in flag_indexes_set for bit in bits ): # Check if all bits are present in the flag_indexes_set
745+ flags .append (self .codes [code_key ][multibit_flag ])
746+
715747 value = "," .join (flags )
716748 else :
717749 flags : list [str ] = []
@@ -760,7 +792,23 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma
760792 try :
761793 value = register .decode ("utf-8" ) #convert bytes to ascii
762794 except UnicodeDecodeError as e :
763- print ("UnicodeDecodeError:" , e )
795+ self ._log .error ("UnicodeDecodeError:" , e )
796+
797+ #apply unit mod
798+ if entry .unit_mod != float (1 ):
799+ value = value * entry .unit_mod
800+
801+ #apply codes
802+ if (entry .data_type != Data_Type ._16BIT_FLAGS and
803+ entry .documented_name + '_codes' in self .codes ):
804+ try :
805+ cleanval = str (int (value ))
806+
807+ if cleanval in self .codes [entry .documented_name + '_codes' ]:
808+ value = self .codes [entry .documented_name + '_codes' ][cleanval ]
809+ except :
810+ #do nothing; try is for intval
811+ value = value
764812
765813 return value
766814
@@ -855,11 +903,11 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma
855903 bit_index = entry .register_bit
856904 value = (registry [entry .register ] >> bit_index ) & bit_mask
857905 elif entry .data_type == Data_Type .ASCII :
858- value = registry [entry .register ].to_bytes ((16 + 7 ) // 8 , byteorder = 'big' ) #convert to ushort to bytes
906+ value = registry [entry .register ].to_bytes ((16 + 7 ) // 8 , byteorder = self . byteorder ) #convert to ushort to bytes
859907 try :
860908 value = value .decode ("utf-8" ) #convert bytes to ascii
861909 except UnicodeDecodeError as e :
862- print ("UnicodeDecodeError:" , e )
910+ self . _log . error ("UnicodeDecodeError:" , e )
863911
864912 else : #default, Data_Type.USHORT
865913 value = float (registry [entry .register ])
0 commit comments