Skip to content

Commit 4e5f538

Browse files
refactor(pipeline_builder): split aggregation pipeline into planner/builder stages
- Extract query normalization, match planning, lookup planning, stage building, and tail stages - Introduce clear aggregation pipeline architecture aligned with MongoDB stages - Reduce PipelineBuilder to a small orchestration layer - Improve readability, isolation, and long-term maintainability
1 parent 16dd790 commit 4e5f538

12 files changed

Lines changed: 1766 additions & 1654 deletions

File tree

mongoengine/base/queryset/pipeline_builder.py

Lines changed: 0 additions & 1653 deletions
This file was deleted.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# MongoEngine Aggregation Pipeline Architecture
2+
3+
## Architecture Overview
4+
5+
```
6+
pipeline/
7+
├── normalizer.py # Normalize user query (regex, $where, etc.)
8+
├── match_planner.py # Decide WHERE each match can safely run
9+
├── lookup_planner.py # Decide WHICH lookups are required
10+
├── stage_builder.py # Emit $lookup / $addFields / $match stages
11+
├── tail_builder.py # Emit terminal stages ($sort/$skip/$limit/$project)
12+
├── pipeline_builder.py # Orchestrator (very small)
13+
```
14+
15+
### Mental Model
16+
17+
1. **Normalizer**
18+
- Input: raw queryset query
19+
- Output: MongoDB-safe query
20+
21+
2. **MatchPlanner**
22+
- Buckets filters by dereference depth
23+
- Ensures missing references never match
24+
25+
3. **LookupPlanner**
26+
- Determines lookup tree from queries + select_related
27+
28+
4. **StageBuilder**
29+
- Emits actual MongoDB aggregation stages
30+
- Interleaves lookups with safe `$match`
31+
32+
5. **TailBuilder**
33+
- Applies final shaping stages
34+
- Always runs last
35+
36+
## Why This Matters
37+
38+
MongoDB aggregation pipelines are **order-sensitive**.
39+
This design makes ordering explicit and safe by construction.
40+
41+
If a file grows too large, it means responsibility is leaking.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .pipeline_builder import *
2+
from .utils import *
3+
4+
__all__ = (
5+
list(pipeline_builder.__all__) +
6+
list(utils.__all__)
7+
)
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
from __future__ import annotations
2+
3+
from typing import Dict, Iterable
4+
5+
from .schema import Schema
6+
from .match_planner import MatchPlanner
7+
8+
9+
class LookupPlanner:
10+
"""
11+
Pure planning: produces a lookup tree (python field names).
12+
Does NOT emit Mongo stages.
13+
"""
14+
15+
def plan_from_select_related(self, select_related) -> dict:
16+
return self.build_related_tree(select_related)
17+
18+
def plan(self, doc_cls, select_related, bucket_prefixes: Iterable[str]) -> dict:
19+
tree = {}
20+
if select_related:
21+
tree = self.merge_trees(tree, self.build_related_tree(select_related))
22+
tree = self.merge_trees(tree, self.auto_tree_from_bucket_prefixes(doc_cls, bucket_prefixes))
23+
return tree
24+
25+
@staticmethod
26+
def build_related_tree(fields) -> dict:
27+
tree = {}
28+
for f in fields or []:
29+
parts = f.split("__")
30+
node = tree
31+
for p in parts:
32+
node = node.setdefault(p, {})
33+
node[""] = True
34+
return tree
35+
36+
@staticmethod
37+
def merge_trees(a: dict, b: dict) -> dict:
38+
if not a:
39+
return dict(b or {})
40+
if not b:
41+
return dict(a)
42+
out = dict(a)
43+
for k, v in b.items():
44+
if k not in out:
45+
out[k] = v
46+
else:
47+
if isinstance(out[k], dict) and isinstance(v, dict):
48+
out[k] = LookupPlanner.merge_trees(out[k], v)
49+
return out
50+
51+
def auto_tree_from_bucket_prefixes(self, root_doc_cls, bucket_prefixes: Iterable[str]) -> dict:
52+
"""
53+
Bucket prefixes are db_field dotted (e.g. "target.gp").
54+
We build a tree using python field names, with safe GenericRef traversal.
55+
"""
56+
tree: Dict[str, dict] = {}
57+
58+
for dotted_prefix in bucket_prefixes:
59+
if not dotted_prefix:
60+
continue
61+
62+
parts = dotted_prefix.split(".")
63+
node = tree
64+
cur = root_doc_cls
65+
66+
for idx, db_part in enumerate(parts):
67+
if cur is None:
68+
break
69+
70+
field_name, fld = Schema.resolve_field_name(cur, db_part)
71+
if not fld:
72+
break
73+
74+
node = node.setdefault(field_name, {})
75+
76+
from mongoengine.fields import ReferenceField, GenericReferenceField, EmbeddedDocumentField, \
77+
EmbeddedDocumentListField
78+
79+
leaf = Schema.unwrap_list_leaf(fld)
80+
81+
if isinstance(leaf, ReferenceField):
82+
cur = getattr(leaf, "document_type_obj", None) or getattr(leaf, "document_type", None)
83+
continue
84+
85+
if isinstance(leaf, GenericReferenceField):
86+
if idx < len(parts) - 1:
87+
next_part = parts[idx + 1]
88+
common_ref_field, _common_target = MatchPlanner.generic_common_ref(leaf, next_part)
89+
if common_ref_field is None:
90+
cur = None
91+
break
92+
93+
from mongoengine.document import _DocumentRegistry
94+
ch0 = (leaf.choices or ())[0]
95+
cur = _DocumentRegistry.get(ch0 if isinstance(ch0, str) else ch0.__name__)
96+
continue
97+
98+
cur = None
99+
break
100+
101+
if isinstance(fld, (EmbeddedDocumentField, EmbeddedDocumentListField)) or getattr(leaf, "document_type",
102+
None):
103+
cur = getattr(leaf, "document_type", None) or getattr(leaf, "document_type_obj", None)
104+
continue
105+
106+
cur = None
107+
108+
node[""] = True
109+
110+
return tree

0 commit comments

Comments
 (0)