66import logging
77import threading
88from typing import Callable , Dict , Iterable , List , Optional , Union , TYPE_CHECKING
9+ import asyncio
910
1011if TYPE_CHECKING :
1112 from can import BusABC , Notifier
13+ from asyncio import AbstractEventLoop
1214
1315try :
1416 import can
3638class Network (MutableMapping ):
3739 """Representation of one CAN bus containing one or more nodes."""
3840
39- def __init__ (self , bus = None ):
41+ def __init__ (
42+ self ,
43+ bus : Optional [BusABC ] = None ,
44+ loop : Optional [AbstractEventLoop ] = None
45+ ):
4046 """
4147 :param can.BusABC bus:
4248 A python-can bus instance to re-use.
4349 """
4450 #: A python-can :class:`can.BusABC` instance which is set after
4551 #: :meth:`canopen.Network.connect` is called
4652 self .bus : Optional [BusABC ] = bus
53+ self .loop : Optional [asyncio .AbstractEventLoop ] = loop
4754 #: A :class:`~canopen.network.NodeScanner` for detecting nodes
4855 self .scanner = NodeScanner (self )
4956 #: List of :class:`can.Listener` objects.
@@ -52,15 +59,19 @@ def __init__(self, bus=None):
5259 self .notifier : Optional [Notifier ] = None
5360 self .nodes : Dict [int , Union [RemoteNode , LocalNode ]] = {}
5461 self .subscribers : Dict [int , List [Callback ]] = {}
55- self .send_lock = threading .Lock ()
62+ self .send_lock = threading .Lock () # FIXME Async
5663 self .sync = SyncProducer (self )
5764 self .time = TimeProducer (self )
5865 self .nmt = NmtMaster (0 )
5966 self .nmt .network = self
6067
6168 self .lss = LssMaster ()
6269 self .lss .network = self
63- self .subscribe (self .lss .LSS_RX_COBID , self .lss .on_message_received )
70+
71+ if self .loop :
72+ self .subscribe (self .lss .LSS_RX_COBID , self .lss .aon_message_received )
73+ else :
74+ self .subscribe (self .lss .LSS_RX_COBID , self .lss .on_message_received )
6475
6576 def subscribe (self , can_id : int , callback : Callback ) -> None :
6677 """Listen for messages with a specific CAN ID.
@@ -119,8 +130,9 @@ def connect(self, *args, **kwargs) -> "Network":
119130 kwargs_notifier = {}
120131 if "loop" in kwargs :
121132 kwargs_notifier ["loop" ] = kwargs ["loop" ]
133+ self .loop = kwargs ["loop" ]
122134 del kwargs ["loop" ]
123- self .bus = can .interface . Bus (* args , ** kwargs )
135+ self .bus = can .Bus (* args , ** kwargs )
124136 logger .info ("Connected to '%s'" , self .bus .channel_info )
125137 self .notifier = can .Notifier (self .bus , self .listeners , 1 , ** kwargs_notifier )
126138 return self
@@ -220,7 +232,9 @@ def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None:
220232 arbitration_id = can_id ,
221233 data = data ,
222234 is_remote_frame = remote )
223- with self .send_lock :
235+ # NOTE: This lock is ok for async, because ther is only one thread
236+ # calling this function when using async, so it'll never lock.
237+ with self .send_lock : # FIXME: Blocking
224238 self .bus .send (msg )
225239 self .check ()
226240
@@ -256,10 +270,13 @@ def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
256270 :param timestamp:
257271 Timestamp of the message, preferably as a Unix timestamp
258272 """
259- if can_id in self .subscribers :
260- callbacks = self .subscribers [can_id ]
273+ # NOTE: Callback. Will be called from another thread
274+ callbacks = self .subscribers .get (can_id )
275+ if callbacks is not None :
261276 for callback in callbacks :
262- callback (can_id , data , timestamp )
277+ res = callback (can_id , data , timestamp )
278+ if res is not None and self .loop is not None and asyncio .iscoroutine (res ):
279+ self .loop .create_task (res )
263280 self .scanner .on_message_received (can_id )
264281
265282 def check (self ) -> None :
@@ -360,6 +377,7 @@ def __init__(self, network: Network):
360377 self .network = network
361378
362379 def on_message_received (self , msg ):
380+ # NOTE: Callback. Will be called from another thread
363381 if msg .is_error_frame or msg .is_remote_frame :
364382 return
365383
@@ -394,9 +412,11 @@ def __init__(self, network: Optional[Network] = None):
394412 self .nodes : List [int ] = []
395413
396414 def on_message_received (self , can_id : int ):
415+ # NOTE: Callback. Will be called from another thread
397416 service = can_id & 0x780
398417 node_id = can_id & 0x7F
399418 if node_id not in self .nodes and node_id != 0 and service in self .SERVICES :
419+ # NOTE: Assume this is thread-safe
400420 self .nodes .append (node_id )
401421
402422 def reset (self ):
0 commit comments