Skip to content

Commit 8b3cfb3

Browse files
authored
Merge pull request #43 from matburt/metrics_service
Adding a basic metrics service and some metrics to track
2 parents 54bada3 + f0768e7 commit 8b3cfb3

10 files changed

Lines changed: 202 additions & 113 deletions

File tree

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ flake8 = "*"
1212

1313
[packages]
1414
python-dateutil = "*"
15+
prometheus-client = "*"
1516

1617
[requires]
1718
python_version = "3.6"

Pipfile.lock

Lines changed: 140 additions & 113 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receptor/config.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,21 @@ def __init__(self, args=None):
137137
value_type='bool',
138138
hint='Disable the server function and only connect to configured peers',
139139
)
140+
self.add_config_option(
141+
section='node',
142+
key='stats_enable',
143+
default_value=None,
144+
set_value=True,
145+
value_type='bool',
146+
hint="Enable Prometheus style stats port",
147+
)
148+
self.add_config_option(
149+
section='node',
150+
key='stats_port',
151+
default_value=8889,
152+
value_type='int',
153+
hint='Port to listen for requests to show stats',
154+
)
140155
self.add_config_option(
141156
section='node',
142157
key='ping_interval',
@@ -192,6 +207,21 @@ def __init__(self, args=None):
192207
value_type='path',
193208
hint='Path to control socket for controller commands.',
194209
)
210+
self.add_config_option(
211+
section='controller',
212+
key='stats_enable',
213+
default_value=None,
214+
set_value=True,
215+
value_type='bool',
216+
hint="Enable Prometheus style stats port",
217+
)
218+
self.add_config_option(
219+
section='controller',
220+
key='stats_port',
221+
default_value=8889,
222+
value_type='int',
223+
hint='Port to listen for requests to show stats',
224+
)
195225
self.add_config_option(
196226
section='ping',
197227
key='count',

receptor/entrypoints.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import sys
55
import time
66

7+
from prometheus_client import start_http_server
8+
79
from .receptor import Receptor
810
from . import controller
911
from . import exceptions
@@ -15,12 +17,18 @@
1517
def run_as_controller(config):
1618
receptor = Receptor(config)
1719
logger.info(f'Starting up as node ID {receptor.node_id}')
20+
if config.controller_stats_enable:
21+
logger.info(f'Starting stats on port {config.controller_stats_port}')
22+
start_http_server(config.controller_stats_port)
1823
controller.mainloop(receptor, config.controller_socket_path)
1924

2025

2126
def run_as_node(config):
2227
receptor = Receptor(config)
2328
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
29+
if config.node_stats_enable:
30+
logger.info(f'Starting stats on port {config.node_stats_port}')
31+
start_http_server(config.node_stats_port)
2432
node.mainloop(receptor, config.node_ping_interval)
2533

2634

receptor/protocol.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from .messages import envelope
1212
from .exceptions import ReceptorBufferError
13+
from .stats import connected_peers_guage
1314

1415
logger = logging.getLogger(__name__)
1516

@@ -74,9 +75,11 @@ def connection_made(self, transport):
7475
self.transport = transport
7576
self.greeted = False
7677
self.incoming_buffer = DataBuffer()
78+
connected_peers_guage.inc()
7779
self.loop.create_task(self.wait_greeting())
7880

7981
def connection_lost(self, exc):
82+
connected_peers_guage.dec()
8083
self.receptor.remove_connection(self)
8184

8285
def data_received(self, data):
@@ -171,10 +174,12 @@ def __init__(self, receptor, loop):
171174

172175
def connection_made(self, transport):
173176
self.transport = transport
177+
connected_peers_guage.inc()
174178
if self not in self.receptor.controller_connections:
175179
self.receptor.controller_connections.append(self)
176180

177181
def connection_lost(self, exc):
182+
connected_peers_guage.dec()
178183
if self in self.receptor.controller_connections:
179184
self.receptor.controller_connections.remove(self)
180185

receptor/receptor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .router import MeshRouter
1010
from .work import WorkManager
1111
from .messages import envelope, directive
12+
from .stats import messages_received_counter
1213
from . import exceptions
1314

1415
RECEPTOR_DIRECTIVE_NAMESPACE = 'receptor'
@@ -203,6 +204,7 @@ async def handle_message(self, msg):
203204
directive=self.handle_directive,
204205
response=self.handle_response,
205206
)
207+
messages_received_counter.inc()
206208
outer_env = envelope.OuterEnvelope(**msg)
207209
next_hop = self.router.next_hop(outer_env.recipient)
208210
if next_hop:

receptor/router.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from dateutil import parser
1010
from .messages import envelope
1111
from .exceptions import UnrouteableError, ReceptorBufferError
12+
from .stats import route_counter
1213

1314
logger = logging.getLogger(__name__)
1415

@@ -145,6 +146,7 @@ async def forward(self, outer_envelope, next_hop):
145146
outer_envelope.route_list.append(self.node_id)
146147
logger.debug(f'Forwarding frame {outer_envelope.frame_id} to {next_hop}')
147148
try:
149+
route_counter.inc()
148150
buffer_obj.push(outer_envelope.serialize().encode("utf-8"))
149151
except ReceptorBufferError as e:
150152
logger.exception("Receptor Buffer Write Error forwarding message to {}: {}".format(next_hop, e))

receptor/stats.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from prometheus_client import Counter, Gauge
2+
3+
messages_received_counter = Counter("incoming_messages", "Messages received from Receptor Peers")
4+
connected_peers_guage = Gauge("connected_peers", "Number of active peer connections")
5+
work_counter = Counter("work_events", "A count of the number of work events that have been received")
6+
active_work_gauge = Gauge("active_work", "Amount of work currently being performed")
7+
route_counter = Counter("route_events", "A count of the number of messages that have been routed elsewhere in the mesh")

receptor/work.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
from . import exceptions
66
from .messages import envelope
7+
from .stats import work_counter, active_work_gauge
8+
9+
710
logger = logging.getLogger(__name__)
811

912

@@ -27,13 +30,16 @@ def get_work(self):
2730
return self.active_work
2831

2932
def add_work(self, env):
33+
work_counter.inc()
34+
active_work_gauge.inc()
3035
self.active_work.append(dict(id=env.message_id,
3136
directive=env.directive,
3237
sender=env.sender))
3338

3439
def remove_work(self, env):
3540
for work in self.active_work:
3641
if env.message_id == work["id"]:
42+
active_work_gauge.dec()
3743
self.active_work.remove(work)
3844

3945
async def handle(self, inner_env):

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
long_description_content_type='text/markdown',
2020
python_requires=">=3.6",
2121
install_requires=[
22+
"prometheus_client==0.7.1",
2223
],
2324
zip_safe=False,
2425
entry_points={

0 commit comments

Comments
 (0)