Skip to content

Commit 310c840

Browse files
committed
Working on new version adapted to Dataset-store
1 parent c41cc2d commit 310c840

4 files changed

Lines changed: 113 additions & 58 deletions

File tree

src/edgeml/Dataset.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,6 @@ def data(self):
4747
if labeling.name not in df.columns:
4848
df[labeling.name] = ""
4949
df.loc[(df['time'] >= label.start) & (df['time'] <= label.end), labeling.name] = label.name
50-
51-
52-
53-
5450
return df
5551

5652
def loadData(self):

src/edgeml/TimeSeries.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def parse(self, data):
3232
self.start = data["start"]
3333
self.end = data["end"]
3434
self.unit = data["unit"]
35-
self.samplingRate = SamplingRate(data["samplingRate"]["mean"], data["samplingRate"]["var"])
35+
if data["samplingRate"] is not None:
36+
self.samplingRate = SamplingRate(data["samplingRate"]["mean"], data["samplingRate"]["var"])
3637
self.length = data["length"]
3738

3839
@property
@@ -49,7 +50,11 @@ def loadData(self) -> pd.DataFrame:
4950
res = req.get(self._backendURL + getProjectEndpoint + self._readKey + "/" + self._datasetId + "/" + self._id)
5051
with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as temp_file:
5152
temp_file.write(res.content)
52-
with h5py.File(temp_file.name, "r") as hf:
53-
time_array = np.array(hf["time"])
54-
data_array = np.array(hf["data"])
55-
self.data = pd.DataFrame({"time": time_array, self.name: data_array})
53+
temp_file.flush()
54+
if self.length == 0 or self.length == None:
55+
self.data = pd.DataFrame(columns=['time', self.name])
56+
else:
57+
with h5py.File(temp_file.name, "r") as hf:
58+
time_array = np.array(hf["time"])
59+
data_array = np.array(hf["data"])
60+
self.data = pd.DataFrame({"time": time_array, self.name: data_array})

src/edgeml/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from edgeml.edgeml import DatasetCollector, DatasetReceiver

src/edgeml/edgeml.py

Lines changed: 102 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import requests as req
22
from edgeml.consts import getProjectEndpoint
33
from edgeml.Dataset import Dataset
4-
import timelib
4+
import time
55

6-
7-
class edgeml:
6+
class DatasetReceiver:
87

98
def __init__(self, backendURL, readKey=None, writeKey=None):
109
self.backendURL = backendURL
@@ -28,63 +27,117 @@ def loadData(self):
2827
for d in self.datasets:
2928
d.loadData()
3029

31-
#
32-
# @param {string} url - The url of the backend server
33-
# @param {string} key - The Device-Api-Key
34-
# @param {boolean} useDeviceTime - True if you want to use timestamps generated by the server
35-
# @returns Function to upload single datapoints to one dataset inside a specific project
36-
#
37-
class datasetCollector():
38-
def __init__(self, url: str, key: str, name: str, useDeviceTime: bool) -> None:
30+
URLS = {
31+
"uploadDataset": "/api/deviceapi/uploadDataset",
32+
"initDatasetIncrement": "/ds/api/dataset/init/",
33+
"addDatasetIncrement": "/ds/api/dataset/append/",
34+
"getDatasetsInProject": "/ds/api/datasets/"
35+
}
36+
37+
UPLOAD_INTERVAL = 5 * 1000
38+
39+
class DatasetCollector:
40+
def __init__(
41+
self, url, apiKey, name, useDeviceTime, timeSeries, metaData, datasetLabel=None
42+
):
3943
self.url = url
40-
self.key = key
44+
self.apiKey = apiKey
4145
self.name = name
4246
self.useDeviceTime = useDeviceTime
47+
self.timeSeries = timeSeries
48+
self.metaData = metaData
49+
self.datasetLabel = datasetLabel
50+
self.error = None
51+
self.uploadComplete = False
52+
self.labeling = None
53+
self.lastChecked = time.time() * 1000
54+
55+
if self.useDeviceTime:
56+
self.addDataPoint = self._addDataPoint_DeviceTime
57+
else:
58+
self.addDataPoint = self._addDataPoint_OwnTimeStamps
59+
60+
61+
if self.datasetLabel:
62+
labeling_name, label_name = self.datasetLabel.split("_")
63+
self.labeling = {"labelingName": labeling_name, "labelName": label_name}
4364

44-
res = req.post(url + initDatasetIncrement, json = {"deviceApiKey": key, "name": name})
45-
#TODO error handling
46-
self.datasetKey = res.json()['datasetKey']
47-
self.dataStore = {'datasetKey': self.datasetKey, 'data': []}
48-
self.counter = 0
4965
self.error = None
5066

