Skip to content

Commit 363df5e

Browse files
authored
Merge pull request #65 from psav/psav/remove_dot
Removed dot graphing
2 parents 66717f9 + ebc9fd9 commit 363df5e

2 files changed

Lines changed: 106 additions & 57 deletions

File tree

test/perf/affinity.py

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from prometheus_client.parser import text_string_to_metric_families
2+
from urllib.parse import urlparse
3+
import requests
14
import atexit
25
import os
36
import random
@@ -7,8 +10,9 @@
710
import uuid
811
from collections import defaultdict
912
from test.perf import ping
10-
from test.perf.utils import random_port
11-
from test.perf.utils import read_and_parse_dot
13+
from test.perf.utils import random_port, net_check
14+
from test.perf.utils import read_and_parse_metrics
15+
from test.perf.utils import Conn
1216

1317
import attr
1418
import yaml
@@ -70,32 +74,36 @@ def _construct_run_command(self):
7074
st = ["receptor"]
7175

7276
if self.controller:
73-
st.extend(["--debug", "-d", self.data_path, "--node-id", self.name, "controller"])
77+
st.extend(["-d", self.data_path, "--node-id", self.name, "controller"])
7478
st.extend([f"--listen={self.listen}"])
7579
else:
7680
peer_string = " ".join(
7781
[f"--peer={self.topology.nodes[pnode].listen}" for pnode in self.connections]
7882
)
79-
st.extend(["--debug", "-d", self.data_path, "--node-id", self.name, "node"])
83+
st.extend(["-d", self.data_path, "--node-id", self.name, "node"])
8084
st.extend([f"--listen={self.listen}", peer_string])
8185

8286
if self.stats_enable:
8387
st.extend(["--stats-enable", f"--stats-port={self.stats_port}"])
8488

8589
return st
8690

87-
def start(self):
88-
try:
89-
os.remove(f"graph_{self.name}.dot")
90-
os.sync()
91-
except FileNotFoundError:
92-
print(f"DIND'T FIND IT graph_{self.name}.dot")
91+
def start(self, wait_for_ports=True):
9392
print(f"{time.time()} starting {self.name}({self.uuid})")
9493
op = subprocess.Popen(
9594
" ".join(self._construct_run_command()), shell=True, preexec_fn=os.setsid
9695
)
9796
procs[self.uuid] = op
9897

98+
if wait_for_ports:
99+
self.wait_for_ports()
100+
101+
def wait_for_ports(self):
102+
print("waiting for nodes ports" + self.name)
103+
wait_for(net_check, func_args=[self.port, self.hostname, True], num_sec=10)
104+
if self.stats_enable:
105+
wait_for(net_check, func_args=[self.stats_port, self.hostname, True], num_sec=10)
106+
99107
def stop(self):
100108
print(f"{time.time()} killing {self.name}({self.uuid})")
101109
try:
@@ -105,22 +113,32 @@ def stop(self):
105113
procs[self.uuid].wait()
106114
print(f"Service was kill {procs[self.uuid].returncode}")
107115

108-
def get_debug_dot(self):
109-
try:
110-
with open(f"graph_{self.name}.dot") as f:
111-
dot_data = f.read()
112-
print(f"FILE FOUND: graph_{self.name}.dot")
113-
return dot_data
114-
except FileNotFoundError:
115-
print(f"FILE NOT FOUND: graph_{self.name}.dot")
116-
return ""
116+
@property
117+
def hostname(self):
118+
return urlparse(self.listen).hostname
119+
120+
@property
121+
def port(self):
122+
return urlparse(self.listen).port
123+
124+
def get_metrics(self):
125+
stats = requests.get(f"http://{self.hostname}:{self.stats_port}/metrics")
126+
metrics = {
127+
metric.name: metric
128+
for metric in text_string_to_metric_families(stats.text)
129+
}
130+
return metrics
131+
132+
def get_routes(self):
133+
routes = self.get_metrics()['routing_table_info'].samples[0].labels['edges']
134+
return read_and_parse_metrics(routes)
117135

118136
def validate_routes(self):
119137
print(f"****====TRYING COMPARE {self.name}")
120-
dot1 = self.get_debug_dot()
121-
dot2 = self.topology.generate_dot()
122-
if dot1 and dot2:
123-
return self.topology.compare_dot(dot1, dot2)
138+
node_routes = self.get_routes()
139+
control_routes = self.topology.generate_routes()
140+
if node_routes and control_routes:
141+
return self.topology.compare_routes(node_routes, control_routes)
124142
else:
125143
return False
126144

@@ -175,7 +193,7 @@ def ping(self, count, peer=None, node_ping_name="ping_node"):
175193
class PingNode(Node):
176194
node_type = PING
177195

178-
def start(self):
196+
def start(self, *args):
179197
return
180198

181199
def stop(self):
@@ -184,10 +202,7 @@ def stop(self):
184202
def validate_routes(self):
185203
return True
186204

