
[mirror] fix(embedding): Self-blocking async code, other cleanups#5

Open
yashwant86 wants to merge 6 commits into mm-base-10599 from mm-pr-10599

Conversation


@yashwant86 commented Apr 26, 2026

Mirror of upstream onyx-dot-app#10599 for benchmark. Do not merge.


Summary by MergeMonkey

  • Bug Fixes:
    • Fixed self-blocking async code in embedding batch processing by detecting event loop context and routing to thread-local loop or separate thread appropriately.
    • Corrected spelling error 'propogate' to 'propagate' in OpenSearch document index.
  • Under the Hood:
    • Refactored batch encoding to use explicit parameter passing instead of partial functions, improving closure safety.
    • Standardized logging messages with consistent punctuation and formatting.
    • Updated type hints from List to list (PEP 585 style).
    • Simplified time calculation by removing intermediate variables.


bot-mergemonkey Bot commented Apr 26, 2026

Risk Assessment: CRITICAL · ~25 min review

Focus areas: Event loop detection and routing logic (lines 887-911) · Thread executor usage and asyncio.run safety (lines 905-911) · Batch result ordering and collection (lines 952-965) · Test coverage for sync/async caller contexts (test file lines 297-396)

Assessment: Fixes self-blocking async code in embedding pipeline that could cause deadlocks in async contexts.

Walkthrough

User calls encode() with texts and batch size. The method batches texts and detects whether it's in an async context via asyncio.get_running_loop(). For sync callers, it uses a thread-local event loop with run_until_complete. For async callers, it spawns a separate thread with asyncio.run to avoid blocking the event loop. Results are collected, sorted by batch index, and returned in order.
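The routing described above can be sketched in a minimal standalone form. The helper names and the placeholder API call below are illustrative, not the PR's actual code; the real implementation reuses a thread-local loop rather than creating a fresh one per call:

```python
import asyncio
import concurrent.futures


async def _make_api_call(batch: list[str]) -> list[list[float]]:
    # Placeholder for the real async embedding request (hypothetical stub).
    await asyncio.sleep(0)
    return [[0.0] for _ in batch]


def process_batch(batch_idx: int, batch: list[str]) -> tuple[int, list[list[float]]]:
    """Run the async API call safely from either a sync or async caller."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Sync context: no running loop, so this thread can drive one itself.
        loop = asyncio.new_event_loop()
        try:
            return batch_idx, loop.run_until_complete(_make_api_call(batch))
        finally:
            loop.close()
    # Async context: calling run_until_complete here would deadlock the
    # caller's loop, so run the coroutine to completion in a separate thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(asyncio.run, _make_api_call(batch))
        return batch_idx, future.result()
```

The key point is that `asyncio.get_running_loop()` raises `RuntimeError` only when no loop is running in the current thread, which cleanly distinguishes the two caller contexts.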

Changes

  • Embedding Batch Processing Async Safety (backend/onyx/natural_language_processing/search_nlp_models.py): Fixed self-blocking async code in _batch_encode_texts by detecting whether the caller is in an event loop. Routes to a thread-local event loop for sync callers, or a separate thread with asyncio.run for async callers, preventing deadlocks. Refactored batch submission to use explicit lambda parameters instead of partial functions to ensure correct closure semantics.
  • Test Coverage for Batch Encoding (backend/tests/unit/onyx/natural_language_processing/test_search_nlp_models.py): Added a comprehensive test suite for _batch_encode_texts covering multi-batch uniform/partial scenarios, the single-batch sequential path, local model processing, error propagation, and sync/async caller contexts, validating the deadlock fix and ordering correctness.
  • Code Quality and Documentation (backend/onyx/document_index/opensearch/opensearch_document_index.py): Fixed spelling error in a comment: 'propogate' → 'propagate'.
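The closure issue behind the partial-to-explicit-parameter refactor is a classic Python pitfall, sketched below with an illustrative `encode` stub (not the PR's code). A lambda that merely closes over loop variables sees their values at call time, not at definition time; binding them as default arguments (or passing them explicitly) captures the values per iteration:

```python
from concurrent.futures import ThreadPoolExecutor

batches = [["a"], ["b"], ["c"]]


def encode(idx, batch):
    # Stand-in for submitting one batch to the embedding model.
    return idx, batch


with ThreadPoolExecutor(max_workers=3) as pool:
    # Buggy: each lambda closes over the loop variables, so a worker that
    # runs after the loop has advanced may see later values of idx/batch.
    buggy = [pool.submit(lambda: encode(idx, batch))
             for idx, batch in enumerate(batches)]
    # Safe: default arguments bind the current values at definition time.
    safe = [pool.submit(lambda i=idx, b=batch: encode(i, b))
            for idx, batch in enumerate(batches)]

results = sorted(f.result() for f in safe)
```

`functools.partial` also binds eagerly, so the refactor is about explicitness and readability rather than correctness relative to `partial`; the genuinely unsafe form is the bare closing-over lambda.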

Sequence Diagram

sequenceDiagram
    participant Caller
    participant encode
    participant process_batch
    participant EventLoop as Event Loop Check
    participant ThreadLocal as Thread-Local Loop
    participant Executor as Thread Executor
    Caller->>encode: encode(texts, batch_size)
    encode->>encode: batch_list(texts)
    alt Multi-threaded path
        encode->>Executor: ThreadPoolExecutor(max_workers=num_threads)
        loop For each batch
            Executor->>process_batch: submit lambda
            process_batch->>EventLoop: asyncio.get_running_loop()
            alt Sync context (RuntimeError)
                EventLoop-->>process_batch: RuntimeError
                process_batch->>ThreadLocal: _get_or_create_event_loop()
                ThreadLocal->>ThreadLocal: run_until_complete(_make_direct_api_call)
            else Async context (running loop)
                EventLoop-->>process_batch: running loop
                process_batch->>Executor: ThreadPoolExecutor(max_workers=1)
                Executor->>Executor: asyncio.run(_make_direct_api_call)
            end
            process_batch-->>Executor: (batch_idx, embeddings)
        end
        Executor->>encode: collect results
    else Sequential path
        loop For each batch
            encode->>process_batch: call directly
            process_batch->>EventLoop: asyncio.get_running_loop()
            alt Sync context
                process_batch->>ThreadLocal: run_until_complete
            else Async context
                process_batch->>Executor: asyncio.run in thread
            end
        end
    end
    encode->>encode: sort by batch_idx, extend embeddings
    encode-->>Caller: embeddings

Dig Deeper With Commands

  • /review <file-path> <function-optional>
  • /chat <file-path> "<question>"
  • /roast <file-path>

Runs only when explicitly triggered.

@bot-mergemonkey

Actionable Comments Posted: 0

🧹 Nitpick comments (1)
Comment says "greater than 1" but condition is `>= 1` - backend/onyx/natural_language_processing/search_nlp_models.py (925, 930)
The block comment above this branch lists "num_threads is greater than 1" as a precondition for taking the multi-threaded path, but the actual check is `num_threads >= 1`. With `num_threads=1` you still enter the threaded path and submit all batches to a single-worker pool: functionally sequential, but with extra thread overhead. Either tighten the check to `num_threads > 1`, matching the comment and the intent of "only multi-thread when there's parallelism to gain", or update the comment to state that `>= 1` is intended.
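If the check is tightened, the guard would look something like this (the helper name is an assumption for illustration, not a function in the PR):

```python
def should_use_thread_pool(num_threads: int) -> bool:
    # Only multi-thread when there's parallelism to gain: with
    # num_threads == 1 a single-worker pool runs batches sequentially
    # anyway, just with extra thread creation and scheduling overhead.
    return num_threads > 1
```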
🧾 Coverage Summary
✔️ Covered (3 files)
- backend/onyx/document_index/opensearch/opensearch_document_index.py
- backend/onyx/natural_language_processing/search_nlp_models.py
- backend/tests/unit/onyx/natural_language_processing/test_search_nlp_models.py

