-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathVcanDecoder.py
More file actions
81 lines (75 loc) · 2.83 KB
/
VcanDecoder.py
File metadata and controls
81 lines (75 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import can
import cantools
class VcanDecoder:
    """Capture raw frames from a SocketCAN channel and decode them with a DBC file.

    Typical use: construct a decoder (opens the bus and tries to load
    ``MyDBCFile``), call ``grab_data()`` to collect frames, then
    ``get_engine_dataset()`` / ``get_engine_byteinfo()`` to summarise them and
    ``decode_message()`` to turn a single frame into named signals.

    Captured frames and the derived dictionaries are stored per instance, so
    two decoders never see each other's data.
    """

    # Immutable class-level defaults; instances override them as needed.
    UseDBC = False
    MyDBCFile = 'EngineSimCanDecoder/engine_ecu.dbc'
    DBCFile = None

    def __init__(self, channel='vcan0'):
        """Open the SocketCAN ``channel`` and try to load the default DBC file.

        Raises whatever ``can.interface.Bus`` raises when the channel cannot
        be opened. A DBC loading failure is only reported on stdout and leaves
        ``UseDBC`` set to ``False``.
        """
        # Mutable state must live on the instance: as class attributes these
        # containers would be shared by every VcanDecoder ever created.
        self.messages = []
        self.engineDataset = {}
        self.engineInfoDecoded = {}
        self.canbus = can.interface.Bus(channel=channel, interface='socketcan')
        self.use_dbc_file(self, self.MyDBCFile)

    def grab_data(self, max_messages=100, max_consecutive_errors=10):
        """Receive frames until ``self.messages`` holds ``max_messages`` entries.

        ``recv()`` results of ``None`` are skipped. A receive error is printed
        and the read is retried; after ``max_consecutive_errors`` failures in
        a row the capture stops, so a dead interface cannot spin forever.

        The bus is always shut down before returning, including when the loop
        is interrupted (for example by Ctrl-C).
        """
        consecutive_errors = 0
        try:
            while len(self.messages) < max_messages:
                try:
                    message = self.canbus.recv()
                except Exception as e:
                    print(f"Error receiving CAN message: {e}")
                    consecutive_errors += 1
                    if consecutive_errors >= max_consecutive_errors:
                        print("Too many consecutive receive errors; stopping capture.")
                        break
                    continue
                consecutive_errors = 0
                if message is not None:
                    self.messages.append(message)
        finally:
            self.shutdown()

    @staticmethod
    def print_messages(messages):
        """Print every item of the iterable ``messages`` on its own line."""
        for msg in messages:
            print(msg)

    def getKeys(self):
        """Return a view of the arbitration IDs stored in ``engineDataset``."""
        return self.engineDataset.keys()

    def get_engine_dataset(self):
        """Fill ``engineDataset`` with ``arbitration_id -> data`` from the capture.

        When an ID occurs more than once, the payload of the last captured
        frame wins.
        """
        for msg in self.messages:
            self.engineDataset[msg.arbitration_id] = msg.data

    def get_engine_byteinfo(self):
        """Render each payload in ``engineDataset`` as comma-separated decimal bytes.

        The result is stored in ``engineInfoDecoded`` and printed once.
        """
        for key, value in self.engineDataset.items():
            self.engineInfoDecoded[key] = ",".join(str(byte) for byte in value)
        print(self.engineInfoDecoded)

    # NOTE: kept as a staticmethod that receives the decoder explicitly so that
    # existing call sites -- use_dbc_file(decoder, path) and
    # use_dbc_file(self=decoder, dbc_file_name=path) -- keep working.
    @staticmethod
    def use_dbc_file(self, dbc_file_name):
        """Load ``dbc_file_name`` into ``self.DBCFile`` and set ``self.UseDBC``.

        ``self`` is the decoder instance to configure. A name that does not
        end in ``.dbc`` (case-insensitive) is rejected without touching the
        current state. Loading errors are printed and disable DBC decoding;
        they are not raised.
        """
        if not dbc_file_name.lower().endswith('.dbc'):
            print("Invalid DBC file format.")
            return
        try:
            self.DBCFile = cantools.database.load_file(filename=dbc_file_name)
        except Exception as e:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of being swallowed here.
            print(f"Error loading DBC file: {e}")
            self.UseDBC = False
            return
        if self.DBCFile is None:
            print("DBC support is not available in the current CAN library.")
            self.UseDBC = False
        else:
            print(f"DBC file '{dbc_file_name}' loaded successfully.")
            self.UseDBC = True

    def decode_message(self, message):
        """Decode one frame with the loaded DBC database.

        Returns the value produced by ``DBCFile.decode_message`` for the
        frame's arbitration ID and data, or ``None`` when no DBC file is
        loaded or decoding fails (the reason is printed).
        """
        if not self.UseDBC:
            print("DBC file not loaded. Cannot decode message.")
            return None
        try:
            return self.DBCFile.decode_message(message.arbitration_id, message.data)
        except Exception as e:
            print(f"Error decoding message with DBC: {e}")
            return None

    def shutdown(self):
        """Shut down the underlying CAN bus."""
        self.canbus.shutdown()


def main():
    """Capture frames on the default channel, summarise them and print the decoded signals."""
    # One decoder for the whole run: a fresh VcanDecoder() per step would open
    # a new socket and reload the DBC file every time.
    decoder = VcanDecoder()
    decoder.grab_data()  # also shuts the bus down when the capture ends
    decoder.get_engine_dataset()
    decoder.get_engine_byteinfo()
    VcanDecoder.print_messages(
        decoder.decode_message(msg) for msg in decoder.messages
    )


if __name__ == "__main__":
    main()