Skip to content

Commit 3a86e34

Browse files
committed
Fixed tests after socket removal and ping node
1 parent 4140062 commit 3a86e34

8 files changed

Lines changed: 203 additions & 87 deletions

File tree

.github/workflows/tester.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ on: [push, pull_request]
44

55
jobs:
66
build:
7-
name: Performance Tests
7+
name: Integration Tests
88
runs-on: ubuntu-latest
99

1010
steps:

test/perf/affinity.py

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
import atexit
22
import os
33
import random
4+
import signal
45
import subprocess
56
import time
67
import uuid
78
from collections import defaultdict
9+
from test.perf import ping
810
from test.perf.utils import random_port
911
from test.perf.utils import read_and_parse_dot
10-
import signal
1112

1213
import attr
1314
import yaml
1415
from pyparsing import ParseException
1516
from wait_for import wait_for
1617

18+
STANDARD = 0
19+
PING = 1
20+
1721
procs = {}
1822

1923

@@ -29,33 +33,33 @@ def shut_all_procs():
2933
class Node:
3034
name = attr.ib()
3135
controller = attr.ib(default=False)
32-
listen_port = attr.ib(factory=random_port)
36+
listen = attr.ib(default=None)
3337
connections = attr.ib(factory=list)
3438
stats_enable = attr.ib(default=False)
3539
stats_port = attr.ib(default=None)
3640
profile = attr.ib(default=False)
37-
socket_path = attr.ib(default=None)
3841
data_path = attr.ib(default=None)
3942
topology = attr.ib(init=False, default=None)
4043
uuid = attr.ib(init=False, factory=uuid.uuid4)
4144

45+
node_type = STANDARD
46+
4247
def __attrs_post_init__(self):
43-
if not self.socket_path:
44-
self.socket_path = f"/tmp/receptor/{str(self.uuid)}/receptor.sock"
4548
if not self.data_path:
4649
self.data_path = f"/tmp/receptor/{str(self.uuid)}"
50+
if not self.listen:
51+
self.listen = f"receptor://0.0.0.0:{random_port()}"
4752

4853
@staticmethod
4954
def create_from_config(config):
5055
return Node(
5156
name=config["name"],
5257
controller=config.get("controller", False),
53-
listen_port=config.get("listen_port", None) or random_port(),
58+
listen=config.get("listen", f"receptor://0.0.0.0:{random_port()}"),
5459
connections=config.get("connections", []) or [],
5560
stats_enable=config.get("stats_enable", False),
5661
stats_port=config.get("stats_port", None) or random_port(),
5762
profile=config.get("profile", False),
58-
socket_path=config.get("socket_path", None),
5963
data_path=config.get("data_path", None),
6064
)
6165

@@ -67,17 +71,13 @@ def _construct_run_command(self):
6771

6872
if self.controller:
6973
st.extend(["--debug", "-d", self.data_path, "--node-id", self.name, "controller"])
70-
st.extend([f"--socket-path={self.socket_path}"])
71-
st.extend([f"--listen-port={self.listen_port}"])
74+
st.extend([f"--listen={self.listen}"])
7275
else:
7376
peer_string = " ".join(
74-
[
75-
f"--peer=localhost:{self.topology.nodes[pnode].listen_port}"
76-
for pnode in self.connections
77-
]
77+
[f"--peer={self.topology.nodes[pnode].listen}" for pnode in self.connections]
7878
)
7979
st.extend(["--debug", "-d", self.data_path, "--node-id", self.name, "node"])
80-
st.extend([f"--listen-port={self.listen_port}", peer_string])
80+
st.extend([f"--listen={self.listen}", peer_string])
8181

8282
if self.stats_enable:
8383
st.extend(["--stats-enable", f"--stats-port={self.stats_port}"])
@@ -91,7 +91,9 @@ def start(self):
9191
except FileNotFoundError:
9292
print(f"DIND'T FIND IT graph_{self.name}.dot")
9393
print(f"{time.time()} starting {self.name}({self.uuid})")
94-
op = subprocess.Popen(" ".join(self._construct_run_command()), shell=True, preexec_fn=os.setsid)
94+
op = subprocess.Popen(
95+
" ".join(self._construct_run_command()), shell=True, preexec_fn=os.setsid
96+
)
9597
procs[self.uuid] = op
9698

9799
def stop(self):
@@ -107,49 +109,94 @@ def get_debug_dot(self):
107109
try:
108110
with open(f"graph_{self.name}.dot") as f:
109111
dot_data = f.read()
110-
#print(f"FILE FOUND: graph_{self.name}.dot")
112+
print(f"FILE FOUND: graph_{self.name}.dot")
111113
return dot_data
112114
except FileNotFoundError:
113-
#print(f"FILE NOT FOUND: graph_{self.name}.dot")
115+
print(f"FILE NOT FOUND: graph_{self.name}.dot")
114116
return ""
115117

