Skip to content

Commit af13b5b

Browse files
Read Epc files in parallel to improve efficiency
+ add ".dat" as an accepted file extension for the HDF5 handler and use the h5 handler as the default fallback
1 parent b66bc88 commit af13b5b

6 files changed

Lines changed: 455 additions & 119 deletions

File tree

energyml-utils/example/attic/compare_inmem_n_stream.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,31 @@ def reexport_in_memory(filepath: str, output_folder: Optional[str] = None):
3939
if output_folder:
4040
os.makedirs(output_folder, exist_ok=True)
4141
path_in_memory = f"{output_folder}/{path_in_memory.split('/')[-1]}"
42-
epc = Epc.read_file(epc_file_path=filepath, read_rels_from_files=False)
42+
epc = Epc.read_file(epc_file_path=filepath, read_rels_from_files=False, recompute_rels=False)
4343

4444
if os.path.exists(path_in_memory):
4545
os.remove(path_in_memory)
4646
epc.export_file(path_in_memory)
4747

4848

49-
def time_comparison(filepath: str, output_folder: Optional[str] = None, skip_sequential: bool = True):
49+
def reexport_in_memory_par_read(filepath: str, output_folder: Optional[str] = None):
    """Re-export an EPC file through the in-memory ``Epc`` object using parallel I/O.

    The file is loaded with ``Epc.read_file`` (``read_rels_from_files=False``,
    ``read_parallel=True``, ``recompute_rels=False``) and written back with
    ``export_file(..., parallel=True)``.

    Args:
        filepath: Path of the EPC file to read.
        output_folder: Optional folder receiving the re-exported file; it is
            created when missing. When omitted, the output path is derived
            from ``filepath`` itself.
    """
    # Swap only the trailing extension: a plain ``str.replace(".epc", ...)`` would
    # also rewrite ".epc" inside directory names, and would leave the path
    # unchanged (so the export would overwrite the input) when the input has
    # no ".epc" in it.
    root, _ = os.path.splitext(filepath)
    path_in_memory = f"{root}_in_memory_par_read.epc"
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)
        # os.path handles both "/" and the platform separator, unlike split("/").
        path_in_memory = os.path.join(output_folder, os.path.basename(path_in_memory))

    epc = Epc.read_file(epc_file_path=filepath, read_rels_from_files=False, read_parallel=True, recompute_rels=False)

    # Drop any output left by a previous run before exporting.
    if os.path.exists(path_in_memory):
        os.remove(path_in_memory)
    epc.export_file(path_in_memory, parallel=True)
