Skip to content

Commit 5307838

Browse files
[Cherry-Pick] [BugFix] fix speculative gauge metrics in multi api server (#7082) (#7100)
* [BugFix] fix speculative gauge metrics in multi api server * [fix] fix test_metrics
1 parent c55f2e0 commit 5307838

5 files changed

Lines changed: 146 additions & 47 deletions

File tree

fastdeploy/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,11 @@
1515
"""
1616

1717
import os
18-
import uuid
1918

2019
# suppress warning log from paddlepaddle
2120
os.environ["GLOG_minloglevel"] = "2"
2221
# suppress log from aistudio
2322
os.environ["AISTUDIO_LOG"] = "critical"
24-
# set prometheus dir
25-
if os.getenv("PROMETHEUS_MULTIPROC_DIR", "") == "":
26-
prom_dir = f"/tmp/fd_prom_{str(uuid.uuid4())}"
27-
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
28-
if os.path.exists(prom_dir):
29-
os.rmdir(prom_dir)
30-
os.mkdir(prom_dir)
3123

3224
import typing
3325

fastdeploy/entrypoints/openai/multi_api_server.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ def start_servers(
107107
env = os.environ.copy()
108108
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
109109
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
110+
if "PROMETHEUS_MULTIPROC_DIR" in env:
111+
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
112+
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
113+
# Create the directory if it doesn't exist
114+
if not os.path.exists(prom_dir_i):
115+
os.makedirs(prom_dir_i, exist_ok=True)
116+
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
117+
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
118+
110119
cmd = [
111120
sys.executable,
112121
"-m",

fastdeploy/metrics/metrics.py

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def collect(self):
6565
Metric: Prometheus Metric objects that are not excluded.
6666
"""
6767
for metric in self.base_registry.collect():
68-
if not any(name.startswith(metric.name) for name in self.exclude_names):
68+
if not any(metric.name.startswith(name) for name in self.exclude_names):
6969
yield metric
7070

7171

@@ -83,11 +83,15 @@ def get_filtered_metrics() -> str:
8383
multiprocess.MultiProcessCollector(base_registry)
8484

8585
filtered_registry = CollectorRegistry()
86-
# 注册一个新的colletor,过滤gauge指标
87-
filtered_registry.register(SimpleCollector(base_registry, EXCLUDE_LABELS))
86+
# 动态获取需要排除的 gauge 指标列表
87+
exclude_labels = main_process_metrics.get_excluded_metrics()
88+
# 注册一个新的collector,过滤gauge指标
89+
filtered_registry.register(SimpleCollector(base_registry, exclude_labels))
8890

8991
# 将gauge指标重新注册到filtered_registry中,从内存中读取
9092
main_process_metrics.re_register_gauge(filtered_registry)
93+
# 将speculative中的gauge指标也重新注册
94+
main_process_metrics.re_register_speculative_gauge(filtered_registry)
9195

9296
return generate_latest(filtered_registry).decode("utf-8")
9397

@@ -195,7 +199,7 @@ class MetricsManager:
195199
"type": Gauge,
196200
"name": "fastdeploy:num_requests_running",
197201
"description": "Number of requests currently running",
198-
"kwargs": {"multiprocess_mode": "sum"},
202+
"kwargs": {},
199203
},
200204
"num_requests_waiting": {
201205
"type": Gauge,
@@ -625,19 +629,22 @@ def __init__(self):
625629
# 在模块加载,指标注册先设置Prometheus环境变量
626630
setup_multiprocess_prometheus()
627631

628-
# 动态创建所有指标
632+
# 动态创建所有非 gauge 型指标
629633
for metric_name, config in self.METRICS.items():
630634
setattr(
631635
self,
632636
metric_name,
633637
config["type"](config["name"], config["description"], **config["kwargs"]),
634638
)
635-
# 动态创建所有指标
639+
# 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum
636640
for metric_name, config in self.GAUGE_METRICS.items():
641+
kwargs = config["kwargs"].copy()
642+
if "multiprocess_mode" not in kwargs:
643+
kwargs["multiprocess_mode"] = "livesum"
637644
setattr(
638645
self,
639646
metric_name,
640-
config["type"](config["name"], config["description"], **config["kwargs"]),
647+
config["type"](config["name"], config["description"], **kwargs),
641648
)
642649
# 动态创建server metrics
643650
for metric_name, config in self.SERVER_METRICS.items():
@@ -695,17 +702,22 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens):
695702
Gauge(
696703
f"{config['name']}_{i}",
697704
f"{config['description']} (head {i})",
705+
multiprocess_mode="livesum",
698706
)
699707
)
700708
setattr(self, metric_name, gauges)
701709
else:
710+
# For Gauge metrics, automatically add multiprocess_mode="livesum"
711+
kwargs = config["kwargs"].copy()
712+
if config["type"] == Gauge and "multiprocess_mode" not in kwargs:
713+
kwargs["multiprocess_mode"] = "livesum"
702714
setattr(
703715
self,
704716
metric_name,
705717
config["type"](
706718
config["name"],
707719
config["description"],
708-
**config["kwargs"],
720+
**kwargs,
709721
),
710722
)
711723

