Skip to content

Commit b251d83

Browse files
committed
src: Add trunk/access connection utilities between Switch and Devices
1 parent 579c457 commit b251d83

6 files changed

Lines changed: 405 additions & 0 deletions

File tree

src/__init__.py

Whitespace-only changes.

src/connections.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import socket
2+
import asyncio
3+
import threading
4+
from scapy.all import Ether
5+
from collections import namedtuple
6+
from contextlib import suppress
7+
8+
from src.util import PortType, add_vlan_tag, fix_checksums
9+
10+
11+
DeviceEntry = namedtuple('DeviceEntry', [
12+
'dev',
13+
'sock',
14+
'vlan',
15+
# Trunk/Access
16+
'port_type',
17+
])
18+
19+
IP_MAX_SIZE = 65535
20+
21+
22+
class Connections(object):
23+
24+
def __init__(self):
25+
self._connections_thread = threading.Thread(target=self._start_event_loop)
26+
self._event_loop = None
27+
self._devices = list()
28+
self._message_queue = None
29+
30+
async def _read_message_queue(self):
31+
while True:
32+
packet, device_entry = await self._message_queue.get()
33+
await self._process_packet(packet, device_entry)
34+
35+
async def _process_packet(self, packet, src_device_entry):
36+
dst_mac = Ether(packet).dst
37+
38+
# Looking for the destination device
39+
for dst_device in self._devices:
40+
if dst_device.dev.get_mac == dst_mac and dst_device.vlan == src_device_entry.vlan:
41+
42+
if dst_device.port_type == PortType.TRUNK:
43+
packet = add_vlan_tag(packet, dst_device.vlan)
44+
45+
packet = fix_checksums(packet)
46+
await self._event_loop.sock_sendall(dst_device.sock, packet)
47+
48+
def _read_raw_packet(self, device_entry):
49+
try:
50+
packet = device_entry.sock.recv(IP_MAX_SIZE)
51+
self._message_queue.put_nowait((packet, device_entry))
52+
except OSError:
53+
# TODO: Currently the interface is closed successfully, but the cb
54+
# raises an exception.
55+
pass
56+
57+
def _start_event_loop(self):
58+
self._event_loop = asyncio.new_event_loop()
59+
asyncio.set_event_loop(self._event_loop)
60+
61+
self._message_queue = asyncio.Queue()
62+
asyncio.ensure_future(self._read_message_queue())
63+
64+
try:
65+
self._event_loop.run_forever()
66+
67+
for task in asyncio.Task.all_tasks():
68+
task.cancel()
69+
70+
with suppress(asyncio.CancelledError):
71+
# await until task is completed if it is currently running/pending.
72+
self._event_loop.run_until_complete(task)
73+
finally:
74+
self._event_loop.close()
75+
76+
def _update_arp_tables(self, new_dev):
77+
for curr_dev in self._devices:
78+
# If both devices in the same vlan, they should update each other with their addresses.
79+
if new_dev.vlan == curr_dev.vlan:
80+
new_dev.dev.run_from_namespace('arp -s {ip} {mac}'.format(
81+
ip=curr_dev.dev.get_ip, mac=curr_dev.dev.get_mac))
82+
83+
curr_dev.dev.run_from_namespace('arp -s {ip} {mac}'.format(
84+
ip=new_dev.dev.get_ip, mac=new_dev.dev.get_mac))
85+
86+
def append_device(self, dev, vlan, port_type):
87+
def _append_device_entry(self, new_dev_entry):
88+
self._devices.append(new_dev_entry)
89+
90+
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003))
91+
sock.bind(('br-{}'.format(dev.get_name), 0))
92+
93+
new_dev_entry = DeviceEntry(dev, sock, vlan, port_type)
94+
95+
self._update_arp_tables(new_dev_entry)
96+
97+
self._event_loop.add_reader(sock, self._read_raw_packet, new_dev_entry)
98+
99+
# We're pushing the job into the event loop, so no lock is needed.
100+
# threadsafe since we're calling it from the main thread,
101+
# and not from the eventloop thread.
102+
self._event_loop.call_soon_threadsafe(_append_device_entry, self, new_dev_entry)
103+
104+
def remove_device(self, dev_to_remove):
105+
def _remove_device_entry(self, dev_to_remove):
106+
for dev_entry in self._devices:
107+
if dev_entry.dev == dev_to_remove:
108+
dev_entry.sock.close()
109+
self._event_loop.remove_reader(dev_entry.sock)
110+
self._devices.remove(dev_entry)
111+
break
112+
113+
self._event_loop.call_soon_threadsafe(_remove_device_entry, self, dev_to_remove)
114+
115+
def start_connections_thread(self):
116+
self._connections_thread.start()
117+
118+
def stop_connections_thread(self):
119+
self._event_loop.call_soon_threadsafe(self._event_loop.stop)

