Skip to content

Commit 6a7d82b

Browse files
Merge pull request #90 from mahlaghakafili/expose-prometheous-logs
Expose prometheous logs
2 parents 858f76c + 0b51ea2 commit 6a7d82b

4 files changed

Lines changed: 4949 additions & 6 deletions

File tree

archipy/configs/config_template.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class GrpcConfig(BaseModel):
193193
SERVE_PORT: int = Field(default=8100, description="Port to serve gRPC on")
194194
SERVE_HOST: str = Field(default="[::]", description="Host to serve gRPC on") # IPv6 equivalent of 0.0.0.0
195195
THREAD_WORKER_COUNT: int | None = Field(default=None, description="Number of worker threads")
196+
MAX_CONCURRENT_RPCS: int | None = Field(default=None, description="Maximum number of concurrent requests")
196197
THREAD_PER_CPU_CORE: int = Field(
197198
default=40,
198199
description="Threads per CPU core",

archipy/helpers/decorators/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from .cache import ttl_cache_decorator
21
from .deprecation_exception import class_deprecation_error, method_deprecation_error
32
from .deprecation_warnings import class_deprecation_warning, method_deprecation_warning
43
from .retry import retry_decorator

archipy/helpers/utils/app_utils.py

Lines changed: 161 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@
22

33
import logging
44
from collections.abc import Awaitable, Callable
5+
from concurrent import futures
56
from contextlib import AbstractAsyncContextManager
67
from http import HTTPStatus
78
from typing import Any, cast
89

9-
from fastapi import FastAPI, Request, Response
10-
from fastapi.exceptions import RequestValidationError
11-
from fastapi.responses import JSONResponse
12-
from fastapi.routing import APIRoute
1310
from pydantic import ValidationError
14-
from starlette.middleware.cors import CORSMiddleware
1511

1612
from archipy.configs.base_config import BaseConfig
1713
from archipy.helpers.utils.base_utils import BaseUtils
@@ -22,6 +18,25 @@
2218
UnknownError,
2319
)
2420

21+
try:
22+
import grpc
23+
from grpc.experimental.aio import server # type: ignore[import-not-found]
24+
25+
GRPC_APP = True
26+
except ImportError:
27+
GRPC_APP = False
28+
29+
try:
30+
from fastapi import FastAPI, Request, Response
31+
from fastapi.exceptions import RequestValidationError
32+
from fastapi.responses import JSONResponse
33+
from fastapi.routing import APIRoute
34+
from starlette.middleware.cors import CORSMiddleware
35+
36+
FASTAPI_APP = True
37+
except ImportError:
38+
FASTAPI_APP = False
39+
2540

2641
class FastAPIExceptionHandler:
2742
"""Handles various types of errors and converts them to appropriate JSON responses."""
@@ -199,6 +214,94 @@ def setup_exception_handlers(app: FastAPI) -> None:
199214
app.add_exception_handler(Exception, generic_handler)
200215

201216

