Skip to content

Commit 13ec11a

Browse files
feat: add ACTIVE_REQUESTS metrics and Prometheus server deduplication
- Add ACTIVE_REQUESTS gauge to gRPC sync/async interceptors
- Create _is_prometheus_server_running() helper in app_utils
- Prevent duplicate Prometheus server starts across FastAPI/gRPC
- Unify metric interceptor tests with scenario outlines
- Support FastAPI, gRPC, and AsyncgRPC in parameterized tests
- Remove duplicate test scenarios
1 parent 05e7dc5 commit 13ec11a

5 files changed

Lines changed: 702 additions & 443 deletions

File tree

archipy/helpers/interceptors/grpc/metric/server_interceptor.py

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ class GrpcServerMetricInterceptor(BaseGrpcServerInterceptor):
1818
"""A gRPC server interceptor for collecting and reporting metrics using Prometheus.
1919
2020
This interceptor measures the response time of gRPC methods and records it in a Prometheus histogram.
21+
It also tracks the number of active requests using a Prometheus gauge.
2122
It also captures errors and logs them for monitoring purposes.
2223
"""
2324

24-
from prometheus_client import Histogram
25+
from prometheus_client import Gauge, Histogram
2526

2627
"Buckets for measuring response times between 0 and 1 second."
2728
ZERO_TO_ONE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 1000 for i in range(0, 1000, 5)]
@@ -39,20 +40,27 @@ class GrpcServerMetricInterceptor(BaseGrpcServerInterceptor):
3940

4041
"Prometheus histogram for tracking response times of gRPC methods."
4142
RESPONSE_TIME_SECONDS = Histogram(
42-
"response_time_seconds",
43-
"Time spent processing request",
43+
"grpc_response_time_seconds",
44+
"Time spent processing gRPC request",
4445
labelnames=("package", "service", "method", "status_code"),
4546
buckets=TOTAL_BUCKETS,
4647
)
4748

49+
"Prometheus gauge for tracking active gRPC requests."
50+
ACTIVE_REQUESTS = Gauge(
51+
"grpc_active_requests",
52+
"Number of active gRPC requests",
53+
labelnames=("package", "service", "method"),
54+
)
55+
4856
def intercept(
4957
self,
5058
method: Callable,
5159
request: object,
5260
context: grpc.ServicerContext,
5361
method_name_model: MethodName,
5462
) -> object:
55-
"""Intercepts a gRPC server call to measure response time and capture errors.
63+
"""Intercepts a gRPC server call to measure response time and track active requests.
5664
5765
Args:
5866
method (Callable): The gRPC method being intercepted.
@@ -66,47 +74,53 @@ def intercept(
6674
Raises:
6775
Exception: If an exception occurs during the method execution, it is captured and logged.
6876
"""
69-
try:
70-
# Skip metric collection if Prometheus is disabled
71-
if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
72-
return method(request, context)
77+
if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
78+
return method(request, context)
79+
80+
package = method_name_model.package
81+
service = method_name_model.service
82+
method_name = method_name_model.method
7383

74-
# Measure the start time
75-
start_time = time.time()
84+
self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).inc()
7685

77-
# Execute the gRPC method
86+
start_time = time.time()
87+
status_code = "OK"
88+
89+
try:
7890
result = method(request, context)
7991

80-
# Record the response time in the Prometheus histogram
81-
status_code = "OK"
8292
if hasattr(context, "code") and callable(context.code):
8393
code_method = cast("Callable[[], Any]", context.code)
8494
code_obj = code_method()
8595
if code_obj is not None:
8696
code_name = getattr(code_obj, "name", None)
8797
if code_name is not None:
8898
status_code = code_name
89-
self.RESPONSE_TIME_SECONDS.labels(
90-
package=method_name_model.package,
91-
service=method_name_model.service,
92-
method=method_name_model.method,
93-
status_code=status_code,
94-
).observe(time.time() - start_time)
9599
except Exception as exception:
96100
BaseUtils.capture_exception(exception)
97101
raise
98102
else:
99103
return result
104+
finally:
105+
duration = time.time() - start_time
106+
self.RESPONSE_TIME_SECONDS.labels(
107+
package=package,
108+
service=service,
109+
method=method_name,
110+
status_code=status_code,
111+
).observe(duration)
112+
self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).dec()
100113

101114

102115
class AsyncGrpcServerMetricInterceptor(BaseAsyncGrpcServerInterceptor):
103116
"""An async gRPC server interceptor for collecting and reporting metrics using Prometheus.
104117
105118
This interceptor measures the response time of async gRPC methods and records it in a Prometheus histogram.
119+
It also tracks the number of active requests using a Prometheus gauge.
106120
It also captures errors and logs them for monitoring purposes.
107121
"""
108122

109-
from prometheus_client import Histogram
123+
from prometheus_client import Gauge, Histogram
110124

111125
"Buckets for measuring response times between 0 and 1 second."
112126
ZERO_TO_ONE_SECONDS_BUCKETS: ClassVar[list[float]] = [i / 1000 for i in range(0, 1000, 5)]
@@ -124,20 +138,27 @@ class AsyncGrpcServerMetricInterceptor(BaseAsyncGrpcServerInterceptor):
124138

125139
"Prometheus histogram for tracking response times of async gRPC methods."
126140
RESPONSE_TIME_SECONDS = Histogram(
127-
"grpc_async_server_response_time_seconds",
141+
"grpc_async_response_time_seconds",
128142
"Time spent processing async gRPC request",
129143
labelnames=("package", "service", "method", "status_code"),
130144
buckets=TOTAL_BUCKETS,
131145
)
132146

147+
"Prometheus gauge for tracking active async gRPC requests."
148+
ACTIVE_REQUESTS = Gauge(
149+
"grpc_async_active_requests",
150+
"Number of active async gRPC requests",
151+
labelnames=("package", "service", "method"),
152+
)
153+
133154
async def intercept(
134155
self,
135156
method: Callable,
136157
request: object,
137158
context: grpc.aio.ServicerContext,
138159
method_name_model: MethodName,
139160
) -> object:
140-
"""Intercepts an async gRPC server call to measure response time and capture errors.
161+
"""Intercepts an async gRPC server call to measure response time and track active requests.
141162
142163
Args:
143164
method (Callable): The async gRPC method being intercepted.
@@ -151,24 +172,25 @@ async def intercept(
151172
Raises:
152173
Exception: If an exception occurs during the method execution, it is captured and logged.
153174
"""
154-
try:
155-
# Skip metric collection if Prometheus is disabled
156-
if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
157-
return await method(request, context)
175+
if not BaseConfig.global_config().PROMETHEUS.IS_ENABLED:
176+
return await method(request, context)
177+
178+
package = method_name_model.package
179+
service = method_name_model.service
180+
method_name = method_name_model.method
158181

159-
# Measure the start time using asyncio event loop time for better precision
160-
start_time = asyncio.get_event_loop().time()
161-
status_code = "OK"
182+
self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).inc()
162183

