From 590963318edd99d4e75d349aac2bdce521cb1eb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Tue, 19 May 2026 23:18:01 +0900 Subject: [PATCH 1/6] fix(resource_manager): reinitialize consumer threads after os.fork() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 111 ++++++++++++++++++++++----- tests/unit/test_resource_manager.py | 60 +++++++++++++++ 2 files changed, 150 insertions(+), 21 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 2d42f6ce1..41b6b47ca 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -17,6 +17,7 @@ import atexit import os import threading +import weakref from queue import Full, Queue from typing import Any, Callable, Dict, List, Optional, cast @@ -170,6 +171,7 @@ def _initialize_instance( self.base_url = base_url self.mask = mask self.environment = environment + self._shutdown = False # Store additional client settings for get_client() to use self.timeout = timeout @@ -243,7 +245,9 @@ def _initialize_instance( x_langfuse_public_key=self.public_key, timeout=timeout, ) - score_ingestion_client = LangfuseClient( + + # Store as instance variable so _at_fork_reinit can reuse without recreation + self._score_ingestion_client = LangfuseClient( public_key=self.public_key, secret_key=secret_key, base_url=base_url, @@ -257,6 +261,52 @@ def _initialize_instance( LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" ).lower() not in ("false", "0") + self._media_upload_thread_count = media_upload_thread_count or max( + int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 + ) + + self._init_consumer_threads() + + # Prompt cache + self.prompt_cache = PromptCache() + + # Register shutdown handler + atexit.register(self.shutdown) + + # Register fork handler to reinitialize consumer threads in child process. + # When using Gunicorn with --preload, os.fork() copies memory but not threads + # (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html). + # Without this, media upload and score ingestion threads are lost after fork, + # causing silent data loss. + # + # Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety + # for span export via its own os.register_at_fork. This handler covers the + # remaining background threads managed by LangfuseResourceManager. + # + # weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong + # reference to this instance, which would block garbage collection. + # See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + # Walrus operator resolves the weak reference once and stores it in + # a temporary variable before calling it. This avoids a TOCTOU window + # where GC could collect the referent between checking for None and + # invoking the method. + after_in_child=lambda: (m := weak_reinit()) and m() + ) + + langfuse_logger.info( + f"Startup: Langfuse tracer successfully initialized | " + f"public_key={self.public_key} | " + f"base_url={base_url} | " + f"environment={environment or 'default'} | " + f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " + f"media_threads={self._media_upload_thread_count}" + ) + + def _init_consumer_threads(self) -> None: + """Initialize media upload and score ingestion consumer threads.""" self._media_upload_queue: Queue[Any] = Queue(100_000) self._media_manager = MediaManager( api_client=self.api, @@ -266,12 +316,8 @@ def _initialize_instance( ) self._media_upload_consumers = [] - media_upload_thread_count = media_upload_thread_count or max( - int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 - ) - if self._media_upload_enabled: - for i in range(media_upload_thread_count): + for i in range(self._media_upload_thread_count): media_upload_consumer = MediaUploadConsumer( identifier=i, media_manager=self._media_manager, @@ -279,9 +325,6 @@ def _initialize_instance( media_upload_consumer.start() self._media_upload_consumers.append(media_upload_consumer) - # Prompt cache - self.prompt_cache = PromptCache() - # Score ingestion self._score_ingestion_queue: Queue[Any] = Queue(100_000) self._ingestion_consumers = [] @@ -289,25 +332,49 @@ def _initialize_instance( ingestion_consumer = ScoreIngestionConsumer( ingestion_queue=self._score_ingestion_queue, identifier=0, - client=score_ingestion_client, - flush_at=flush_at, - flush_interval=flush_interval, + client=self._score_ingestion_client, + flush_at=self.flush_at, + flush_interval=self.flush_interval, max_retries=3, public_key=self.public_key, ) ingestion_consumer.start() self._ingestion_consumers.append(ingestion_consumer) - # Register shutdown handler - atexit.register(self.shutdown) + def _at_fork_reinit(self) -> None: + """Reinitialize consumer threads after fork in child process. - langfuse_logger.info( - f"Startup: Langfuse tracer successfully initialized | " - f"public_key={self.public_key} | " - f"base_url={base_url} | " - f"environment={environment or 'default'} | " - f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " - f"media_threads={media_upload_thread_count or 1}" + Called automatically via os.register_at_fork() after fork(). + Necessary for Gunicorn --preload deployments where os.fork() is used: + threads are not copied to child processes (POSIX standard), so without + reinitialization, the child process has no consumer threads and all + media upload and score ingestion events are silently lost. + + Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export + fork-safety separately via its own os.register_at_fork handler. + + Skipped if shutdown() was already called on this instance, to avoid + restarting threads on an intentionally torn-down manager. + """ + if self._shutdown: + return + + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." + ) + + # Queues are intentionally recreated after fork. Items enqueued before fork + # belong to the preloaded parent process and must not be processed by every + # worker — otherwise uploads/scores would be duplicated across workers. + # + # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated + # here to keep this handler minimal; this mirrors the existing singleton client + # lifecycle. If preload-time network I/O is introduced in the future, clients + # may need fork-specific reinitialization as well. + self._init_consumer_threads() + + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" ) @classmethod @@ -449,6 +516,8 @@ def flush(self) -> None: langfuse_logger.debug("Successfully flushed media upload queue") def shutdown(self) -> None: + self._shutdown = True + # Unregister the atexit handler first atexit.unregister(self.shutdown) diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index d0880dcd6..9bb74fe96 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -166,6 +166,66 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): assert not consumer.is_alive() +def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): + """_at_fork_reinit() must replace queues and start fresh consumer threads.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-reinit", + secret_key="sk-fork-reinit", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + old_media_queue = rm._media_upload_queue + old_ingestion_consumers = list(rm._ingestion_consumers) + + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is not old_score_queue + assert rm._media_upload_queue is not old_media_queue + assert len(rm._ingestion_consumers) == 1 + assert rm._ingestion_consumers[0].is_alive() + + # In a real fork, old threads don't exist in the child process. + # In this unit test they do — stop them explicitly to avoid leaking threads. + for consumer in old_ingestion_consumers: + consumer.pause() + consumer.join(timeout=1.0) + + client.shutdown() + + +def test_at_fork_reinit_skips_when_shutdown(monkeypatch): + """_at_fork_reinit() must not restart threads after intentional shutdown.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-shutdown", + secret_key="sk-fork-shutdown", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + + rm._shutdown = True + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is old_score_queue # queue must not be replaced + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From f3b0b5304adea04051b5bf5eab45be9d8bed121e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Wed, 20 May 2026 00:08:05 +0900 Subject: [PATCH 2/6] fix(resource_manager): catch and log errors during fork child consumer reinitialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 41b6b47ca..824ab8717 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -371,7 +371,13 @@ def _at_fork_reinit(self) -> None: # here to keep this handler minimal; this mirrors the existing singleton client # lifecycle. If preload-time network I/O is introduced in the future, clients # may need fork-specific reinitialization as well. - self._init_consumer_threads() + try: + self._init_consumer_threads() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload and score ingestion will be unavailable in this worker." + ) langfuse_logger.debug( f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" From fd16c0b919d4d40cf1025745da3cb2c5942f289a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Sat, 30 May 2026 06:41:05 +0900 Subject: [PATCH 3/6] fix(resource_manager): reinitialize _lock after fork to prevent deadlock in child process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 5 ++ tests/unit/test_resource_manager.py | 74 ++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 824ab8717..244695365 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -359,6 +359,11 @@ def _at_fork_reinit(self) -> None: if self._shutdown: return + # The class-level lock may have been held by a thread in the parent at fork time. + # That thread does not exist in the child, so the lock can never be released and + # any attempt to acquire it would deadlock. Replace it with a fresh lock first. + LangfuseResourceManager._lock = threading.RLock() + langfuse_logger.debug( f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." ) diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 9bb74fe96..a55b4602f 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -226,6 +226,80 @@ def test_at_fork_reinit_skips_when_shutdown(monkeypatch): client.shutdown() +def test_at_fork_reinit_replaces_lock(monkeypatch): + """_at_fork_reinit() must replace the class-level lock with a fresh one. + + If a thread held _lock at fork time, the child has no such thread and the + lock can never be released, causing a deadlock. The reinit handler must + replace it before doing any other work so the child can always acquire it. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock", + secret_key="sk-fork-lock", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_lock = LangfuseResourceManager._lock + + rm._at_fork_reinit() + + assert LangfuseResourceManager._lock is not old_lock + # New lock must be immediately acquirable (not held by any thread). + acquired = LangfuseResourceManager._lock.acquire(blocking=False) + assert acquired, "New lock must not be held after _at_fork_reinit()" + LangfuseResourceManager._lock.release() + + client.shutdown() + + +def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatch): + """Simulate the fork-deadlock scenario: old lock held, new lock must still be acquirable. + + In a real fork, a thread holding _lock in the parent disappears in the child, + leaving the lock permanently acquired. Here we replicate that by acquiring the + old lock without releasing it, then calling _at_fork_reinit() and verifying that + the replacement lock is free. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock-held", + secret_key="sk-fork-lock-held", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + # Simulate the lock being permanently held (as it would be in a forked child + # when the owning thread no longer exists). + stuck_lock = LangfuseResourceManager._lock + stuck_lock.acquire() # held, never released — simulates the fork scenario + + try: + rm._at_fork_reinit() + + # The new lock must be a different object and must be acquirable. + new_lock = LangfuseResourceManager._lock + assert new_lock is not stuck_lock + acquired = new_lock.acquire(blocking=False) + assert acquired, "Replacement lock must be acquirable after _at_fork_reinit()" + new_lock.release() + finally: + stuck_lock.release() # clean up so other tests are not affected + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From eba27cc57c8fbd746658262a9104e06be821537f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Sat, 30 May 2026 07:17:12 +0900 Subject: [PATCH 4/6] fix(resource_manager): recreate HTTP clients after fork to prevent process-unsafe socket sharing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/client.py | 7 +++ langfuse/_client/resource_manager.py | 39 ++++++++++-- tests/unit/test_resource_manager.py | 92 ++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 2f1c8d783..44cb62640 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -164,6 +164,13 @@ class Langfuse: host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. + **Fork limitation**: ``httpx.Client`` is thread-safe but not process-safe. When using + ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK recreates the HTTP + client in child processes after fork to avoid sharing file descriptors (TCP sockets) across + processes. A custom ``httpx_client`` will therefore be replaced by a new default client in + child processes — any custom transport, SSL, or proxy settings will not carry over. + If you need those settings in forked workers, configure them via environment variables or + apply them in an ``after_in_child`` fork handler instead. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 244695365..76124fbe7 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -372,10 +372,41 @@ def _at_fork_reinit(self) -> None: # belong to the preloaded parent process and must not be processed by every # worker — otherwise uploads/scores would be duplicated across workers. # - # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated - # here to keep this handler minimal; this mirrors the existing singleton client - # lifecycle. If preload-time network I/O is introduced in the future, clients - # may need fork-specific reinitialization as well. + # HTTP clients must also be recreated. httpx.Client is thread-safe but not + # process-safe: fork() duplicates the parent's connection pool (TCP socket file + # descriptors) into the child. Both processes then share the same underlying + # sockets, which causes data corruption and SSL/TLS state mismatch under + # concurrent use. Fresh clients start with an empty pool owned solely by this + # child process. + try: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) + try: self._init_consumer_threads() except Exception as e: diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index a55b4602f..13c814a6d 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -300,6 +300,98 @@ def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatc client.shutdown() +def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): + """_at_fork_reinit() must create a new httpx.Client to avoid sharing + connection-pool file descriptors (TCP sockets) across forked processes. + httpx.Client is thread-safe but not process-safe.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-default", + secret_key="sk-fork-httpx-default", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_httpx_client = rm.httpx_client + old_api = rm.api + old_score_ingestion_client = rm._score_ingestion_client + + rm._at_fork_reinit() + + assert rm.httpx_client is not old_httpx_client + assert rm.api is not old_api + assert rm._score_ingestion_client is not old_score_ingestion_client + + client.shutdown() + + +def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): + """_at_fork_reinit() must replace a user-provided httpx.Client too. + Users cannot react to fork() inside the SDK singleton, so the library + must always produce a process-safe client — even at the cost of losing + custom settings. This is a documented limitation.""" + import httpx + + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + custom_client = httpx.Client(timeout=99) + client = Langfuse( + public_key="pk-fork-httpx-custom", + secret_key="sk-fork-httpx-custom", + httpx_client=custom_client, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + assert rm.httpx_client is custom_client + + rm._at_fork_reinit() + + # Custom client must be replaced — sharing it cross-process is not safe. + assert rm.httpx_client is not custom_client + assert rm.api is not None + assert rm._score_ingestion_client is not None + + custom_client.close() + client.shutdown() + + +def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( + monkeypatch, +): + """After fork, the recreated httpx.Client must reflect the timeout and + additional_headers that were set on the resource manager.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-settings", + secret_key="sk-fork-httpx-settings", + timeout=42, + additional_headers={"X-Custom": "value"}, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + rm._at_fork_reinit() + + assert rm.httpx_client.timeout.connect == 42 + assert rm.httpx_client.headers.get("X-Custom") == "value" + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From 157122e0a346e24692f4aa071b45b16301fd3f94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Mon, 1 Jun 2026 20:50:51 +0900 Subject: [PATCH 5/6] fix(resource_manager): preserve custom httpx_client after fork instead of replacing it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/client.py | 14 +++++++------- langfuse/_client/resource_manager.py | 26 ++++++++++++++++---------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 44cb62640..bbd25f333 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -164,13 +164,13 @@ class Langfuse: host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. - **Fork limitation**: ``httpx.Client`` is thread-safe but not process-safe. When using - ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK recreates the HTTP - client in child processes after fork to avoid sharing file descriptors (TCP sockets) across - processes. A custom ``httpx_client`` will therefore be replaced by a new default client in - child processes — any custom transport, SSL, or proxy settings will not carry over. - If you need those settings in forked workers, configure them via environment variables or - apply them in an ``after_in_child`` fork handler instead. + **Fork safety**: ``httpx.Client`` is thread-safe but not process-safe. When using + ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK automatically + recreates its internally-managed HTTP client in child processes after fork. A custom + ``httpx_client`` is intentionally left as-is (the fork-inherited copy is reused), so + you retain the opportunity to handle process-safety yourself — for example by + registering your own ``os.register_at_fork(after_in_child=...)`` handler to close and + reopen connections on the custom client. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 76124fbe7..525d79eae 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -219,6 +219,7 @@ def _initialize_instance( ## use connection pools with limited capacity. Creating multiple instances ## could exhaust the OS's maximum number of available TCP sockets (file descriptors), ## leading to connection errors. + self._custom_httpx_client = httpx_client if httpx_client is not None: self.httpx_client = httpx_client else: @@ -372,17 +373,22 @@ def _at_fork_reinit(self) -> None: # belong to the preloaded parent process and must not be processed by every # worker — otherwise uploads/scores would be duplicated across workers. # - # HTTP clients must also be recreated. httpx.Client is thread-safe but not - # process-safe: fork() duplicates the parent's connection pool (TCP socket file - # descriptors) into the child. Both processes then share the same underlying - # sockets, which causes data corruption and SSL/TLS state mismatch under - # concurrent use. Fresh clients start with an empty pool owned solely by this - # child process. + # Internally-managed httpx clients must also be recreated: fork() duplicates the + # parent's connection pool (TCP socket file descriptors) into the child. Both + # processes then share the same underlying sockets, causing data corruption and + # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty + # pool owned solely by this child process. + # + # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited + # copy is reused as-is, giving the caller the opportunity to handle process-safety + # themselves (e.g. by registering their own os.register_at_fork handler). try: - client_headers = self.additional_headers if self.additional_headers else {} - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) + if self._custom_httpx_client is None: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + self.api = LangfuseAPI( base_url=self.base_url, username=self.public_key, From 68327b722c7cbc3e9ae01cc2c2170278501ca43a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Mon, 15 Jun 2026 22:57:26 +0900 Subject: [PATCH 6/6] fix(resource_manager): defer post-fork reinitialization to avoid segfault MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 149 +++++++++++++++++---------- tests/unit/test_resource_manager.py | 39 +++++-- 2 files changed, 123 insertions(+), 65 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 525d79eae..d439395a5 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -343,7 +343,7 @@ def _init_consumer_threads(self) -> None: self._ingestion_consumers.append(ingestion_consumer) def _at_fork_reinit(self) -> None: - """Reinitialize consumer threads after fork in child process. + """Mark that post-fork reinitialization is needed; do no heavy work here. Called automatically via os.register_at_fork() after fork(). Necessary for Gunicorn --preload deployments where os.fork() is used: @@ -356,6 +356,12 @@ def _at_fork_reinit(self) -> None: Skipped if shutdown() was already called on this instance, to avoid restarting threads on an intentionally torn-down manager. + + Heavy work (httpx.Client creation, thread spawning) is intentionally + deferred to _ensure_post_fork_initialized(), called on first use. + Doing that work here — inside the after_in_child handler — triggers + SSL/TLS and Objective-C runtime calls that are unsafe in the narrow + post-fork window and cause segfaults on macOS with gunicorn --preload. """ if self._shutdown: return @@ -365,65 +371,96 @@ def _at_fork_reinit(self) -> None: # any attempt to acquire it would deadlock. Replace it with a fresh lock first. LangfuseResourceManager._lock = threading.RLock() - langfuse_logger.debug( - f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." - ) + # Replace queues with fresh empty ones so flush() (e.g. via atexit) does not + # block waiting for pre-fork items that no consumer will ever drain. + # Queue() is pure Python — safe to call here. + self._media_upload_queue = Queue(100_000) + self._score_ingestion_queue = Queue(100_000) + self._media_upload_consumers = [] + self._ingestion_consumers = [] - # Queues are intentionally recreated after fork. Items enqueued before fork - # belong to the preloaded parent process and must not be processed by every - # worker — otherwise uploads/scores would be duplicated across workers. - # - # Internally-managed httpx clients must also be recreated: fork() duplicates the - # parent's connection pool (TCP socket file descriptors) into the child. Both - # processes then share the same underlying sockets, causing data corruption and - # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty - # pool owned solely by this child process. - # - # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited - # copy is reused as-is, giving the caller the opportunity to handle process-safety - # themselves (e.g. by registering their own os.register_at_fork handler). - try: - if self._custom_httpx_client is None: - client_headers = self.additional_headers if self.additional_headers else {} - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) + # Signal that HTTP clients and consumer threads need to be recreated on first use. + self._needs_post_fork_reinit = True + # Fresh lock to guard the one-time lazy reinit below. + self._post_fork_reinit_lock = threading.Lock() - self.api = LangfuseAPI( - base_url=self.base_url, - username=self.public_key, - password=self.secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=self.timeout, - ) - self._score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=self.secret_key, - base_url=self.base_url, - version=langfuse_version, - timeout=self.timeout or 20, - session=self.httpx_client, - ) - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " - f"Network requests may fail in this worker." - ) + def _ensure_post_fork_initialized(self) -> None: + """Lazily recreate HTTP clients and consumer threads after fork. - try: - self._init_consumer_threads() - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " - f"Media upload and score ingestion will be unavailable in this worker." + Called at the start of add_score_task() / add_trace_task() so that + the first actual work in the child process triggers full reinitialization. + The deferred approach avoids doing SSL/thread-creation work inside the + after_in_child handler where it causes segfaults on macOS. + """ + if not getattr(self, "_needs_post_fork_reinit", False): + return + + with self._post_fork_reinit_lock: + if not self._needs_post_fork_reinit: + return + + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse HTTP clients and consumer threads." ) - langfuse_logger.debug( - f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" - ) + # Queues are intentionally recreated here (not reused from _at_fork_reinit). + # Items enqueued before fork belong to the parent and must not be processed + # by every worker — that would duplicate uploads/scores across workers. + # + # Internally-managed httpx clients must also be recreated: fork() duplicates + # the parent's connection pool (TCP socket file descriptors) into the child. + # Both processes would then share the same underlying sockets, causing data + # corruption and SSL/TLS state mismatch under concurrent use. + # + # Custom httpx clients provided by the caller are NOT recreated. The + # fork-inherited copy is reused, giving the caller the opportunity to handle + # process-safety themselves (e.g. via their own os.register_at_fork handler). + try: + if self._custom_httpx_client is None: + client_headers = ( + self.additional_headers if self.additional_headers else {} + ) + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) + + try: + self._init_consumer_threads() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload and score ingestion will be unavailable in this worker." + ) + + self._needs_post_fork_reinit = False + + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + ) @classmethod def reset(cls) -> None: @@ -434,6 +471,7 @@ def reset(cls) -> None: cls._instances.clear() def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: + self._ensure_post_fork_initialized() try: # Sample scores with the same sampler that is used for tracing tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) @@ -482,6 +520,7 @@ def add_trace_task( self, event: dict, ) -> None: + self._ensure_post_fork_initialized() try: langfuse_logger.debug( f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}" diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 13c814a6d..89bed1bb7 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -167,7 +167,11 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): - """_at_fork_reinit() must replace queues and start fresh consumer threads.""" + """_at_fork_reinit() replaces queues immediately; consumers start on first use. + + Heavy work (httpx.Client, thread spawning) is deferred to avoid segfaults + caused by SSL/TLS initialization inside the after_in_child handler on macOS. + """ monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") with LangfuseResourceManager._lock: @@ -187,10 +191,19 @@ def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): rm._at_fork_reinit() + # Queues are replaced immediately (lightweight, safe in after_in_child). assert rm._score_ingestion_queue is not old_score_queue assert rm._media_upload_queue is not old_media_queue + # Consumer threads are NOT started yet — deferred to first use. + assert len(rm._ingestion_consumers) == 0 + assert rm._needs_post_fork_reinit is True + + # Trigger lazy initialization (as add_score_task / add_trace_task would). + rm._ensure_post_fork_initialized() + assert len(rm._ingestion_consumers) == 1 assert rm._ingestion_consumers[0].is_alive() + assert rm._needs_post_fork_reinit is False # In a real fork, old threads don't exist in the child process. # In this unit test they do — stop them explicitly to avoid leaking threads. @@ -301,8 +314,8 @@ def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatc def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): - """_at_fork_reinit() must create a new httpx.Client to avoid sharing - connection-pool file descriptors (TCP sockets) across forked processes. + """After fork, an internally-managed httpx.Client is replaced on first use to + avoid sharing connection-pool file descriptors (TCP sockets) across processes. httpx.Client is thread-safe but not process-safe.""" monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") @@ -322,6 +335,11 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): old_score_ingestion_client = rm._score_ingestion_client rm._at_fork_reinit() + # Lazy: heavy init has not run yet. + assert rm.httpx_client is old_httpx_client + + # Trigger lazy initialization (as add_score_task / add_trace_task would). + rm._ensure_post_fork_initialized() assert rm.httpx_client is not old_httpx_client assert rm.api is not old_api @@ -330,11 +348,10 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): client.shutdown() -def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): - """_at_fork_reinit() must replace a user-provided httpx.Client too. - Users cannot react to fork() inside the SDK singleton, so the library - must always produce a process-safe client — even at the cost of losing - custom settings. This is a documented limitation.""" +def test_at_fork_reinit_preserves_custom_httpx_client(monkeypatch): + """After fork, a caller-supplied httpx.Client is reused as-is. + The caller is responsible for their own fork-safety (e.g. via their own + os.register_at_fork handler). The SDK must not silently replace it.""" import httpx monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") @@ -354,9 +371,10 @@ def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): assert rm.httpx_client is custom_client rm._at_fork_reinit() + rm._ensure_post_fork_initialized() - # Custom client must be replaced — sharing it cross-process is not safe. - assert rm.httpx_client is not custom_client + # Custom client must be preserved — caller owns process-safety for it. + assert rm.httpx_client is custom_client assert rm.api is not None assert rm._score_ingestion_client is not None @@ -385,6 +403,7 @@ def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( assert rm is not None rm._at_fork_reinit() + rm._ensure_post_fork_initialized() assert rm.httpx_client.timeout.connect == 42 assert rm.httpx_client.headers.get("X-Custom") == "value"