59+
60+
61+
def time_comparison(
62+
filepath: str,
63+
output_folder: Optional[str] = None,
64+
skip_sequential_stream: bool = True,
65+
skip_parallel_stream: bool = True,
66+
):
5067
"""Compare performance of different EPC reexport methods."""
5168
print(f"\n{'=' * 70}")
5269
print(f"Performance Comparison: {filepath.split('/')[-1]}")
@@ -62,7 +79,15 @@ def time_comparison(filepath: str, output_folder: Optional[str] = None, skip_seq
6279
results.append(("In-Memory (Epc)", elapsed_inmem))
6380
print(f" ✓ Completed in {elapsed_inmem:.3f}s\n")
6481

65-
if not skip_sequential:
82+
# Test 1b: In-Memory with Parallel Read
83+
print("⏳ Testing In-Memory EPC processing with Parallel Read...")
84+
start = time.perf_counter()
85+
reexport_in_memory_par_read(filepath, output_folder)
86+
elapsed_inmem_par = time.perf_counter() - start
87+
results.append(("In-Memory (Epc) Parallel Read", elapsed_inmem_par))
88+
print(f" ✓ Completed in {elapsed_inmem_par:.3f}s\n")
89+
90+
if not skip_sequential_stream:
6691
# Test 2: Streaming Sequential
6792
print("⏳ Testing Streaming Sequential processing...")
6893
start = time.perf_counter()
@@ -72,12 +97,13 @@ def time_comparison(filepath: str, output_folder: Optional[str] = None, skip_seq
7297
print(f" ✓ Completed in {elapsed_seq:.3f}s\n")
7398

7499
# Test 3: Streaming Parallel
75-
print("⏳ Testing Streaming Parallel processing...")
76-
start = time.perf_counter()
77-
reexport_stream_parallel(filepath, output_folder)
78-
elapsed_parallel = time.perf_counter() - start
79-
results.append(("Stream Parallel", elapsed_parallel))
80-
print(f" ✓ Completed in {elapsed_parallel:.3f}s\n")
100+
if not skip_parallel_stream:
101+
print("⏳ Testing Streaming Parallel processing...")
102+
start = time.perf_counter()
103+
reexport_stream_parallel(filepath, output_folder)
104+
elapsed_parallel = time.perf_counter() - start
105+
results.append(("Stream Parallel", elapsed_parallel))
106+
print(f" ✓ Completed in {elapsed_parallel:.3f}s\n")
81107

82108
# Calculate speedups
83109
results_sorted = sorted(results, key=lambda x: x[1])
@@ -119,15 +145,15 @@ def time_comparison(filepath: str, output_folder: Optional[str] = None, skip_seq
119145

120146
update_prop_kind_dict_cache()
121147

122-
time_comparison(
123-
filepath=sys.argv[1] if len(sys.argv) > 1 else "rc/epc/testingPackageCpp22.epc",
124-
output_folder="rc/performance_results",
125-
)
126-
127148
# time_comparison(
128-
# filepath=sys.argv[1] if len(sys.argv) > 1 else "rc/epc/80wells_surf.epc", output_folder="rc/performance_results"
149+
# filepath=sys.argv[1] if len(sys.argv) > 1 else "rc/epc/testingPackageCpp22.epc",
150+
# output_folder="rc/performance_results",
129151
# )
130152

153+
time_comparison(
154+
filepath=sys.argv[1] if len(sys.argv) > 1 else "rc/epc/80wells_surf.epc", output_folder="rc/performance_results"
155+
)
156+
131157
# time_comparison(
132158
# filepath=sys.argv[1] if len(sys.argv) > 1 else "wip/failingData/fix/sample_mini_firp_201_norels_with_media.epc",
133159
# output_folder="rc/performance_results",

energyml-utils/src/energyml/utils/data/datasets_io.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -756,9 +756,11 @@ def _register_default_handlers(self, max_open_files: int) -> None:
756756
"""Register all available handlers based on installed dependencies."""
757757
# HDF5 Handler
758758
if __H5PY_MODULE_EXISTS__:
759-
self.register_handler([".h5", ".hdf5"], lambda: HDF5ArrayHandler())
759+
self.register_handler([".h5", ".hdf5", ".dat"], lambda: HDF5ArrayHandler()) # dat for Galaxy compatibility
760760
else:
761-
self.register_handler([".h5", ".hdf5"], lambda: MockHDF5ArrayHandler())
761+
self.register_handler(
762+
[".h5", ".hdf5", ".dat"], lambda: MockHDF5ArrayHandler()
763+
) # dat for Galaxy compatibility
762764

763765
# Parquet Handler
764766
if __PARQUET_MODULE_EXISTS__:
@@ -802,13 +804,18 @@ def get_handler_for_file(self, file_path: str) -> Optional[ExternalArrayHandler]
802804
file_path: Path to the file
803805
804806
Returns:
805-
Handler instance, or None if no handler registered for this extension
807+
Handler instance, or h5 handler if extension not found but h5 handler is available and not mock, else None
806808
"""
807809
ext = os.path.splitext(file_path)[1].lower()
808810

809811
if ext in self._handlers:
810812
return self._handlers[ext]()
811813

814+
# search for h5 handler if not mock and return it by default
815+
if ".h5" in self._handlers:
816+
h = self._handlers[".h5"]()
817+
if "mock" not in h.__class__.__name__.lower():
818+
return self._handlers[".h5"]()
812819
return None
813820

814821
def supports_extension(self, extension: str) -> bool:
@@ -973,7 +980,7 @@ def list_arrays(self, source: Union[BytesIO, str, Any]) -> List[str]:
973980
def can_handle_file(self, file_path: str) -> bool:
    """Return True when the extension of *file_path* is one this handler accepts.

    The extension is compared case-insensitively against ``.h5``, ``.hdf5``
    and ``.dat``.
    """
    _, suffix = os.path.splitext(file_path)
    # dat for Galaxy compatibility
    return suffix.lower() in (".h5", ".hdf5", ".dat")
977984

978985
else:
979986

@@ -1019,7 +1026,7 @@ def list_arrays(self, source: Union[BytesIO, str, Any]) -> List[str]:
10191026
raise MissingExtraInstallation(extra_name="hdf5")
10201027

10211028
def can_handle_file(self, file_path: str) -> bool:
    """Check whether *file_path* ends with an extension this handler is registered for.

    Matching is case-insensitive on the final extension only.
    """
    extension = os.path.splitext(file_path)[1]
    accepted = [".h5", ".hdf5", ".dat"]  # dat for Galaxy compatibility
    return extension.lower() in accepted
10231030

10241031

10251032
# Parquet Handler

energyml-utils/src/energyml/utils/data/mesh.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def read_mesh_object(
241241
): # WellboreFrameRep has allready the displacement applied
242242
# TODO: the displacement should be done in each reader function to manage specific cases
243243
for s in surfaces:
244-
print("CRS : ", s.crs_object.uuid if s.crs_object is not None else "None")
244+
logging.debug(f"CRS : {s.crs_object.uuid if s.crs_object is not None else 'None'}")
245245
crs_displacement(s.point_list, s.crs_object)
246246
return surfaces
247247
else:
@@ -833,7 +833,11 @@ def read_wellbore_trajectory_representation(
833833

834834
# Get CRS from trajectory geometry if available
835835
try:
836-
crs = workspace.get_object(get_obj_uri(get_object_attribute(energyml_object, "geometry.LocalCrs")))
836+
crs_attr = get_object_attribute(energyml_object, "geometry.LocalCrs")
837+
if crs_attr is not None:
838+
crs = workspace.get_object(get_obj_uri(crs_attr))
839+
else:
840+
raise ObjectNotFoundNotError("LocalCrs attribute not found in trajectory geometry")
837841
except Exception:
838842
logging.debug("Could not get CRS from trajectory geometry")
839843

0 commit comments

Comments
 (0)