184+
start_time = asyncio.get_event_loop().time()
185+
status_code = "OK"
186+
187+
try:
163188
try:
164-
# Execute the async gRPC method
165189
result = await method(request, context)
166190

167-
# Get the actual status code from context
168191
if hasattr(context, "code") and context.code():
169192
status_code = context.code().name
170193
except Exception as e:
171-
# Determine error status code
172194
if isinstance(e, grpc.aio.AioRpcError):
173195
code_obj = e.code()
174196
if code_obj is not None:
@@ -188,14 +210,14 @@ async def intercept(
188210
else:
189211
return result
190212
finally:
191-
# Record the response time in the Prometheus histogram
192213
duration = asyncio.get_event_loop().time() - start_time
193214
self.RESPONSE_TIME_SECONDS.labels(
194-
package=method_name_model.package,
195-
service=method_name_model.service,
196-
method=method_name_model.method,
215+
package=package,
216+
service=service,
217+
method=method_name,
197218
status_code=status_code,
198219
).observe(duration)
220+
self.ACTIVE_REQUESTS.labels(package=package, service=service, method=method_name).dec()
199221

200222
except Exception as exception:
201223
BaseUtils.capture_exception(exception)

archipy/helpers/utils/app_utils.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4+
import socket
45
from collections.abc import Callable
56
from concurrent import futures
67
from contextlib import AbstractAsyncContextManager
@@ -52,6 +53,21 @@
5253
FASTAPI_APP = False
5354

5455

56+
def _is_prometheus_server_running(port: int) -> bool:
57+
"""Check if Prometheus server is already running on the specified port.
58+
59+
Args:
60+
port (int): The port number to check.
61+
62+
Returns:
63+
bool: True if server is running, False otherwise.
64+
"""
65+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
66+
result = sock.connect_ex(("localhost", port))
67+
sock.close()
68+
return result == 0
69+
70+
5571
class FastAPIExceptionHandler:
5672
"""Handles various types of errors and converts them to appropriate JSON responses."""
5773

@@ -209,18 +225,11 @@ def setup_metric_interceptor(app: FastAPI, config: BaseConfig) -> None:
209225
return
210226

211227
try:
212-
import socket
213-
214228
from prometheus_client import start_http_server
215229

216230
from archipy.helpers.interceptors.fastapi.metric.interceptor import FastAPIMetricInterceptor
217231

218-
# Conditionally start Prometheus server (check if already running)
219-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
220-
result = sock.connect_ex(("localhost", config.PROMETHEUS.SERVER_PORT))
221-
sock.close()
222-
223-
if result != 0:
232+
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
224233
start_http_server(config.PROMETHEUS.SERVER_PORT)
225234

226235
app.add_middleware(FastAPIMetricInterceptor) # type: ignore[arg-type]
@@ -312,7 +321,9 @@ def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
312321

313322
from archipy.helpers.interceptors.grpc.metric.server_interceptor import AsyncGrpcServerMetricInterceptor
314323

315-
start_http_server(config.PROMETHEUS.SERVER_PORT)
324+
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
325+
start_http_server(config.PROMETHEUS.SERVER_PORT)
326+
316327
interceptors.append(AsyncGrpcServerMetricInterceptor())
317328

318329
except Exception:
@@ -356,7 +367,9 @@ def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
356367

357368
from archipy.helpers.interceptors.grpc.metric.server_interceptor import GrpcServerMetricInterceptor
358369

359-
start_http_server(config.PROMETHEUS.SERVER_PORT)
370+
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
371+
start_http_server(config.PROMETHEUS.SERVER_PORT)
372+
360373
interceptors.append(GrpcServerMetricInterceptor())
361374

362375
except Exception:

features/fastapi_metric_interceptor.feature renamed to features/metric_interceptor.feature

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,67 @@
1-
Feature: FastAPI Metric Interceptor
1+
Feature: Metric Interceptor
22

3-
Scenario: Interceptor is added when Prometheus is enabled
4-
Given a FastAPI app with Prometheus enabled
5-
When the metric interceptor is setup
6-
Then the FastAPI app should have the metric interceptor
3+
Scenario Outline: Interceptor is added when Prometheus is enabled
4+
Given a <framework> app with Prometheus enabled
5+
When the <framework> metric interceptor is setup
6+
Then the <framework> app should have the metric interceptor
77

8-
Scenario: Interceptor is skipped when Prometheus is disabled
9-
Given a FastAPI app with Prometheus disabled
10-
When the metric interceptor is setup
11-
Then the FastAPI app should not have the metric interceptor
8+
Examples:
9+
| framework |
10+
| FastAPI |
11+
| gRPC |
12+
| AsyncgRPC |
1213

13-
Scenario: Response time is recorded for successful requests
14-
Given a FastAPI app with Prometheus enabled and metric interceptor
15-
When a GET request is made to "/test" endpoint
16-
Then the response time metric should be recorded
17-
And the metric should have method label "GET"
18-
And the metric should have status_code label "200"
19-
And the metric should have path_template label "/test"
14+
Scenario Outline: Interceptor is skipped when Prometheus is disabled
15+
Given a <framework> app with Prometheus disabled
16+
When the <framework> metric interceptor is setup
17+
Then the <framework> app should not have the metric interceptor
18+
19+
Examples:
20+
| framework |
21+
| FastAPI |
22+
| gRPC |
23+
| AsyncgRPC |
24+
25+
Scenario Outline: Response time is recorded for successful requests
26+
Given a <framework> app with Prometheus enabled and metric interceptor
27+
When a <framework> request is made
28+
Then the <framework> response time metric should be recorded
29+
And the <framework> metric should have correct labels
30+
31+
Examples:
32+
| framework |
33+
| FastAPI |
34+
| gRPC |
35+
| AsyncgRPC |
36+
37+
Scenario Outline: Active requests gauge increments and decrements correctly
38+
Given a <framework> app with Prometheus enabled and metric interceptor
39+
When a <framework> request is made
40+
Then the <framework> active requests gauge should increment before processing
41+
And the <framework> active requests gauge should decrement after processing
42+
43+
Examples:
44+
| framework |
45+
| FastAPI |
46+
| gRPC |
47+
| AsyncgRPC |
48+
49+
Scenario Outline: Prometheus server starts only once
50+
Given Prometheus is enabled for <framework>
51+
When multiple <framework> apps are created
52+
Then the Prometheus server should only start once
53+
54+
Examples:
55+
| framework |
56+
| FastAPI |
57+
| gRPC |
58+
| AsyncgRPC |
2059

2160
Scenario: Response time is recorded for failed requests
2261
Given a FastAPI app with Prometheus enabled and metric interceptor
2362
When a GET request is made to an endpoint that raises an error
2463
Then the response time metric should be recorded with status code 500
2564

26-
Scenario: Active requests gauge increments and decrements correctly
27-
Given a FastAPI app with Prometheus enabled and metric interceptor
28-
When a GET request is made to "/test" endpoint
29-
Then the active requests gauge should increment before processing
30-
And the active requests gauge should decrement after processing
31-
32-
Scenario: Prometheus server starts only once
33-
Given Prometheus is enabled
34-
When multiple FastAPI apps are created
35-
Then the Prometheus server should only start once
36-
3765
Scenario Outline: Metrics include correct labels for parameterized routes
3866
Given a FastAPI app with Prometheus enabled and metric interceptor with routes
3967
When a <method> request is made to "<actual_path>" with route pattern "<route_pattern>"

0 commit comments

Comments
 (0)