1- from prometheus_client .parser import text_string_to_metric_families
2- from urllib .parse import urlparse
3- import requests
41import atexit
2+ import json
53import os
64import random
75import signal
86import subprocess
97import time
108import uuid
119from collections import defaultdict
10+ from test .perf import debugger
1211from test .perf import ping
13- from test .perf .utils import random_port , net_check
14- from test .perf .utils import read_and_parse_metrics
1512from test .perf .utils import Conn
13+ from test .perf .utils import net_check
14+ from test .perf .utils import random_port
15+ from test .perf .utils import read_and_parse_metrics
16+ from urllib .parse import quote
17+ from urllib .parse import urlparse
1618
1719import attr
20+ import requests
1821import yaml
19- from pyparsing import ParseException
22+ from prometheus_client . parser import text_string_to_metric_families
2023from wait_for import wait_for
2124
2225STANDARD = 0
23- PING = 1
26+ DIAG = 1
2427
2528procs = {}
2629
@@ -45,6 +48,7 @@ class Node:
4548 data_path = attr .ib (default = None )
4649 topology = attr .ib (init = False , default = None )
4750 uuid = attr .ib (init = False , factory = uuid .uuid4 )
51+ active = attr .ib (init = False )
4852
4953 node_type = STANDARD
5054
@@ -97,21 +101,24 @@ def start(self, wait_for_ports=True):
97101
98102 if wait_for_ports :
99103 self .wait_for_ports ()
104+ self .active = True
100105
101106 def wait_for_ports (self ):
102- print ("waiting for nodes ports" + self .name )
107+ print ("waiting for nodes ports " + self .name )
108+ print (self .port , self .hostname )
103109 wait_for (net_check , func_args = [self .port , self .hostname , True ], num_sec = 10 )
104110 if self .stats_enable :
105111 wait_for (net_check , func_args = [self .stats_port , self .hostname , True ], num_sec = 10 )
106112
107113 def stop (self ):
108114 print (f"{ time .time ()} killing { self .name } ({ self .uuid } )" )
109115 try :
110- os .killpg (os .getpgid (procs [self .uuid ].pid ), signal .SIGTERM )
116+ os .killpg (os .getpgid (procs [self .uuid ].pid ), signal .SIGKILL ) # TODO NICE FOR DEBUGGER
111117 except ProcessLookupError :
112- print ("Couldn't kill the process {procs[self.uuid].pid}" )
118+ print (f "Couldn't kill the process { procs [self .uuid ].pid } " )
113119 procs [self .uuid ].wait ()
114120 print (f"Service was kill { procs [self .uuid ].returncode } " )
121+ self .active = False
115122
116123 @property
117124 def hostname (self ):
@@ -123,17 +130,19 @@ def port(self):
123130
124131 def get_metrics (self ):
125132 stats = requests .get (f"http://{ self .hostname } :{ self .stats_port } /metrics" )
126- metrics = {
127- metric .name : metric
128- for metric in text_string_to_metric_families (stats .text )
129- }
133+ metrics = {metric .name : metric for metric in text_string_to_metric_families (stats .text )}
130134 return metrics
131135
132136 def get_routes (self ):
133- routes = self .get_metrics ()['routing_table_info' ].samples [0 ].labels ['edges' ]
134- return read_and_parse_metrics (routes )
137+ routes = self .get_metrics ()["routing_table_info" ].samples [0 ].labels ["edges" ]
138+ if routes == "()" :
139+ return set ()
140+ else :
141+ return read_and_parse_metrics (routes )
135142
136143 def validate_routes (self ):
144+ if not self .active :
145+ raise Exception ("Can't get routes from a stopped node" )
137146 print (f"****====TRYING COMPARE { self .name } " )
138147 node_routes = self .get_routes ()
139148 control_routes = self .topology .generate_routes ()
@@ -144,11 +153,14 @@ def validate_routes(self):
144153
145154 def ping (self , count , peer = None , node_ping_name = "ping_node" ):
146155
156+ if self .topology .diag_node :
157+ return self .topology .diag_node .ping (count = count , recipient = self .name )
158+
147159 if not peer :
148160 peer = self .topology .find_controller ()[0 ]
149161
150162 if node_ping_name not in self .topology .nodes :
151- self .topology .add_node (PingNode (name = node_ping_name ))
163+ self .topology .add_node (DiagNode (name = node_ping_name ))
152164
153165 if peer .name not in self .topology .nodes [node_ping_name ].connections :
154166 self .topology .nodes [node_ping_name ].connections .append (peer .name )
@@ -190,31 +202,83 @@ def ping(self, count, peer=None, node_ping_name="ping_node"):
190202 return duration / count
191203
192204
193- class PingNode (Node ):
194- node_type = PING
205+ @attr .s
206+ class DiagNode (Node ):
207+ api_address = attr .ib (default = "0.0.0.0" )
208+ api_port = attr .ib (default = 8080 )
209+ node_type = DIAG
210+
211+ def _construct_run_command (self ):
212+ starter = [
213+ "python" ,
214+ debugger .__file__ ,
215+ "--listen" ,
216+ self .listen ,
217+ "--data-path" ,
218+ self .data_path ,
219+ "--api-address" ,
220+ self .api_address ,
221+ "--api-port" ,
222+ str (self .api_port ),
223+ ]
224+ print (starter )
225+ return starter
195226
196- def start (self , * args ):
197- return
198227
199- def stop (self ):
200- return
228+ def wait_for_ports (self ):
229+ print (f"waiting for { self .api_port } , { self .api_address } " )
230+ wait_for (net_check , func_args = [self .api_port , self .api_address , True ])
231+ super ().wait_for_ports ()
201232
202233 def validate_routes (self ):
203- return True
234+ print (f"****====TRYING COMPARE { self .name } " )
235+ node_routes = self .get_routes ()
236+ control_routes = self .topology .generate_routes ()
237+ if node_routes and control_routes :
238+ return self .topology .compare_routes (node_routes , control_routes )
239+ else :
240+ return False
204241
205- def ping (self , * args ):
206- raise NotImplementedError
242+ def get_routes (self ):
243+ route_data = json .loads (
244+ requests .get (f"http://{ self .api_address } :{ self .api_port } /connections" ).text
245+ )
246+ routes = set ()
247+ for route in route_data :
248+ routes .add (Conn (route [0 ], route [1 ], route [2 ]))
249+ return routes
207250
208- def create_from_config (config ):
209- raise NotImplementedError
251+ def ping (self , count = 5 , recipient = "controller" ):
252+ output = requests .get (
253+ f"http://{ self .api_address } :{ self .api_port } /ping?count={ count } &recipient={ recipient } "
254+ ).text
210255
211- def _construct_run_command (self ):
256+ if "Failed" in output :
257+ return "Failed"
258+ else :
259+ times = json .loads (output )
260+ return sum (times ) / len (times )
261+
262+ def add_peer (self , peer ):
263+ coded_peer = quote (peer .listen )
264+ requests .get (f"http://{ self .api_address } :{ self .api_port } /add_peer?peer={ coded_peer } " )
265+
266+ def create_from_config (config ):
212267 raise NotImplementedError
213268
214269
215270@attr .s
216271class Topology :
272+ use_diag_node = attr .ib (default = False )
217273 nodes = attr .ib (init = False , factory = dict )
274+ diag_node = attr .ib (init = False , default = None )
275+
276+ def __attrs_post_init__ (self ):
277+ if self .use_diag_node :
278+ self .diag_node = DiagNode (
279+ name = "diag_node" , listen = f"receptor://127.0.0.1:{ random_port ()} "
280+ )
281+ self .add_node (self .diag_node )
218282
219283 def add_node (self , node ):
220284 if node .name not in self .nodes :
@@ -319,9 +383,7 @@ def generate_routes(self):
319383 routes = set ()
320384 for node , node_data in self .nodes .items ():
321385 for conn in node_data .connections :
322- routes .add (
323- Conn (node_data .name , conn , 1 )
324- )
386+ routes .add (Conn (node_data .name , conn , 1 ))
325387 return routes
326388
327389 def start (self , wait = True ):
@@ -334,6 +396,9 @@ def start(self, wait=True):
334396 print ("Waiting for nodes" )
335397 for _ , node in self .nodes .items ():
336398 node .wait_for_ports ()
399+ if self .use_diag_node :
400+ self .diag_node .add_peer (self .find_controller ()[0 ])
401+ self .nodes [self .diag_node .name ].connections .append (self .find_controller ()[0 ].name )
337402 wait_for (self .validate_all_node_routes , delay = 6 , num_sec = 30 )
338403 # for name, node in self.nodes.items():
339404 # wait_for(lambda: node.validate_routes)
@@ -344,11 +409,11 @@ def stop(self):
344409 print ("all killed" )
345410
346411 @staticmethod
347- def load_topology_from_file (filename ):
412+ def load_topology_from_file (filename , use_diag_node = False ):
348413 with open (filename ) as f :
349414 data = yaml .safe_load (f )
350415
351- topology = Topology ()
416+ topology = Topology (use_diag_node = use_diag_node )
352417 for node_name , definition in data ["nodes" ].items ():
353418 node = Node .create_from_config (definition )
354419 topology .add_node (node )
@@ -390,6 +455,4 @@ def compare_routes(route1, route2):
390455 return True
391456
392457 def validate_all_node_routes (self ):
393- return all (
394- node .validate_routes () for _ , node in self .nodes .items () if node .node_type == STANDARD
395- )
458+ return all (node .validate_routes () for node in self .nodes .values () if node .active )
0 commit comments