src/device.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from src.util import shell_run_and_check, convert_subnetmask_to_cidr, run_shell_cmd
2+
from src.exception import LongInterfaceException, NamespaceCreationException
3+
4+
5+
class Device(object):
6+
7+
# IFNAMSIZ is 15 (excluding the null terminator).
8+
# We're appending at most 8 bytes to the given interface name, so 7
9+
# characters are left for the user.
10+
# Note that we prefer to limit the user's input, instead of generating
11+
# names by ourselves, as the user should know which resources this program
12+
# is creating (for example, the user might manually delete interfaces
13+
# by himself).
14+
INTERFACE_NAME_MAX_SIZE = 7
15+
16+
netns_commands = [
17+
# Create veth pair
18+
'ip link add veth-{name} type veth peer name br-veth-{name}',
19+
'ip link set br-veth-{name} up',
20+
# Set the new network namespace's veth interface
21+
'ip link set veth-{name} netns {name}',
22+
'ip netns exec {name} ip link set veth-{name} up',
23+
]
24+
25+
def __init__(self, name, ip, subnet_mask):
26+
if len(name) > self.INTERFACE_NAME_MAX_SIZE:
27+
raise LongInterfaceException(
28+
"Device name length must be at most {} characters({})".format(
29+
self.INTERFACE_NAME_MAX_SIZE, name))
30+
31+
# Create the network namespace for the device
32+
if not shell_run_and_check('ip netns add {}'.format(name)):
33+
raise NamespaceCreationException(
34+
"Can not create network namespace '{}' "
35+
"(Make sure to run the script with root priviliages)".format(name))
36+
37+
self._dev_name = name
38+
self._ip = ip
39+
self._cidr = convert_subnetmask_to_cidr(subnet_mask)
40+
self._mac = None
41+
42+
self._ns_is_set = False
43+
44+
def _get_ns_veth_mac(self):
45+
"""
46+
In case the veth interface of the new namespace is set (setup_namespace),
47+
The function returns the mac address of the veth interface (in utf-8).
48+
Otherwise, the function returns None.
49+
"""
50+
mac, err = run_shell_cmd(
51+
'ip netns exec {ns} cat /sys/class/net/veth-{ns}/address'.format(ns=self._dev_name))
52+
53+
return mac.decode('utf-8') if err == b'' else None
54+
55+
@property
56+
def get_name(self):
57+
return self._dev_name
58+
59+
@property
60+
def get_ip(self):
61+
return self._ip
62+
63+
@property
64+
def get_cidr(self):
65+
return self._cidr
66+
67+
@property
68+
def get_mac(self):
69+
if self._mac is None:
70+
self._mac = self._get_ns_veth_mac()
71+
72+
return self._mac
73+
74+
def get_switch_veth_name(self):
75+
# This class creates veth pair. One veth is set to the new network namespace
76+
# in `setup_namespace`. The second veth should be used by the default namespace,
77+
# later on (See switch.py). Therefore, the default namespace should be able
78+
# to get the second veth's name.
79+
return 'br-veth-{}'.format(self._dev_name)
80+
81+
def get_device_veth_name(self):
82+
# That is the veth interface in the "device" side (i.e - in the new network
83+
# namespace side).
84+
return 'veth-{}'.format(self._dev_name)
85+
86+
def run_from_namespace(self, cmd):
87+
out, err = run_shell_cmd('ip netns exec {ns} {cmd}'.format(ns=self._dev_name, cmd=cmd))
88+
return out.decode('utf-8') if err == b'' else err.decode('utf-8')
89+
90+
def setup_namespace(self):
91+
''' Safe to call more then once. '''
92+
if self._ns_is_set:
93+
return True
94+
95+
if not shell_run_and_check(' && '.join(self.netns_commands).format(name=self._dev_name)):
96+
return False
97+
98+
self._ns_is_set = True
99+
return True
100+
101+
def term(self):
102+
''' Safe to call more then once. '''
103+
if self._ns_is_set:
104+
shell_run_and_check('ip netns del {}'.format(self._dev_name))
105+
# We've created the switch's veth interface, so we are responsible for
106+
# deleting it.
107+
shell_run_and_check('ip link del {}'.format(self.get_switch_veth_name()))
108+
109+
self._ns_is_set = False