@@ -766,6 +778,19 @@ def register_speculative_metrics(self, registry: CollectorRegistry):
766778
else:
767779
registry.register(getattr(self, metric_name))
768780

781+
def re_register_speculative_gauge(self, registry: CollectorRegistry):
782+
"""Re-register gauge metrics from SPECULATIVE_METRICS to the specified registry"""
783+
# Check if SPECULATIVE_METRICS was initialized in this process
784+
# (it's an instance attribute set by _init_speculative_metrics, not the class-level empty dict)
785+
if not hasattr(self, "spec_decode_draft_acceptance_rate"):
786+
return
787+
for metric_name, config in self.SPECULATIVE_METRICS.items():
788+
if metric_name == "spec_decode_draft_single_head_acceptance_rate":
789+
for gauge in getattr(self, metric_name):
790+
registry.register(gauge)
791+
elif config["type"] == Gauge:
792+
registry.register(getattr(self, metric_name))
793+
769794
def re_register_gauge(self, registry: CollectorRegistry):
770795
"""Re-register gauge to the specified registry"""
771796
for metric_name in self.GAUGE_METRICS:
@@ -789,16 +814,19 @@ def register_all(self, registry: CollectorRegistry):
789814
if hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
790815
self.register_speculative_metrics(registry)
791816

792-
@classmethod
793-
def get_excluded_metrics(cls) -> Set[str]:
817+
def get_excluded_metrics(self) -> Set[str]:
794818
"""Get the set of indicator names that need to be excluded"""
795-
return {config["name"] for config in cls.GAUGE_METRICS.values()}
819+
excluded = {config["name"] for config in self.GAUGE_METRICS.values()}
820+
# Also add gauge metrics from SPECULATIVE_METRICS (if initialized)
821+
if hasattr(self, "SPECULATIVE_METRICS"):
822+
for config in self.SPECULATIVE_METRICS.values():
823+
if config["type"] == Gauge or config["type"] == list[Gauge]:
824+
excluded.add(config["name"])
825+
return excluded
796826

797827

798828
main_process_metrics = MetricsManager()
799829

800830
# 由于zmq指标记录比较耗时,默认不开启,通过DEBUG参数开启
801831
if envs.FD_DEBUG:
802832
main_process_metrics.init_zmq_metrics()
803-
804-
EXCLUDE_LABELS = MetricsManager.get_excluded_metrics()

tests/entrypoints/openai/test_multi_api_server.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,58 @@ def test_main_function(self, mock_check_param, mock_sleep, mock_start_servers, m
180180
mock_proc1.wait.assert_called_once()
181181
mock_proc2.wait.assert_called_once()
182182

183+
@patch("fastdeploy.entrypoints.openai.multi_api_server.subprocess.Popen")
184+
@patch("fastdeploy.entrypoints.openai.multi_api_server.is_port_available")
185+
def test_prometheus_multiprocess_dir_per_dp(self, mock_is_port_available, mock_popen):
186+
"""Test that each DP server gets a unique PROMETHEUS_MULTIPROC_DIR"""
187+
# Mock port availability check
188+
mock_is_port_available.return_value = True
189+
190+
# Mock subprocess.Popen to capture env passed to each server
191+
envs_captured = []
192+
193+
def capture_popen(*args, **kwargs):
194+
envs_captured.append(kwargs.get("env", {}).copy())
195+
mock_proc = MagicMock()
196+
mock_proc.pid = 1000 + len(envs_captured)
197+
return mock_proc
198+
199+
mock_popen.side_effect = capture_popen
200+
201+
# Call start_servers with 2 servers
202+
processes = start_servers(
203+
server_count=2,
204+
device_count=2,
205+
server_args=self.test_server_args,
206+
ports="8000,8001",
207+
metrics_ports="8800,8801",
208+
controller_ports="-1",
209+
)
210+
211+
# Verify subprocess.Popen was called twice
212+
self.assertEqual(mock_popen.call_count, 2)
213+
self.assertEqual(len(envs_captured), 2)
214+
self.assertEqual(len(processes), 2)
215+
216+
# Verify each server has a unique PROMETHEUS_MULTIPROC_DIR
217+
prom_dirs = []
218+
for i, env in enumerate(envs_captured):
219+
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
220+
print(f"Server {i} PROMETHEUS_MULTIPROC_DIR: {prom_dir}")
221+
self.assertIsNotNone(prom_dir, f"Server {i} should have PROMETHEUS_MULTIPROC_DIR set")
222+
prom_dirs.append(prom_dir)
223+
224+
# Verify all PROMETHEUS_MULTIPROC_DIR values are unique
225+
self.assertEqual(
226+
len(prom_dirs), len(set(prom_dirs)), "Each DP server should have a unique PROMETHEUS_MULTIPROC_DIR"
227+
)
228+
229+
# Verify each directory contains the server index
230+
for i, prom_dir in enumerate(prom_dirs):
231+
# The directory should contain the server index (0 or 1)
232+
# to uniquely identify each server's metrics directory
233+
self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}")
234+
183235

