1+ import atexit
2+ import os
13import random
2- from collections import defaultdict
34import subprocess
4- import attr
5- import atexit
6- from utils import random_port
5+ import time
76import uuid
7+ from collections import defaultdict
8+ from test .perf .utils import random_port
9+ from test .perf .utils import read_and_parse_dot
10+ import signal
11+
12+ import attr
813import yaml
14+ from pyparsing import ParseException
15+ from wait_for import wait_for
916
1017procs = {}
1118
@@ -69,7 +76,7 @@ def _construct_run_command(self):
6976 for pnode in self .connections
7077 ]
7178 )
72- st .extend (["-d" , self .data_path , "--node-id" , self .name , "node" ])
79+ st .extend (["--debug" , "- d" , self .data_path , "--node-id" , self .name , "node" ])
7380 st .extend ([f"--listen-port={ self .listen_port } " , peer_string ])
7481
7582 if self .stats_enable :
@@ -78,12 +85,69 @@ def _construct_run_command(self):
7885 return st
7986
8087 def start (self ):
81- op = subprocess .Popen (" " .join (self ._construct_run_command ()), shell = True )
88+ try :
89+ os .remove (f"graph_{ self .name } .dot" )
90+ os .sync ()
91+ except FileNotFoundError :
92+ print (f"DIND'T FIND IT graph_{ self .name } .dot" )
93+ print (f"{ time .time ()} starting { self .name } ({ self .uuid } )" )
94+ op = subprocess .Popen (" " .join (self ._construct_run_command ()), shell = True , preexec_fn = os .setsid )
8295 procs [self .uuid ] = op
8396
8497 def stop (self ):
85- print (f"killing { self .name } ({ self .uuid } )" )
86- procs [self .uuid ].kill ()
98+ print (f"{ time .time ()} killing { self .name } ({ self .uuid } )" )
99+ try :
100+ os .killpg (os .getpgid (procs [self .uuid ].pid ), signal .SIGTERM )
101+ except ProcessLookupError :
102+ print ("Couldn't kill the process {procs[self.uuid].pid}" )
103+ procs [self .uuid ].wait ()
104+ print (f"Service was kill { procs [self .uuid ].returncode } " )
105+
106+ def get_debug_dot (self ):
107+ try :
108+ with open (f"graph_{ self .name } .dot" ) as f :
109+ dot_data = f .read ()
110+ #print(f"FILE FOUND: graph_{self.name}.dot")
111+ return dot_data
112+ except FileNotFoundError :
113+ #print(f"FILE NOT FOUND: graph_{self.name}.dot")
114+ return ""
115+
116+ def validate_routes (self ):
117+ dot1 = self .get_debug_dot ()
118+ dot2 = self .topology .generate_dot ()
119+ if dot1 and dot2 :
120+ return self .topology .compare_dot (dot1 , dot2 )
121+ else :
122+ return False
123+
124+ def ping (self , count ):
125+ socket_path = self .topology .find_controller ()[0 ].socket_path
126+
127+ if self .controller :
128+ # TODO Remove this once a controller is pingable
129+ return True
130+
131+ starter = [
132+ "time" ,
133+ "receptor" ,
134+ "ping" ,
135+ "--socket-path" ,
136+ socket_path ,
137+ self .name ,
138+ "--count" ,
139+ str (count ),
140+ ]
141+ start = time .time ()
142+ op = subprocess .Popen (" " .join (starter ), shell = True , stdout = subprocess .PIPE )
143+ op .wait ()
144+ duration = time .time () - start
145+ cmd_output = op .stdout .readlines ()
146+ print (cmd_output )
147+ if b"Failed" in cmd_output [0 ]:
148+ return "Failed"
149+ else :
150+ return duration / count
87151
88152
89153@attr .s
@@ -109,9 +173,7 @@ def remove_node(self, node_or_name):
109173 del self .nodes [node_name ]
110174
111175 @staticmethod
112- def generate_mesh (
113- controller_port , node_count , conn_method , profile = False , socket_path = None
114- ):
176+ def generate_mesh (controller_port , node_count , conn_method , profile = False , socket_path = None ):
115177 topology = Topology ()
116178 topology .add_node (
117179 Node (
@@ -170,7 +232,6 @@ def peer_function(*args):
170232 )
171233 return topology
172234
173-
174235 def dump_yaml (self , filename = ".last-topology.yaml" ):
175236 with open (filename , "w" ) as f :
176237 data = {"nodes" : {}}
@@ -192,27 +253,38 @@ def dump_yaml(self, filename=".last-topology.yaml"):
192253
193254 def dump_dot (self , filename = ".last-topology-graph.dot" ):
194255 with open (filename , "w" ) as f :
195- f .write ("graph {" )
196- for node , node_data in self .nodes .items ():
197- for conn in node_data .connections :
198- f .write (f"{ node } -- { conn } ; " )
199- f .write ("}" )
256+ f .write (self .generate_dot ())
200257
201- def start (self ):
258+ def generate_dot (self ):
259+ dot_data = "graph {"
260+ for node , node_data in self .nodes .items ():
261+ for conn in node_data .connections :
262+ dot_data += f"{ node } -- { conn } ; "
263+ dot_data += "}"
264+ return dot_data
265+
266+ def start (self , wait = True ):
202267 self .dump_yaml ()
203268 self .dump_dot ()
204269
205270 for k , node in self .nodes .items ():
206271 node .start ()
207272
273+ if wait :
274+ wait_for (self .validate_all_node_routes , delay = 6 , num_sec = 30 )
275+ #for name, node in self.nodes.items():
276+ # wait_for(lambda: node.validate_routes)
277+
208278 def stop (self ):
209279 for k , node in self .nodes .items ():
210280 node .stop ()
211281 print ("all killed" )
212282
213283 @staticmethod
214284 def load_topology_from_file (filename ):
215- data = yaml .safe_load (filename )
285+ with open (filename ) as f :
286+ data = yaml .safe_load (f )
287+
216288 topology = Topology ()
217289 for node_name , definition in data ["nodes" ].items ():
218290 node = Node .create_from_config (definition )
@@ -222,3 +294,36 @@ def load_topology_from_file(filename):
222294
223295 def find_controller (self ):
224296 return list (filter (lambda o : o .controller , self .nodes .values ()))
297+
298+ def ping (self , count = 10 , socket_path = None ):
299+ results = {}
300+ for _ , node in self .nodes .items ():
301+ results [node .name ] = node .ping (count )
302+ return results
303+
304+ @staticmethod
305+ def validate_ping_results (results , threshold = 0.1 ):
306+ valid = True
307+ for node in results :
308+ print (f"Asserting node { node } was under { threshold } threshold" )
309+ print (f" { results [node ]} " )
310+ if results [node ] == "Failed" or float (results [node ]) > float (threshold ):
311+ valid = False
312+ return valid
313+
314+ @staticmethod
315+ def compare_dot (dot1 , dot2 ):
316+ try :
317+ ds1 = read_and_parse_dot (dot1 )
318+ ds2 = read_and_parse_dot (dot2 )
319+ if ds1 != ds2 :
320+ print (f"MATCH FAIL" )
321+ print (ds1 )
322+ print (ds2 )
323+ return False
324+ return True
325+ except ParseException :
326+ return False
327+
328+ def validate_all_node_routes (self ):
329+ return all (node .validate_routes () for _ , node in self .nodes .items ())
0 commit comments