src/exception.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
class LongInterfaceException(Exception):
2+
pass
3+
4+
5+
class NamespaceCreationException(Exception):
6+
pass
7+
8+
9+
class BridgeInterfaceCreationException(Exception):
10+
pass
11+
12+
13+
class NamespaceConnectionException(Exception):
14+
pass

src/switch.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
from src.connections import Connections
2+
from src.util import shell_run_and_check, PortType
3+
from src.exception import (NamespaceCreationException,
4+
BridgeInterfaceCreationException, NamespaceConnectionException)
5+
6+
7+
class Switch(object):
8+
9+
def __init__(self):
10+
self._connections = Connections()
11+
self._connections.start_connections_thread()
12+
13+
def _set_bridge_interface(self, dev):
14+
set_bridge_cmds = [
15+
'ip link add name {br} type bridge',
16+
'ip link set {br} up',
17+
'ip link set {br} promisc on',
18+
'ip link set {veth} master {br}',
19+
]
20+
21+
return shell_run_and_check(' && '.join(set_bridge_cmds).format(
22+
br='br-{}'.format(dev.get_name), veth=dev.get_switch_veth_name()))
23+
24+
def _set_access_connection(self, dev):
25+
# Set IP to veth interface (within the device's network namespace)
26+
if not shell_run_and_check(
27+
'ip netns exec {name} ip addr add {ip}/{cidr} dev veth-{name}'.format(
28+
name=dev.get_name, ip=dev.get_ip, cidr=dev.get_cidr)):
29+
return False
30+
31+
return True
32+
33+
def _set_trunk_connection(self, dev, vlan):
34+
vlan_interface_cmds = [
35+
'ip link add link {iface} name {iface}.v type vlan id {vlan}',
36+
'ip addr add {ip}/{cidr} brd + dev {iface}.v',
37+
'ip link set {iface}.v up',
38+
]
39+
40+
# Configure vlan interface inside the network namespace
41+
concat_cmds = ' && '.join(
42+
['ip netns exec {} '.format(dev.get_name) + cmd for cmd in vlan_interface_cmds]).format(
43+
iface=dev.get_device_veth_name(),
44+
ip=dev.get_ip,
45+
cidr=dev.get_cidr,
46+
vlan=vlan)
47+
48+
if not shell_run_and_check(concat_cmds):
49+
return False
50+
51+
return True
52+
53+
def connect_device_access(self, dev, vlan):
54+
if not dev.setup_namespace():
55+
raise NamespaceCreationException("failed to create network namespace for {}".format(
56+
dev.get_name))
57+
58+
if not self._set_bridge_interface(dev):
59+
raise BridgeInterfaceCreationException(
60+
"failed to create bridge interface for {}".format(dev.get_name))
61+
62+
if not self._set_access_connection(dev):
63+
raise NamespaceConnectionException("failed to create connection to namespace {}".format(
64+
dev.get_name))
65+
66+
self._connections.append_device(dev, vlan, PortType.ACCESS)
67+
68+
def connect_device_trunk(self, dev, vlan):
69+
if not dev.setup_namespace():
70+
raise NamespaceCreationException("failed to create network namespace for {}".format(
71+
dev.get_name))
72+
73+
if not self._set_bridge_interface(dev):
74+
raise BridgeInterfaceCreationException(
75+
"failed to create bridge interface for {}".format(dev.get_name))
76+
77+
if not self._set_trunk_connection(dev, vlan):
78+
raise NamespaceConnectionException("failed to create connection to namespace {}".format(
79+
dev.get_name))
80+
81+
self._connections.append_device(dev, vlan, PortType.TRUNK)
82+
83+
def disconnect_device(self, dev):
84+
# Delete switch's bridge interface
85+
shell_run_and_check('ip link del br-{}'.format(dev.get_name))
86+
87+
self._connections.remove_device(dev)
88+
89+
dev.term()
90+
91+
def term(self):
92+
self._connections.stop_connections_thread()

0 commit comments

Comments
 (0)