44from scapy .all import Ether
55from collections import namedtuple
66from contextlib import suppress
7+ import concurrent .futures
78
89from src .util import PortType , add_vlan_tag , fix_checksums
10+ from src .manipulation import ManipulateArgs , default_manipulation_cb
911
1012
1113DeviceEntry = namedtuple ('DeviceEntry' , [
1820
1921IP_MAX_SIZE = 65535
2022
23+ MAX_WORKERS = 5
24+
2125
2226class Connections (object ):
2327
@@ -26,25 +30,34 @@ def __init__(self):
2630 self ._event_loop = None
2731 self ._devices = list ()
2832 self ._message_queue = None
33+ self ._executers = None
34+ self .manipulate_cb = default_manipulation_cb
2935
3036 async def _read_message_queue (self ):
3137 while True :
32- packet = await self ._message_queue .get ()
33- await self ._process_packet (packet )
38+ packet , dev_entry = await self ._message_queue .get ()
39+ await self ._process_packet (packet , dev_entry )
40+
41+ async def _process_packet (self , packet , src_device_entry ):
42+ # Should we deal with tagging. We left an option for the manipulator
43+ # to tag the packet.
44+ should_inject_raw = False
3445
35- async def _process_packet (self , packet ):
36- dst_mac = Ether (packet ).dst
37- src_mac = Ether (packet ).src
46+ if self .manipulate_cb is not None :
47+ packet , should_inject_raw = await self ._event_loop .run_in_executor (
48+ self ._executers ,
49+ self .manipulate_cb ,
50+ ManipulateArgs (packet , src_device_entry .port_type , src_device_entry .vlan ))
3851
39- src_vlan = self ._get_vlan_by_mac (src_mac )
52+ src_vlan = self ._get_vlan_by_mac (Ether ( packet ). src )
4053 if src_vlan is None :
4154 return None
4255
4356 # Looking for the destination device
4457 for dst_device in self ._devices :
45- if dst_device .dev .get_mac == dst_mac and dst_device .vlan == src_vlan :
58+ if dst_device .dev .get_mac == Ether ( packet ). dst and dst_device .vlan == src_vlan :
4659
47- if dst_device .port_type == PortType .TRUNK :
60+ if dst_device .port_type == PortType .TRUNK and not should_inject_raw :
4861 packet = add_vlan_tag (packet , dst_device .vlan )
4962
5063 packet = fix_checksums (packet )
@@ -53,7 +66,7 @@ async def _process_packet(self, packet):
5366 def _read_raw_packet (self , device_entry ):
5467 try :
5568 packet = device_entry .sock .recv (IP_MAX_SIZE )
56- self ._message_queue .put_nowait (packet )
69+ self ._message_queue .put_nowait (( packet , device_entry ) )
5770 except OSError :
5871 # TODO: Currently the interface is closed successfully, but the cb
5972 # raises an exception.
@@ -63,6 +76,8 @@ def _start_event_loop(self):
6376 self ._event_loop = asyncio .new_event_loop ()
6477 asyncio .set_event_loop (self ._event_loop )
6578
79+ self ._executers = concurrent .futures .ThreadPoolExecutor (max_workers = MAX_WORKERS )
80+
6681 self ._message_queue = asyncio .Queue ()
6782 asyncio .ensure_future (self ._read_message_queue ())
6883
0 commit comments