Skip to content

Commit 00be401

Browse files
committed
publish flow after running it
1 parent c46f5b7 commit 00be401

4 files changed

Lines changed: 65 additions & 19 deletions

File tree

openml/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
class PyOpenMLError(Exception):
22
def __init__(self, message):
3+
self.message = message
34
super(PyOpenMLError, self).__init__(message)
45

6+
57
class OpenMLServerError(PyOpenMLError):
68
"""class for when something is really wrong on the server
79
(result did not parse to dict), contains unparsed error."""
810

911
def __init__(self, message):
10-
message = "OpenML Server error: " + message
1112
super(OpenMLServerError, self).__init__(message)
1213

1314
#
@@ -18,7 +19,6 @@ class OpenMLServerException(OpenMLServerError):
1819
def __init__(self, code, message, additional=None):
1920
self.code = code
2021
self.additional = additional
21-
message = "OpenML Server exception: " + message
2222
super(OpenMLServerException, self).__init__(message)
2323

2424

openml/flows/functions.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ def _check_flow_for_server_id(flow):
145145
stack.append(component)
146146

147147

148-
def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None):
148+
def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
149+
ignore_parameters=False):
149150
"""Check equality of two flows.
150151
151152
Two flows are equal if all of their keys which are not set by the server
@@ -160,6 +161,9 @@ def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None):
160161
ignore_parameters_on_older_children : str
161162
If set to ``OpenMLFlow.upload_date``, ignores parameters in a child
162163
flow if its upload date predates the upload date of the parent flow.
164+
165+
ignore_parameters : bool
166+
Whether to ignore parameter values when comparing flows.
163167
"""
164168
if not isinstance(flow1, OpenMLFlow):
165169
raise TypeError('Argument 1 must be of type OpenMLFlow, but is %s' %
@@ -199,6 +203,8 @@ def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None):
199203
ignore_parameters_on_older_children)
200204
if upload_date_current_flow < upload_date_parent_flow:
201205
continue
206+
elif ignore_parameters:
207+
continue
202208

203209
if attr1 != attr2:
204210
raise ValueError("Flow %s: values for attribute '%s' differ: "

openml/runs/functions.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import six
1212
import xmltodict
1313

14+
import openml
1415
from ..exceptions import PyOpenMLError
1516
from .. import config
1617
from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs, \
@@ -30,20 +31,6 @@ def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
3031
seed=None):
3132
flow = sklearn_to_flow(model)
3233

33-
# returns flow id if the flow exists on the server, False otherwise
34-
flow_id = flow_exists(flow.name, flow.external_version)
35-
36-
if flow_id == False:
37-
# TODO this is potential race condition! someone could upload the
38-
# same flow in the meantime!
39-
# means the flow did not exists. As we could run it, publish it now
40-
flow = flow.publish()
41-
else:
42-
# flow already existed, download it from server
43-
# TODO (necessary? is this a postcondition of this function)
44-
flow_from_server = get_flow(flow_id)
45-
_copy_server_fields(flow_from_server, flow)
46-
4734
return run_flow_on_task(task=task, flow=flow,
4835
avoid_duplicate_runs=avoid_duplicate_runs,
4936
flow_tags=flow_tags, seed=seed)
@@ -82,6 +69,9 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
8269
# skips the run if it already exists and the user opts for this in the config file.
8370
# also, if the flow is not present on the server, the check is not needed.
8471
if avoid_duplicate_runs:
72+
if flow.flow_id is None:
73+
raise ValueError('Cannot check if a run exists if the '
74+
'corresponding flow has not been published yet!')
8575
flow_from_server = get_flow(flow.flow_id)
8676
setup_id = setup_exists(flow_from_server)
8777
ids = _run_exists(task.task_id, setup_id)
@@ -98,18 +88,43 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
9888

