Skip to content

Commit 121b4c9

Browse files
committed
Add a function to get the list of flow graph nodes in strict left-to-right order
1 parent 699d578 commit 121b4c9

1 file changed

Lines changed: 78 additions & 24 deletions

File tree

dataikuapi/dss/flow.py

Lines changed: 78 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from .dataset import DSSDataset
33
from .recipe import DSSRecipe, DSSRecipeDefinitionAndPayload
44
from .future import DSSFuture
5-
import logging
5+
import logging, json
66

77
class DSSProjectFlow(object):
88
def __init__(self, client, project):
@@ -90,6 +90,7 @@ class DSSProjectFlowGraph(object):
9090
def __init__(self, flow, data):
9191
self.flow = flow
9292
self.data = data
93+
self.nodes = data["nodes"]
9394

9495
def get_source_computables(self, as_type="dict"):
9596
"""
@@ -102,17 +103,31 @@ def get_source_computables(self, as_type="dict"):
102103
:class:`dataikuapi.dss.savedmodel.DSSSavedModel`, or streaming endpoint
103104
"""
104105
ret = []
105-
for computable in self.data["computables"].values():
106-
if len(computable["predecessors"]) == 0:
107-
ret.append(computable)
108-
return self._convert_computables_list(ret, as_type)
106+
for node in self.nodes.values():
107+
if len(node["predecessors"]) == 0 and node["type"].startswith("COMPUTABLE"):
108+
ret.append(node)
109+
return self._convert_nodes_list(ret, as_type)
110+
111+
def get_source_recipes(self, as_type="dict"):
112+
"""
113+
Returns the list of source recipes.
114+
:param as_type: How to return the source recipes. Possible values are "dict" and "object"
115+
116+
:return: if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
117+
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`,
118+
"""
119+
ret = []
120+
for node in self.nodes.values():
121+
if len(node["predecessors"]) == 0 and node["type"] == "RUNNABLE_RECIPE":
122+
ret.append(node)
123+
return self._convert_nodes_list(ret, as_type)
109124

110125
def get_source_datasets(self):
111126
"""
112127
Returns the list of source datasets for this project.
113128
:rtype list of :class:`dataikuapi.dss.dataset.DSSDataset`
114129
"""
115-
return [self._get_object_from_computable(x) for x in self.get_source_computables() if x["type"] == "COMPUTABLE_DATASET"]
130+
return [self._get_object_from_graph_node(x) for x in self.get_source_computables() if x["type"] == "COMPUTABLE_DATASET"]
116131

117132
def get_successor_recipes(self, node, as_type="name"):
118133
"""
@@ -125,43 +140,82 @@ def get_successor_recipes(self, node, as_type="name"):
125140
if isinstance(node, DSSDataset):
126141
node = node.dataset_name
127142

128-
computable = self.data["computables"].get(node, None)
143+
computable = self.nodes.get(node, None)
129144
if computable is None:
130145
raise ValueError("Computable %s not found in Flow graph" % node)
131146

132-
names= computable["successors"]
133-
134-
if as_type == "object":
135-
return [DSSRecipe(self.flow.client, self.flow.project.project_key, x) for x in names]
136-
else:
137-
return names
138-
147+
runnables = [self.nodes[x] for x in computable["successors"]]
148+
return self._convert_nodes_list(runnables, as_type)
149+
139150
def get_successor_computables(self, node, as_type="dict"):
140151
"""
141152
Returns a list of computables that are a successor of a given graph node
142153
Each computable is returned as a dict containing at least "ref" and "type"
143154
"""
144155
if isinstance(node, DSSRecipe):
145156
node = node.recipe_name
146-
runnable = self.data["runnables"].get(node, None)
157+
runnable = self.nodes.get(node, None)
147158
if runnable is None:
148159
raise ValueError("Runnable %s not found in Flow graph" % node)
149160

150-
computables = [self.data["computables"][x] for x in runnable["successors"]]
151-
return self._convert_computables_list(computables, as_type)
161+
computables = [self.nodes[x] for x in runnable["successors"]]
162+
return self._convert_nodes_list(computables, as_type)
152163

153-
def _convert_computables_list(self, computables, as_type):
164+
def _convert_nodes_list(self, nodes, as_type):
154165
if as_type == "object":
155-
return [self._get_object_from_computable(computable) for computable in computables]
166+
return [self._get_object_from_graph_node(node) for node in nodes]
156167
else:
157-
return computables
168+
return nodes
158169

159-
def _get_object_from_computable(self, computable):
160-
if computable["type"] == "COMPUTABLE_DATASET":
161-
return DSSDataset(self.flow.client, self.flow.project.project_key, computable["ref"])
170+
def _get_object_from_graph_node(self, node):
171+
if node["type"] == "COMPUTABLE_DATASET":
172+
return DSSDataset(self.flow.client, self.flow.project.project_key, node["ref"])
173+
elif node["type"] == "RUNNABLE_RECIPE":
174+
return DSSRecipe(self.flow.client, self.flow.project.project_key, node["ref"])
162175
else:
163176
# TODO
164-
raise Exception("unsupported %s" % computable["type"])
177+
raise Exception("unsupported node type %s" % node["type"])
178+
179+
def get_items_in_traversal_order(self, as_type="dict"):
180+
ret = []
181+
def add_to_set(node):
182+
print("*** Adding: %s" % node["ref"])
183+
ret.append(node)
184+
def in_set(obj):
185+
for candidate in ret:
186+
if candidate["type"] == obj["type"] and candidate["ref"] == obj["ref"]:
187+
return True
188+
return False
189+
190+
def add_from(graph_node):
191+
#print("Add from %s" % graph_node["ref"])
192+
# To keep traversal order, we add predecessors before successors. And we recurse before adding
193+
# our own predecessors
194+
for predecessor_ref in graph_node["predecessors"]:
195+
#print(" Pred = %s " % predecessor_ref)
196+
predecessor_node = self.nodes[predecessor_ref]
197+
if not in_set(predecessor_node):
198+
add_from(predecessor_node)
199+
200+
# Then ourselves
201+
#print(" Itself %s" % graph_node["ref"])
202+
if not in_set(graph_node):
203+
add_to_set(graph_node)
204+
205+
# For successors, we first add ourselves before successors
206+
for successor_ref in graph_node["successors"]:
207+
#print(" Succ = %s " % successor_ref)
208+
successor_node = self.nodes[successor_ref]
209+
if not in_set(successor_node):
210+
add_from(successor_node)
211+
212+
for source_computable in self.get_source_computables():
213+
add_from(source_computable)
214+
215+
for source_recipe in self.get_source_recipes():
216+
add_from(source_recipe)
217+
218+
return self._convert_nodes_list(ret, as_type)
165219

166220
class DSSFlowTool(object):
167221
"""

0 commit comments

Comments
 (0)