116118
def validate_routes(self):
119+
print(f"****====TRYING COMPARE {self.name}")
117120
dot1 = self.get_debug_dot()
118121
dot2 = self.topology.generate_dot()
119122
if dot1 and dot2:
120123
return self.topology.compare_dot(dot1, dot2)
121124
else:
122125
return False
123126

124-
def ping(self, count):
125-
socket_path = self.topology.find_controller()[0].socket_path
127+
def ping(self, count, peer=None, node_ping_name="ping_node"):
128+
129+
if not peer:
130+
peer = self.topology.find_controller()[0]
131+
132+
if node_ping_name not in self.topology.nodes:
133+
self.topology.add_node(PingNode(name=node_ping_name))
134+
135+
if peer.name not in self.topology.nodes[node_ping_name].connections:
136+
self.topology.nodes[node_ping_name].connections.append(peer.name)
126137

127138
if self.controller:
128139
# TODO Remove this once a controller is pingable
129140
return True
130141

142+
peer_address = self.topology.nodes[peer.name].listen
143+
131144
starter = [
132145
"time",
133-
"receptor",
134-
"ping",
135-
"--socket-path",
136-
socket_path,
146+
"python",
147+
ping.__file__,
148+
"--data-path",
149+
self.data_path,
150+
"--node-id",
151+
node_ping_name,
152+
"--peer",
153+
peer_address,
154+
"--id",
137155
self.name,
138156
"--count",
139157
str(count),
140158
]
159+
print(starter)
141160
start = time.time()
142-
op = subprocess.Popen(" ".join(starter), shell=True, stdout=subprocess.PIPE)
161+
op = subprocess.Popen(
162+
" ".join(starter), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
163+
)
143164
op.wait()
144165
duration = time.time() - start
145166
cmd_output = op.stdout.readlines()
167+
print(op.stderr.read())
146168
print(cmd_output)
147169
if b"Failed" in cmd_output[0]:
148170
return "Failed"
149171
else:
150172
return duration / count
151173

152174

175+
class PingNode(Node):
176+
node_type = PING
177+
178+
def start(self):
179+
return
180+
181+
def stop(self):
182+
return
183+
184+
def validate_routes(self):
185+
return True
186+
187+
def get_debug_dot(self):
188+
raise NotImplementedError
189+
190+
def ping(self):
191+
raise NotImplementedError
192+
193+
def create_from_config(config):
194+
raise NotImplementedError
195+
196+
def _construct_run_command(self):
197+
raise NotImplementedError
198+
199+
153200
@attr.s
154201
class Topology:
155202
nodes = attr.ib(init=False, factory=dict)
@@ -173,21 +220,25 @@ def remove_node(self, node_or_name):
173220
del self.nodes[node_name]
174221

175222
@staticmethod
176-
def generate_mesh(controller_port, node_count, conn_method, profile=False, socket_path=None):
223+
def generate_mesh(controller_port, node_count, conn_method, profile=False):
177224
topology = Topology()
178225
topology.add_node(
179226
Node(
180227
name="controller",
181228
controller=True,
182-
listen_port=controller_port,
229+
listen=f"receptor://127.0.0.1:{controller_port}",
183230
profile=profile,
184-
socket_path=socket_path,
185231
)
186232
)
187233

188234
for i in range(node_count):
189235
topology.add_node(
190-
Node(name=f"node{i}", controller=False, listen_port=random_port(), profile=profile)
236+
Node(
237+
name=f"node{i}",
238+
controller=False,
239+
listen=f"receptor://127.0.0.1:{random_port()}",
240+
profile=profile,
241+
)
191242
)
192243