9989
run_environment = _get_version_information()
10090
tags = ['openml-python', run_environment[1]]
91+
10192
# execute the run
93+
res = _run_task_get_arffcontent(flow.model, task, class_labels)
94+
95+
if flow.flow_id is None:
96+
_publish_flow_if_necessary(flow)
97+
10298
run = OpenMLRun(task_id=task.task_id, flow_id=flow.flow_id,
10399
dataset_id=dataset.dataset_id, model=flow.model, tags=tags)
104100
run.parameter_settings = OpenMLRun._parse_parameters(flow)
105-
res = _run_task_get_arffcontent(flow.model, task, class_labels)
101+
106102
run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res
107103

108104
config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id))
109105

110106
return run
111107

112108

109+
def _publish_flow_if_necessary(flow):
110+
# try publishing the flow if one has to assume it doesn't exist yet. It
111+
# might fail because it already exists, then the flow is currently not
112+
# reused
113+
114+
try:
115+
flow.publish()
116+
except OpenMLServerException as e:
117+
if e.message == "flow already exists":
118+
flow_id = openml.flows.flow_exists(flow.name,
119+
flow.external_version)
120+
server_flow = get_flow(flow_id)
121+
openml.flows.flow._copy_server_fields(server_flow, flow)
122+
openml.flows.assert_flows_equal(flow, server_flow,
123+
ignore_parameters=True)
124+
else:
125+
raise e
126+
127+
113128
def get_run_trace(run_id):
114129
"""Get the optimization trace object for a given run id.
115130

tests/test_runs/test_run_functions.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,17 @@ def _check_detailed_evaluations(self, detailed_evaluations, num_repeats, num_fol
180180
self.assertGreater(evaluation, 0) # should take at least one millisecond (?)
181181
self.assertLess(evaluation, max_time_allowed)
182182

183+
def test_avoid_duplicate_runs_with_unpublished_flow(self):
184+
task_id = 115
185+
186+
clf = LogisticRegression()
187+
flow = sklearn_to_flow(clf)
188+
flow, _ = self._add_sentinel_to_flow_name(flow, None)
189+
task = openml.tasks.get_task(task_id)
190+
191+
self.assertRaisesRegexp(ValueError, 'Cannot check if a run exists if the corresponding flow has not been published yet!',
192+
openml.runs.run_flow_on_task, task=task,
193+
flow=flow, avoid_duplicate_runs=True)
183194

184195
def test_run_regression_on_classif_task(self):
185196
task_id = 115
@@ -202,6 +213,21 @@ def test_check_erronous_sklearn_flow_fails(self):
202213
openml.runs.run_model_on_task, task=task,
203214
model=clf)
204215

216+
def test__publish_flow_if_necessary(self):
217+
task_id = 115
218+
task = openml.tasks.get_task(task_id)
219+
220+
clf = LogisticRegression()
221+
flow = sklearn_to_flow(clf)
222+
flow, sentinel = self._add_sentinel_to_flow_name(flow, None)
223+
openml.runs.functions._publish_flow_if_necessary(flow)
224+
self.assertIsNotNone(flow.flow_id)
225+
226+
flow2 = sklearn_to_flow(clf)
227+
flow2, _ = self._add_sentinel_to_flow_name(flow2, sentinel)
228+
openml.runs.functions._publish_flow_if_necessary(flow2)
229+
self.assertEqual(flow2.flow_id, flow.flow_id)
230+
205231
def test_run_and_upload(self):
206232
# This unit test is meant to test the following functions, using a variety of flows:
207233
# - openml.runs.run_task()
@@ -277,7 +303,6 @@ def test_run_and_upload(self):
277303
self._check_detailed_evaluations(run.detailed_evaluations, 1, num_folds)
278304
pass
279305

280-
281306
def test_initialize_model_from_run(self):
282307
clf = sklearn.pipeline.Pipeline(steps=[('Imputer', Imputer(strategy='median')),
283308
('VarianceThreshold', VarianceThreshold(threshold=0.05)),

0 commit comments

Comments
 (0)