187-
def get_debug_dot(self):
188-
raise NotImplementedError
189-
190-
def ping(self):
205+
def ping(self, *args):
191206
raise NotImplementedError
192207

193208
def create_from_config(config):
@@ -300,26 +315,25 @@ def dump_yaml(self, filename=".last-topology.yaml"):
300315

301316
yaml.dump(data, f)
302317

303-
def dump_dot(self, filename=".last-topology-graph.dot"):
304-
with open(filename, "w") as f:
305-
f.write(self.generate_dot())
306-
307-
def generate_dot(self):
308-
dot_data = "graph {"
318+
def generate_routes(self):
319+
routes = set()
309320
for node, node_data in self.nodes.items():
310321
for conn in node_data.connections:
311-
dot_data += f"{node} -- {conn}; "
312-
dot_data += "}"
313-
return dot_data
322+
routes.add(
323+
Conn(node_data.name, conn, 1)
324+
)
325+
return routes
314326

315327
def start(self, wait=True):
316328
self.dump_yaml()
317-
self.dump_dot()
318329

319330
for k, node in self.nodes.items():
320-
node.start()
331+
node.start(wait_for_ports=not wait)
321332

322333
if wait:
334+
print("Waiting for nodes")
335+
for _, node in self.nodes.items():
336+
node.wait_for_ports()
323337
wait_for(self.validate_all_node_routes, delay=6, num_sec=30)
324338
# for name, node in self.nodes.items():
325339
# wait_for(lambda: node.validate_routes)
@@ -365,20 +379,15 @@ def validate_ping_results(results, threshold=0.1):
365379
return valid
366380

367381
@staticmethod
368-
def compare_dot(dot1, dot2):
369-
try:
370-
ds1 = read_and_parse_dot(dot1)
371-
ds2 = read_and_parse_dot(dot2)
372-
if ds1 != ds2:
373-
print(f"****====MATCH FAIL")
374-
print(ds1)
375-
print(ds2)
376-
return False
377-
else:
378-
print("****====MATCH")
379-
return True
380-
except ParseException:
382+
def compare_routes(route1, route2):
383+
if route1 != route2:
384+
print(f"****====MATCH FAIL")
385+
print(route1)
386+
print(route2)
381387
return False
388+
else:
389+
print("****====MATCH")
390+
return True
382391

383392
def validate_all_node_routes(self):
384393
return all(

test/perf/utils.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,26 @@
55
from pyparsing import OneOrMore
66
from pyparsing import Suppress
77
from pyparsing import Word
8+
from pyparsing import nums
9+
from pyparsing import Optional
10+
from _collections import defaultdict
11+
import logging
12+
import re
813

914

15+
logger = logging.getLogger(__name__)
16+
17+
_ports = defaultdict(dict)
18+
_dns_cache = {}
19+
ip_address = re.compile(
20+
r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" r"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
21+
)
22+
1023
class Conn:
11-
def __init__(self, a, b):
24+
def __init__(self, a, b, cost):
1225
self.a = a
1326
self.b = b
27+
self.cost = cost
1428

1529
def __eq__(self, other):
1630
if self.a == other.a and self.b == other.b or self.a == other.b and self.b == other.a:
@@ -26,12 +40,16 @@ def __repr__(self):
2640
return f"{self.a} -- {self.b}"
2741

2842

29-
def read_and_parse_dot(raw_data):
30-
group = Group(Word(alphanums + "_") + Suppress("--") + Word(alphanums + "_")) + Suppress(";")
31-
dot = Suppress("graph {") + OneOrMore(group) + Suppress("}")
43+
def read_and_parse_metrics(raw_data):
44+
token = Suppress("'") + Word(alphanums + "_") + Suppress("'")
45+
group = Group(
46+
Suppress("(") + token + Suppress(",") + token + Suppress(",") + Word(nums) + Suppress(
47+
")") + Suppress(Optional(',')))
48+
dot = Suppress("{") + OneOrMore(group) + Suppress("}")
3249

3350
data = dot.parseString(raw_data).asList()
34-
return {Conn(c[0], c[1]) for c in data}
51+
52+
return {Conn(c[0], c[1], c[2]) for c in data}
3553

3654

3755
def random_port(tcp=True):
@@ -55,3 +73,25 @@ def random_port(tcp=True):
5573
addr, port = s.getsockname()
5674
s.close()
5775
return port
76+
77+
78+
def net_check(port, addr="localhost", force=False):
79+
"""Checks the availablility of a port"""
80+
port = int(port)
81+
if port not in _ports[addr] or force:
82+
# First try DNS resolution
83+
try:
84+
85+
addr = socket.gethostbyname(addr)
86+
87+
# Then try to connect to the port
88+
try:
89+
socket.create_connection((addr, port), timeout=10)
90+
_ports[addr][port] = True
91+
except socket.error:
92+
logger.exception("failed connection")
93+
_ports[addr][port] = False
94+
except Exception as e:
95+
logger.info(e)
96+
_ports[addr][port] = False
97+
return _ports[addr][port]

0 commit comments

Comments
 (0)