Skip to content

Commit 3a9e656

Browse files
committed
[EXPERIMENTAL] Pull db__id stream
1 parent c82f42f commit 3a9e656

1 file changed

Lines changed: 23 additions & 4 deletions

File tree

src/plan/pull.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Pull expression plan, but without nesting.
22
3-
use timely::dataflow::operators::Concatenate;
3+
use timely::dataflow::operators::{Concat, Concatenate};
44
use timely::dataflow::scopes::child::Iterative;
55
use timely::dataflow::Scope;
66
use timely::order::Product;
@@ -193,7 +193,7 @@ impl<P: Implementable> Implementable for PullLevel<P> {
193193
// Cardinality single means we don't need
194194
// to distinguish child ids (there can
195195
// only be one).
196-
result.pop();
196+
result.pop().expect("malformed path");
197197

198198
result.push(attribute.clone());
199199
result.push(v.clone());
@@ -204,11 +204,30 @@ impl<P: Implementable> Implementable for PullLevel<P> {
204204
}
205205
});
206206

207-
let tuples = nested.concatenate(streams).as_collection();
207+
let tuples = if self.path_attributes.is_empty() || self.cardinality_many {
208+
nested.concatenate(streams)
209+
} else {
210+
let db_ids = {
211+
let path_attributes = self.path_attributes.clone();
212+
paths
213+
.map(move |path| {
214+
let mut result = interleave(&path, &path_attributes);
215+
let eid = result.pop().expect("malformed path");
216+
217+
result.push(Value::Aid("db__id".to_string()));
218+
result.push(eid);
219+
220+
result
221+
})
222+
.inner
223+
};
224+
225+
nested.concatenate(streams).concat(&db_ids)
226+
};
208227

209228
let relation = CollectionRelation {
210229
variables: vec![], // @TODO
211-
tuples,
230+
tuples: tuples.as_collection(),
212231
};
213232

214233
(Implemented::Collection(relation), shutdown_handle)

0 commit comments

Comments
 (0)