217+
class AsyncGrpcAPIUtils:
218+
"""async grpc api utilities."""
219+
220+
@staticmethod
221+
def setup_trace_interceptor(config: BaseConfig, interceptors: list) -> None:
222+
"""Configures trace interceptor for gRPC server if tracing is enabled.
223+
224+
Args:
225+
config (BaseConfig): The configuration object containing tracing settings.
226+
interceptors (List): List of gRPC interceptors to add the trace interceptor to.
227+
"""
228+
if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
229+
return
230+
231+
try:
232+
from archipy.helpers.interceptors.grpc.trace.server_interceptor import AsyncGrpcServerTraceInterceptor
233+
234+
interceptors.append(AsyncGrpcServerTraceInterceptor())
235+
except Exception:
236+
logging.exception("Failed to initialize Trace Interceptor")
237+
238+
@staticmethod
239+
def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
240+
"""Configures metric interceptor for gRPC server if Prometheus is enabled.
241+
242+
Args:
243+
config (BaseConfig): The configuration object containing Prometheus settings.
244+
interceptors (List): List of gRPC interceptors to add the metric interceptor to.
245+
"""
246+
if not config.PROMETHEUS.IS_ENABLED:
247+
return
248+
249+
try:
250+
from prometheus_client import start_http_server
251+
252+
from archipy.helpers.interceptors.grpc.metric.server_interceptor import AsyncGrpcServerMetricInterceptor
253+
254+
start_http_server(config.PROMETHEUS.SERVER_PORT)
255+
interceptors.append(AsyncGrpcServerMetricInterceptor())
256+
257+
except Exception:
258+
logging.exception("Failed to initialize Metric Interceptor")
259+
260+
261+
class GrpcAPIUtils:
262+
"""grpc api utilities."""
263+
264+
@staticmethod
265+
def setup_trace_interceptor(config: BaseConfig, interceptors: list) -> None:
266+
"""Configures trace interceptor for gRPC server if tracing is enabled.
267+
268+
Args:
269+
config (BaseConfig): The configuration object containing tracing settings.
270+
interceptors (List): List of gRPC interceptors to add the trace interceptor to.
271+
"""
272+
if not config.ELASTIC_APM.IS_ENABLED and not config.SENTRY.IS_ENABLED:
273+
return
274+
275+
try:
276+
from archipy.helpers.interceptors.grpc.trace.server_interceptor import GrpcServerTraceInterceptor
277+
278+
interceptors.append(GrpcServerTraceInterceptor())
279+
except Exception:
280+
logging.exception("Failed to initialize Trace Interceptor")
281+
282+
@staticmethod
283+
def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
284+
"""Configures metric interceptor for gRPC server if Prometheus is enabled.
285+
286+
Args:
287+
config (BaseConfig): The configuration object containing Prometheus settings.
288+
interceptors (List): List of gRPC interceptors to add the metric interceptor to.
289+
"""
290+
if not config.PROMETHEUS.IS_ENABLED:
291+
return
292+
293+
try:
294+
from prometheus_client import start_http_server
295+
296+
from archipy.helpers.interceptors.grpc.metric.server_interceptor import GrpcServerMetricInterceptor
297+
298+
start_http_server(config.PROMETHEUS.SERVER_PORT)
299+
interceptors.append(GrpcServerMetricInterceptor())
300+
301+
except Exception:
302+
logging.exception("Failed to initialize Metric Interceptor")
303+
304+
202305
class AppUtils:
203306
"""Utility class for creating and configuring FastAPI applications."""
204307

@@ -249,3 +352,56 @@ def create_fastapi_app(
249352
FastAPIUtils.setup_exception_handlers(app)
250353

251354
return app
355+
356+
@classmethod
357+
def create_async_grpc_app(
358+
cls,
359+
config: BaseConfig,
360+
customized_interceptors: set[Any] | None = None,
361+
compression: grpc.Compression | None = None,
362+
) -> server:
363+
"""Create and configure an async gRPC application."""
364+
from archipy.helpers.interceptors.grpc.exception import AsyncGrpcServerExceptionInterceptor
365+
366+
async_interceptors = [AsyncGrpcServerExceptionInterceptor()]
367+
368+
AsyncGrpcAPIUtils.setup_trace_interceptor(config, async_interceptors)
369+
AsyncGrpcAPIUtils.setup_metric_interceptor(config, async_interceptors)
370+
371+
app = server(
372+
futures.ThreadPoolExecutor(max_workers=config.GRPC.THREAD_WORKER_COUNT),
373+
interceptors=async_interceptors,
374+
compression=compression,
375+
options=config.GRPC.SERVER_OPTIONS_CONFIG_LIST,
376+
maximum_concurrent_rpcs=config.GRPC.MAX_CONCURRENT_RPCS,
377+
)
378+
if customized_interceptors:
379+
async_interceptors.extend(customized_interceptors)
380+
return app
381+
382+
@classmethod
383+
def create_grpc_app(
384+
cls,
385+
config: BaseConfig,
386+
customized_interceptors: set[Any] | None = None,
387+
compression: grpc.Compression | None = None,
388+
) -> grpc.Server:
389+
"""Create and configure an async gRPC application."""
390+
from archipy.helpers.interceptors.grpc.exception import GrpcServerExceptionInterceptor
391+
392+
interceptors = [GrpcServerExceptionInterceptor()]
393+
394+
GrpcAPIUtils.setup_trace_interceptor(config, interceptors)
395+
GrpcAPIUtils.setup_metric_interceptor(config, interceptors)
396+
if customized_interceptors:
397+
interceptors.extend(customized_interceptors)
398+
399+
app = grpc.server(
400+
futures.ThreadPoolExecutor(max_workers=config.GRPC.THREAD_WORKER_COUNT),
401+
interceptors=interceptors, # type: ignore
402+
compression=compression,
403+
options=config.GRPC.SERVER_OPTIONS_CONFIG_LIST,
404+
maximum_concurrent_rpcs=config.GRPC.MAX_CONCURRENT_RPCS,
405+
)
406+
407+
return app

0 commit comments

Comments
 (0)