184236
if __name__ == "__main__":
185237
unittest.main()

tests/metrics/test_metrics.py

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,64 @@
1414
# limitations under the License.
1515
"""
1616

17+
import os
1718
import unittest
1819
from unittest.mock import patch
1920

2021
from prometheus_client import Gauge
2122

22-
from fastdeploy.metrics.metrics import get_filtered_metrics
23+
from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics
2324

2425

2526
class TestGetFilteredMetrics(unittest.TestCase):
26-
def test_filtered_and_custom_metrics(self):
27-
"""
28-
Test get_filtered_metrics function:
29-
1. Exclude specific metrics from base_registry
30-
2. Keep other metrics in base_registry
31-
3. Ensure metrics registered by extra_register_func are effective
32-
"""
33-
34-
# Simulated metrics in base_registry (Gauge instances)
35-
g_keep = Gauge("metric_to_keep", "Kept metric")
36-
g_keep.set(1.23)
37-
38-
g_exclude = Gauge("metric_to_exclude", "Excluded metric")
39-
g_exclude.set(99)
40-
41-
# Fake MultiProcessCollector: register our simulated metrics
27+
def _collect_metrics_with_mocked_multiprocess(self, metric_name, multiprocess_value):
4228
def fake_multiprocess_collector(registry):
43-
registry.register(g_keep)
44-
registry.register(g_exclude)
29+
gauge = Gauge(metric_name, f"fake metric for {metric_name}", ["pid"], registry=registry)
30+
gauge.labels(pid="10001").set(multiprocess_value)
4531

46-
with patch(
47-
"fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector", side_effect=fake_multiprocess_collector
32+
with (
33+
patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": "/tmp/fake-prometheus-multiproc-dir"}, clear=False),
34+
patch(
35+
"fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector",
36+
side_effect=fake_multiprocess_collector,
37+
),
4838
):
49-
result = get_filtered_metrics()
39+
return get_filtered_metrics()
5040

51-
print("==== result ====\n", result)
41+
def _assert_unique_metric_value(self, metrics_text, metric_name, expected_value):
42+
metric_lines = [line for line in metrics_text.splitlines() if line.startswith(f"{metric_name} ")]
43+
self.assertEqual(metric_lines, [f"{metric_name} {expected_value}"])
44+
self.assertNotIn("pid=", metrics_text)
5245

53-
# 2. Kept metric should appear
54-
self.assertIn("metric_to_keep", result)
46+
def test_regular_gauge_returns_single_value_without_pid(self):
47+
metric = main_process_metrics.batch_size
48+
metric.set(8.0)
5549

56-
self.assertIn("metric_to_exclude", result)
50+
result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1008.0)
51+
52+
self._assert_unique_metric_value(result, metric._name, 8.0)
53+
54+
def test_speculative_gauge_returns_single_value_without_pid(self):
55+
if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
56+
main_process_metrics._init_speculative_metrics("mtp", 2)
57+
58+
metric = main_process_metrics.spec_decode_draft_acceptance_rate
59+
metric.set(0.75)
60+
61+
result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.75)
62+
63+
self._assert_unique_metric_value(result, metric._name, 0.75)
64+
65+
def test_speculative_single_head_gauge_returns_single_value_without_pid(self):
66+
if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
67+
main_process_metrics._init_speculative_metrics("mtp", 2)
68+
69+
metric = main_process_metrics.spec_decode_draft_single_head_acceptance_rate[0]
70+
metric.set(0.6)
71+
72+
result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.6)
73+
74+
self._assert_unique_metric_value(result, metric._name, 0.6)
5775

5876

5977
if __name__ == "__main__":

0 commit comments

Comments (0)