193244
for k, node in topology.nodes.items():
@@ -198,7 +249,7 @@ def generate_mesh(controller_port, node_count, conn_method, profile=False, socke
198249
return topology
199250

200251
@staticmethod
201-
def generate_random_mesh(controller_port, node_count, max_conn_count, profile, socket_path):
252+
def generate_random_mesh(controller_port, node_count, max_conn_count, profile):
202253
def peer_function(topology, cur_node):
203254
nconns = defaultdict(int)
204255
print(topology)
@@ -218,17 +269,17 @@ def peer_function(topology, cur_node):
218269
return random.choices(available_nodes, k=int(random.random() * max_conn_count))
219270

220271
topology = Topology.generate_random_mesh(
221-
controller_port, node_count, peer_function, profile, socket_path
272+
controller_port, node_count, peer_function, profile
222273
)
223274
return topology
224275

225276
@staticmethod
226-
def generate_flat_mesh(controller_port, node_count, profile, socket_path):
277+
def generate_flat_mesh(controller_port, node_count, profile):
227278
def peer_function(*args):
228279
return ["controller"]
229280

230281
topology = Topology.generate_random_mesh(
231-
controller_port, node_count, peer_function, profile, socket_path
282+
controller_port, node_count, peer_function, profile
232283
)
233284
return topology
234285

@@ -238,14 +289,12 @@ def dump_yaml(self, filename=".last-topology.yaml"):
238289
for node, node_data in self.nodes.items():
239290
data["nodes"][node] = {
240291
"name": node_data.name,
241-
"listen_port": node_data.listen_port if node_data.controller else None,
292+
"listen": node_data.listen if node_data.controller else None,
242293
"controller": node_data.controller,
243294
"connections": node_data.connections,
244295
"stats_enable": node_data.stats_enable,
245296
"stats_port": node_data.stats_port,
246297
}
247-
if node_data.socket_path:
248-
data["nodes"][node]["socket_path"] = node_data.socket_path
249298
if node_data.data_path:
250299
data["nodes"][node]["data_path"] = node_data.data_path
251300

@@ -272,7 +321,7 @@ def start(self, wait=True):
272321

273322
if wait:
274323
wait_for(self.validate_all_node_routes, delay=6, num_sec=30)
275-
#for name, node in self.nodes.items():
324+
# for name, node in self.nodes.items():
276325
# wait_for(lambda: node.validate_routes)
277326

278327
def stop(self):
@@ -295,9 +344,13 @@ def load_topology_from_file(filename):
295344
def find_controller(self):
296345
return list(filter(lambda o: o.controller, self.nodes.values()))
297346

298-
def ping(self, count=10, socket_path=None):
347+
def ping(self, count=10):
299348
results = {}
300-
for _, node in self.nodes.items():
349+
350+
# Need to grab the list of nodes prior to running as pinging adds a node
351+
nodes = list(self.nodes.keys())
352+
for node_name in nodes:
353+
node = self.nodes[node_name]
301354
results[node.name] = node.ping(count)
302355
return results
303356

@@ -317,13 +370,17 @@ def compare_dot(dot1, dot2):
317370
ds1 = read_and_parse_dot(dot1)
318371
ds2 = read_and_parse_dot(dot2)
319372
if ds1 != ds2:
320-
print(f"MATCH FAIL")
373+
print(f"****====MATCH FAIL")
321374
print(ds1)
322375
print(ds2)
323376
return False
324-
return True
377+
else:
378+
print("****====MATCH")
379+
return True
325380
except ParseException:
326381
return False
327382

328383
def validate_all_node_routes(self):
329-
return all(node.validate_routes() for _, node in self.nodes.items())
384+
return all(
385+
node.validate_routes() for _, node in self.nodes.items() if node.node_type == STANDARD
386+
)

test/perf/ping.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import asyncio
2+
import time
3+
4+
import click
5+
6+
from receptor import Controller
7+
from receptor import ReceptorConfig
8+
9+
10+
def run_as_ping(config):
11+
def ping_iter():
12+
if config.ping_count:
13+
for x in range(config.ping_count):
14+
yield x
15+
else:
16+
while True:
17+
yield 0
18+
19+
async def ping_entrypoint():
20+
read_task = controller.loop.create_task(read_responses())
21+
await controller.add_peer(config.ping_peer)
22+
start_wait = time.time()
23+
while not controller.receptor.router.node_is_known(config.ping_recipient) and (
24+
time.time() - start_wait < 5
25+
):
26+
await asyncio.sleep(0.1)
27+
await send_pings()
28+
await read_task
29+
30+
async def read_responses():
31+
for _ in ping_iter():
32+
payload = await controller.recv()
33+
print("{}".format(payload))
34+
35+
async def send_pings():
36+
for _ in ping_iter():
37+
await controller.ping(config.ping_recipient)
38+
await asyncio.sleep(config.ping_delay)
39+
40+
print(f"Sending ping to {config.ping_recipient} via {config.ping_peer}.")
41+
controller = Controller(config)
42+
# controller.loop = asyncio.get_event_loop()
43+
return controller.run(ping_entrypoint)
44+
45+
46+
@click.command("ping")
47+
@click.option("--data-path", default="/tmp/receptor")
48+
@click.option("--peer", default="receptor://127.0.0.1:8889")
49+
@click.option("--id", default="node1")
50+
@click.option("--node-id", default="ping-node")
51+
@click.option("--count", default=10)
52+
def ping(data_path, peer, id, node_id, count):
53+
config = ReceptorConfig(
54+
["-d", data_path, "--node-id", node_id, "ping", "--peer", peer, id, "--count", str(count)]
55+
)
56+
run_as_ping(config)
57+
58+
59+
if __name__ == "__main__":
60+
ping()

0 commit comments

Comments
 (0)