67+
res = req.post(
68+
url + URLS["initDatasetIncrement"] + apiKey,
69+
json={
70+
"name": self.name,
71+
"metaData": self.metaData,
72+
"timeSeries": self.timeSeries,
73+
"labeling": self.labeling,
74+
},
75+
)
5176

52-
def addDataPoint(self, sensorName: str, value: float, time: int = None):
53-
if (self.error):
54-
raise self.error
55-
if (type(value) is not float): #TODO cast int to float, it may cause problems, can value be ever int?
77+
if res.status_code != 200:
78+
raise RuntimeError(res.text.split(':')[1][1:-2])
79+
80+
res_data = res.json()
81+
if not res_data or not res_data["id"]:
82+
raise RuntimeError("Could not generate DatasetCollector")
83+
self.datasetKey = res_data["id"]
84+
self.dataStore = {x: [] for x in self.timeSeries}
85+
86+
async def _addDataPoint_DeviceTime(self, name, value):
87+
timestamp = int(time.time() * 1000)
88+
await self._addDataPoint_OwnTimeStamps(timestamp, name, value)
89+
90+
async def _addDataPoint_OwnTimeStamps(self, timestamp, name, value):
91+
if name not in self.timeSeries:
92+
raise ValueError("invalid time-series name")
93+
94+
if not isinstance(value, (int, float)):
5695
raise ValueError("Datapoint is not a number")
57-
if (not self.useDeviceTime and type(time) is not int and type(time) is not float):
96+
97+
if not isinstance(timestamp, (int)):
5898
raise ValueError("Provide a valid timestamp")
5999

60-
if (self.useDeviceTime):
61-
time = timelib.time()
100+
self.dataStore[name].append([timestamp, value])
62101

63-
if (all(dataPoint['sensorname'] != sensorName for dataPoint in self.dataStore['data'])):
64-
self.dataStore['data'].append({
65-
'sensorname': sensorName, #TODO sensorname is not in camelcase, maybe refactor later in db?
66-
'start': time,
67-
'end': time,
68-
'timeSeriesData': [{'timestamp': time, 'datapoint': value}]
69-
})
70-
else:
71-
for dataPoint in self.dataStore['data']:
72-
if (dataPoint['sensorname'] == sensorName):
73-
dataPoint['timeSeriesData'].append({'timestamp': time, 'datapoint': value})
74-
dataPoint['start'] = min(dataPoint['start'], time)
75-
dataPoint['end'] = max(dataPoint['end'], time)
76-
break
77-
78-
self.counter = self.counter + 1
79-
if self.counter > 1000:
80-
self.upload()
81-
82-
def __upload(self):
83-
res = req.post(self.url + addDatasetIncrementBatch, json = self.dataStore)
84-
self.counter = 0
85-
self.dataStore = {'datasetKey': self.datasetKey, 'data': []}
102+
if time.time() * 1000 - self.lastChecked > UPLOAD_INTERVAL:
103+
self.upload(self.labeling)
104+
self.lastChecked = time.time() * 1000
105+
self.dataStore = {"data": []}
106+
107+
108+
async def upload(self, uploadLabel):
109+
tmp_dataStore = self.dataStore.copy()
110+
tmp_dataStore = [{"name": k, "data": tmp_dataStore[k]} for k in tmp_dataStore.keys()]
111+
response = req.post(
112+
self.url
113+
+ URLS["addDatasetIncrement"]
114+
+ self.apiKey
115+
+ "/"
116+
+ self.datasetKey,
117+
json={"data": tmp_dataStore, "labeling": uploadLabel},
118+
)
119+
self.dataStore = {x: [] for x in self.timeSeries}
120+
if response.status_code != 200:
121+
raise RuntimeError("Upload failed")
86122

123+
# Synchronizes the server with the data when you have added all data
87124
def onComplete(self):
125+
if self.uploadComplete:
126+
raise RuntimeError("Dataset is already uploaded")
127+
tmp_dataStore = self.dataStore.copy()
128+
tmp_dataStore = [{"name": k, "data": tmp_dataStore[k]} for k in tmp_dataStore.keys()]
129+
response = req.post(
130+
self.url
131+
+ URLS["addDatasetIncrement"]
132+
+ self.apiKey
133+
+ "/"
134+
+ self.datasetKey,
135+
json={"data": tmp_dataStore, "labeling": self.labeling},
136+
)
137+
if response.status_code != 200:
138+
raise RuntimeError("Upload failed")
88139
if self.error:
89-
raise self.error
90-
self.__upload()
140+
raise RuntimeError(self.error)
141+
self.uploadComplete = True
142+
self.dataStore = {x: [] for x in self.timeSeries}
143+
return True

0 commit comments

Comments (0)