@@ -35,6 +35,8 @@ pub struct GraphQl {
3535 pub query : String ,
3636 /// Cached paths
3737 paths : Vec < Pull > ,
38+ /// Required attributes to filter entities by
39+ required_aids : Vec < Aid > ,
3840}
3941
4042impl GraphQl {
@@ -50,6 +52,7 @@ impl GraphQl {
5052 GraphQl {
5153 query,
5254 paths : ast. into_paths ( empty_plan) ,
55+ required_aids : vec ! [ ] ,
5356 }
5457 }
5558
@@ -61,7 +64,19 @@ impl GraphQl {
6164 bindings : root_plan. into_bindings ( ) ,
6265 } ) ;
6366
64- GraphQl { query, paths }
67+ GraphQl {
68+ query,
69+ paths,
70+ required_aids : vec ! [ ] ,
71+ }
72+ }
73+
74+ /// Creates a new GraphQl that filters top-level entities down to
75+ /// only those with all of the required Aids present.
76+ pub fn with_required_aids ( query : String , required_aids : Vec < Aid > ) -> Self {
77+ let mut query = GraphQl :: new ( query) ;
78+ query. required_aids = required_aids;
79+ query
6580 }
6681}
6782
@@ -320,8 +335,11 @@ impl GraphQl {
320335 } ) ;
321336
322337 let mut change_keys = HashMap :: new ( ) ;
338+ let mut excise_keys = Vec :: new ( ) ;
323339 let mut vector = Vec :: new ( ) ;
324340
341+ let required_aids = self . required_aids . clone ( ) ;
342+
325343 let snapshots = nested. parent . concatenate ( streams) . unary_notify (
326344 Pipeline ,
327345 "GraphQl" ,
@@ -337,20 +355,47 @@ impl GraphQl {
337355 notificator. notify_at ( cap. retain ( ) ) ;
338356 } ) ;
339357
340- let states = states. borrow ( ) ;
358+ {
359+ let mut states = states. borrow_mut ( ) ;
341360
342- notificator. for_each ( |cap, _, _| {
343- if let Some ( mut keys) = change_keys. remove ( cap. time ( ) ) {
344- let t = cap. time ( ) . clone ( ) ;
345-
346- let snapshots = keys. drain ( ) . map ( |key| {
347- let snapshot = & states[ key] ;
348- Output :: Json ( "test" . to_string ( ) , snapshot. clone ( ) , t. clone ( ) . into ( ) , 1 )
349- } ) ;
361+ for ( key, snapshot) in states. as_object ( ) . unwrap ( ) . iter ( ) {
362+ for required_aid in required_aids. iter ( ) {
363+ if !snapshot. as_object ( ) . unwrap ( ) . contains_key ( required_aid) {
364+ excise_keys. push ( key. clone ( ) ) ;
365+ }
366+ }
367+ }
350368
351- output. session ( & cap) . give_iterator ( snapshots) ;
369+ let states_map = states. as_object_mut ( ) . unwrap ( ) ;
370+ for key in excise_keys. drain ( ..) {
371+ states_map. remove ( & key) ;
352372 }
353- } ) ;
373+ }
374+
375+ {
376+ let states = states. borrow ( ) ;
377+
378+ notificator. for_each ( |cap, _, _| {
379+ if let Some ( mut keys) = change_keys. remove ( cap. time ( ) ) {
380+ let t = cap. time ( ) . clone ( ) ;
381+
382+ let snapshots = keys. drain ( ) . flat_map ( |key| {
383+ if let Some ( snapshot) = states. get ( key) {
384+ Some ( Output :: Json (
385+ "test" . to_string ( ) ,
386+ snapshot. clone ( ) ,
387+ t. clone ( ) . into ( ) ,
388+ 1 ,
389+ ) )
390+ } else {
391+ None
392+ }
393+ } ) ;
394+
395+ output. session ( & cap) . give_iterator ( snapshots) ;
396+ }
397+ } ) ;
398+ }
354399 } ,
355400 ) ;
356401
0 commit comments