[mirror] fix(embedding): Self-blocking async code, other cleanups #5

yashwant86 wants to merge 6 commits into mm-base-10599 from
Conversation
⚡ Risk Assessment

| Files | Summary |
|---|---|
| **Embedding Batch Processing Async Safety**<br>`backend/onyx/natural_language_processing/search_nlp_models.py` | Fixed self-blocking async code in `_batch_encode_texts` by detecting whether the caller is inside a running event loop: sync callers are routed to a thread-local event loop, while async callers run the call in a separate thread via `asyncio.run`, preventing deadlocks. Refactored batch submission to use explicit lambda parameters instead of `partial` functions to ensure correct closure semantics. |
| **Test Coverage for Batch Encoding**<br>`backend/tests/unit/onyx/natural_language_processing/test_search_nlp_models.py` | Added a test suite for `_batch_encode_texts` covering multi-batch uniform/partial scenarios, the single-batch sequential path, local model processing, error propagation, and sync/async caller contexts, validating the deadlock fix and result ordering. |
| **Code Quality and Documentation**<br>`backend/onyx/document_index/opensearch/opensearch_document_index.py` | Fixed spelling error in a comment: 'propogate' → 'propagate'. |
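The event-loop detection described above can be sketched as follows. This is a minimal illustration of the pattern, not the PR's actual code: `_make_api_call` is a hypothetical stand-in for the real embedding API call, and the helper names only mirror those mentioned in the summary (`_get_or_create_event_loop`).

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Thread-local storage so each worker thread reuses its own event loop.
_thread_local = threading.local()


def _get_or_create_event_loop() -> asyncio.AbstractEventLoop:
    """Return this thread's cached event loop, creating one if needed."""
    loop = getattr(_thread_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _thread_local.loop = loop
    return loop


async def _make_api_call(batch: list[str]) -> list[list[float]]:
    # Hypothetical placeholder for the real embedding API call.
    await asyncio.sleep(0)
    return [[float(len(text))] for text in batch]


def process_batch(batch: list[str]) -> list[list[float]]:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Sync caller: no loop is running in this thread, so it is safe
        # to drive a thread-local loop directly.
        return _get_or_create_event_loop().run_until_complete(_make_api_call(batch))
    # Async caller: calling run_until_complete here would block the
    # caller's own loop (the self-blocking bug), so run the coroutine
    # on a fresh loop in a separate thread instead.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, _make_api_call(batch)).result()
```

Either path returns the same result, so callers never need to know whether they are inside a loop; the blocking wait in the async branch happens in the caller's thread, not in its event loop.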
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant encode
    participant process_batch
    participant EventLoop as Event Loop Check
    participant ThreadLocal as Thread-Local Loop
    participant Executor as Thread Executor
    Caller->>encode: encode(texts, batch_size)
    encode->>encode: batch_list(texts)
    alt Multi-threaded path
        encode->>Executor: ThreadPoolExecutor(max_workers=num_threads)
        loop For each batch
            Executor->>process_batch: submit lambda
            process_batch->>EventLoop: asyncio.get_running_loop()
            alt Sync context (RuntimeError)
                EventLoop-->>process_batch: RuntimeError
                process_batch->>ThreadLocal: _get_or_create_event_loop()
                ThreadLocal->>ThreadLocal: run_until_complete(_make_direct_api_call)
            else Async context (running loop)
                EventLoop-->>process_batch: running loop
                process_batch->>Executor: ThreadPoolExecutor(max_workers=1)
                Executor->>Executor: asyncio.run(_make_direct_api_call)
            end
            process_batch-->>Executor: (batch_idx, embeddings)
        end
        Executor->>encode: collect results
    else Sequential path
        loop For each batch
            encode->>process_batch: call directly
            process_batch->>EventLoop: asyncio.get_running_loop()
            alt Sync context
                process_batch->>ThreadLocal: run_until_complete
            else Async context
                process_batch->>Executor: asyncio.run in thread
            end
        end
    end
    encode->>encode: sort by batch_idx, extend embeddings
    encode-->>Caller: embeddings
```
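The "submit lambda" step is where the closure-semantics fix mentioned in the summary matters. A minimal sketch of the pitfall and the explicit-parameter fix, using a hypothetical `square` worker rather than the PR's actual batch function:

```python
from concurrent.futures import ThreadPoolExecutor


def square(batch_idx: int, batch: list[int]) -> tuple[int, list[int]]:
    """Hypothetical per-batch worker that tags results with their index."""
    return batch_idx, [x * x for x in batch]


batches = [[1, 2], [3, 4], [5, 6]]

with ThreadPoolExecutor(max_workers=3) as executor:
    # PITFALL: `executor.submit(lambda: square(idx, batch))` captures the
    # loop variables by reference (late binding), so a task that starts
    # after the loop advances may see another iteration's values.
    # FIX: bind the current values as explicit default parameters so each
    # lambda carries its own copy.
    futures = [
        executor.submit(lambda i=idx, b=batch: square(i, b))
        for idx, batch in enumerate(batches)
    ]
    # Futures may complete out of order; sort by batch index to restore
    # submission order, as the diagram's final step does.
    results = sorted(f.result() for f in futures)
```

`functools.partial(square, idx, batch)` binds eagerly too; the PR's summary says lambdas with explicit parameters were chosen, which achieves the same early binding.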
Dig Deeper With Commands
- `/review <file-path> <function-optional>`
- `/chat <file-path> "<question>"`
- `/roast <file-path>`
Runs only when explicitly triggered.
Actionable Comments Posted: 0

🧹 Nitpick comments (1)

- `backend/onyx/natural_language_processing/search_nlp_models.py` (lines 925, 930): comment says "greater than 1" but the condition is `>= 1`.

🧾 Coverage Summary

✔️ Covered (3 files)
Mirror of upstream onyx-dot-app#10599 for benchmark. Do not merge.
Summary by MergeMonkey