|
17 | 17 | ## along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 | ## |
19 | 19 |
|
| 20 | + |
| 21 | +""" Module that implements OpenSIPS Event behavior """ |
| 22 | + |
| 23 | +from threading import Thread, Event |
20 | 24 | from ..mi import OpenSIPSMI, OpenSIPSMIException |
21 | 25 | from .datagram import Datagram |
22 | 26 | from .stream import Stream |
23 | | -from threading import Thread, Event |
24 | 27 |
|
25 | 28 | class OpenSIPSEventException(Exception): |
26 | | - pass |
| 29 | + """ Exceptions generated by OpenSIPS Events """ |
27 | 30 |
|
28 | 31 | class OpenSIPSEvent(): |
29 | | - def __init__(self, mi: OpenSIPSMI, type: str, **kwargs): |
| 32 | + |
| 33 | + """ Implementation of the OpenSIPS Event """ |
| 34 | + |
| 35 | + def __init__(self, mi: OpenSIPSMI, _type: str, **kwargs): |
30 | 36 | self.mi = mi |
31 | 37 | self.kwargs = kwargs |
| 38 | + self.thread = None |
| 39 | + self.thread_stop = Event() |
| 40 | + self.thread_stop.clear() |
32 | 41 |
|
33 | | - if type == "datagram": |
| 42 | + if _type == "datagram": |
34 | 43 | self.socket = Datagram(**kwargs) |
35 | | - elif type == "stream": |
| 44 | + elif _type == "stream": |
36 | 45 | self.socket = Stream(**kwargs) |
37 | 46 | else: |
38 | 47 | raise ValueError("Invalid event type") |
39 | 48 |
|
| 49 | + def handle(self, callback): |
| 50 | + """ Handles the event callbacks """ |
| 51 | + while not self.thread_stop.is_set(): |
| 52 | + data = self.socket.read() |
| 53 | + if data: |
| 54 | + callback(data) |
| 55 | + |
40 | 56 | def subscribe(self, event: str, callback, expire=None): |
| 57 | + """ Subscribes for an event """ |
41 | 58 | try: |
| 59 | + sock_name = self.socket.create() |
42 | 60 | if expire is None: |
43 | | - ret_val = self.mi.execute("event_subscribe", [event, self.socket.sock_name]) |
| 61 | + ret_val = self.mi.execute("event_subscribe", [event, sock_name]) |
44 | 62 | else: |
45 | | - ret_val = self.mi.execute("event_subscribe", [event, self.socket.sock_name, expire]) |
| 63 | + ret_val = self.mi.execute("event_subscribe", [event, sock_name, expire]) |
46 | 64 |
|
47 | 65 | if ret_val != "OK": |
48 | 66 | raise OpenSIPSEventException("Failed to subscribe to event") |
49 | | - |
50 | | - self.socket.create() |
51 | | - self.thread_stop = Event() |
52 | | - self.thread_stop.clear() |
53 | | - self.thread = Thread(target=self.socket.handle, args=(callback, self.thread_stop)) |
| 67 | + |
| 68 | + self.thread = Thread(target=self.handle, args=(callback,)) |
54 | 69 | self.thread.start() |
55 | 70 |
|
56 | 71 | except OpenSIPSMIException as e: |
57 | 72 | raise e |
58 | 73 | except Exception as e: |
59 | | - raise OpenSIPSEventException("Failed to subscribe to event: {}".format(e)) |
60 | | - |
| 74 | + raise OpenSIPSEventException(f"Failed to subscribe to event: {e}") from e |
| 75 | + |
61 | 76 | def unsubscribe(self, event: str): |
| 77 | + """ Unsubscribes for an event """ |
62 | 78 | try: |
63 | 79 | ret_val = self.mi.execute("event_subscribe", [event, self.socket.sock_name, 0]) |
64 | 80 |
|
65 | 81 | if ret_val != "OK": |
66 | 82 | raise OpenSIPSEventException("Failed to unsubscribe from event") |
67 | | - |
| 83 | + |
68 | 84 | self.stop() |
69 | 85 |
|
70 | 86 | except OpenSIPSMIException as e: |
71 | 87 | raise e |
72 | 88 |
|
73 | 89 | def stop(self): |
| 90 | + """ Stops the current event processing """ |
74 | 91 | self.thread_stop.set() |
75 | 92 | self.thread.join() |
76 | 93 | self.socket.destroy() |
0 commit comments