|
| 1 | +import threading |
| 2 | +from scapy.all import Ether, IP, Raw |
| 3 | + |
| 4 | +from src.device import Device |
| 5 | +from src.manipulation import ManipulateArgs, ManipulateRet |
| 6 | + |
| 7 | + |
| 8 | +D1_ADDR = '192.168.250.1' |
| 9 | +D2_ADDR = '192.168.250.2' |
| 10 | +D3_ADDR = '192.168.1.1' |
| 11 | +D4_ADDR = '192.168.1.2' |
| 12 | + |
| 13 | +GLOBAL_D3_MAC = None |
| 14 | +GLOBAL_D4_MAC = None |
| 15 | + |
| 16 | + |
| 17 | +def forward_d1_to_d3(arg: ManipulateArgs) -> ManipulateRet: |
| 18 | + ''' |
| 19 | + Replace addresses of d2 with d4, so the packet will be hopped |
| 20 | + to the second vlan |
| 21 | + ''' |
| 22 | + global GLOBAL_D3_MAC |
| 23 | + global GLOBAL_D4_MAC |
| 24 | + packet = Ether(arg.packet) |
| 25 | + |
| 26 | + if not (IP in packet and packet[IP].dst == D2_ADDR and packet[IP].src == D1_ADDR): |
| 27 | + return ManipulateRet(arg.packet, False) |
| 28 | + |
| 29 | + packet[IP].src = D4_ADDR |
| 30 | + packet[IP].dst = D3_ADDR |
| 31 | + |
| 32 | + packet[Ether].src = GLOBAL_D4_MAC |
| 33 | + packet[Ether].dst = GLOBAL_D3_MAC |
| 34 | + |
| 35 | + return ManipulateRet(Raw(packet).load, False) |
| 36 | + |
| 37 | + |
| 38 | +def test_vlan_hopping(switch): |
| 39 | + ''' |
| 40 | + In this test, d1 is in vlan 20, and it'll try to connect |
| 41 | + to d3, which is part of vlan 10. The manipulation callback works follow: |
| 42 | + For each packet that its source is d1 and destined to d2, |
| 43 | + the callback will change the packet so it'll look like the source is d4, |
| 44 | + and it's destined to d3. The switch will think that the packet came from |
| 45 | + d4 who's part of vlan 10, so it'll forward the packet to d3 (who's also in vlan 10). |
| 46 | + That is an example of a VLAN hopping. |
| 47 | + ''' |
| 48 | + global GLOBAL_D3_MAC |
| 49 | + global GLOBAL_D4_MAC |
| 50 | + |
| 51 | + d1 = Device('a', D1_ADDR, '255.255.255.0') |
| 52 | + d2 = Device('b', D2_ADDR, '255.255.255.0') |
| 53 | + |
| 54 | + d3 = Device('c', D3_ADDR, '255.255.255.0') |
| 55 | + d4 = Device('d', D4_ADDR, '255.255.255.0') |
| 56 | + |
| 57 | + assert switch.connect_device_trunk(d1, 20) |
| 58 | + assert switch.connect_device_access(d2, 20) |
| 59 | + |
| 60 | + assert switch.connect_device_trunk(d3, 10) |
| 61 | + assert switch.connect_device_access(d4, 10) |
| 62 | + |
| 63 | + GLOBAL_D3_MAC = d3.get_mac |
| 64 | + GLOBAL_D4_MAC = d4.get_mac |
| 65 | + |
| 66 | + switch.set_manipulation(forward_d1_to_d3) |
| 67 | + |
| 68 | + def _run_listening_nc(dev): |
| 69 | + out = dev.run_from_namespace('timeout 5s nc -lu 0.0.0.0 4444') |
| 70 | + assert 'hello' == out |
| 71 | + |
| 72 | + t = threading.Thread(target=_run_listening_nc, args=(d3, )) |
| 73 | + t.start() |
| 74 | + |
| 75 | + out = d1.run_from_namespace('bash -c "echo hello | nc -u 192.168.250.2 4444 -w 3"') |
| 76 | + assert '' == out |
| 77 | + |
| 78 | + switch.disconnect_device(d1) |
| 79 | + switch.disconnect_device(d2) |
| 80 | + switch.disconnect_device(d3) |
| 81 | + switch.disconnect_device(d4) |
0 commit comments