Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
*.raw
*.log*

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
4 changes: 2 additions & 2 deletions echoflow/aspects/echoflow_aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def wrapper(*args, **kwargs):
)
else:
print(log)
except Exception as e:
except Exception as ex:
pass
raise e
raise e

return wrapper

Expand Down
51 changes: 27 additions & 24 deletions echoflow/echoflow_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def echoflow_{stage_name}(
futures.append(future)

ed_list = [f.result() for f in futures]
outputs = process_output_transects(name=stage.name, config=config, ed_list=ed_list)
outputs = process_output_transects(name=stage.name, config=config, stage=stage, ed_list=ed_list)
return outputs


Expand Down Expand Up @@ -402,33 +402,36 @@ def process_{stage_name}(
file_name = str(out_data.data.get("file_name"))
transect = str(out_data.data.get("transect"))
""" + """
log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
try:
log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)

out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)


log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Computing """ + f"""{stage_name}""" + """', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

# xr_d = ..call your function here...
if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)

else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Computing """ + f"""{stage_name}""" + """', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

# xr_d = ..call your function here...

log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)

else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
except Exception as e:
return {'transect': transect, 'file_name': file_name, 'error': True, 'error_desc': e}
"""

with open(f'./{stage_name}.py', 'w') as file:
Expand Down
9 changes: 5 additions & 4 deletions echoflow/rule_engine/dependency_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from collections import defaultdict


class DependencyEngine:
"""
Dependency Engine for Managing Function Dependencies
Expand Down Expand Up @@ -25,7 +28,7 @@ class DependencyEngine:
next_functions = engine.get_possible_next_functions(current_function="download_data")
"""
def __init__(self):
self.dependencies = {}
self.dependencies = defaultdict(list)

def add_dependency(self, target_function, dependent_function):
"""
Expand All @@ -42,9 +45,7 @@ def add_dependency(self, target_function, dependent_function):
Example:
# Add a dependency relationship
engine.add_dependency(target_function="data_download", dependent_function="data_preprocess")
"""
if target_function not in self.dependencies:
self.dependencies[target_function] = []
"""
self.dependencies[target_function].append(dependent_function)

def get_possible_next_functions(self, current_function):
Expand Down
50 changes: 26 additions & 24 deletions echoflow/stages/subflows/add_depth.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def echoflow_add_depth(
futures.append(future)

ed_list = [f.result() for f in futures]
outputs = process_output_transects(name=stage.name, config=config, ed_list=ed_list)
outputs = process_output_transects(name=stage.name, config=config, stage=stage, ed_list=ed_list)
return outputs


Expand Down Expand Up @@ -132,29 +132,31 @@ def process_add_depth(
else:
file_name = str(out_data.data.get("file_name"))
transect = str(out_data.data.get("transect"))

log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)
try:
log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f'Computing TS', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)

xr_d_loc = ep.consolidate.add_depth(ds=ed_list[0], depth_offset=stage.external_params.get('depth_offset'),
tilt=stage.external_params.get('tilt'),
downward=stage.external_params.get('downward'))
log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)
else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Computing TS', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc = ep.consolidate.add_depth(ds=ed_list[0], depth_offset=stage.external_params.get('depth_offset'),
tilt=stage.external_params.get('tilt'),
downward=stage.external_params.get('downward'))
log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)
else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
except Exception as e:
return {'transect': transect, 'file_name': file_name, 'error': True, 'error_desc': e}
50 changes: 26 additions & 24 deletions echoflow/stages/subflows/add_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def echoflow_add_location(
futures.append(future)

ed_list = [f.result() for f in futures]
outputs = process_output_transects(name=stage.name, config=config, ed_list=ed_list)
outputs = process_output_transects(name=stage.name, config=config, stage=stage, ed_list=ed_list)
return outputs


Expand Down Expand Up @@ -131,30 +131,32 @@ def process_add_location(
else:
file_name = str(out_data.data.get("file_name"))
transect = str(out_data.data.get("transect"))
try:
log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Entering ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
out_zarr = get_out_zarr(group = stage.options.get('group', True), working_dir=working_dir, transect=transect, file_name=file_name, storage_options=config.output.storage_options_dict)

ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Computing Add Location', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc = ep.consolidate.add_location(ds=ed_list[0], echodata=stage.external_params.get('echodata'), nmea_sentence=stage.external_params.get('nmea_sentence'))

log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
log_util.log(msg={'msg':f'Processing file, output will be at {out_zarr}', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)

else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)
if stage.options.get("use_offline") == False or isFile(out_zarr, config.output.storage_options_dict) == False:
log_util.log(msg={'msg':f'File not found in the destination folder / use_offline flag is False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)

log_util.log(msg={'msg':f'Computing Add Location', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc = ep.consolidate.add_location(ds=ed_list[0], echodata=stage.external_params.get('echodata'), nmea_sentence=stage.external_params.get('nmea_sentence'))

log_util.log(msg={'msg':f'Converting to Zarr', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

xr_d_loc.to_zarr(store=out_zarr, mode="w", consolidated=True,
storage_options=config.output.storage_options_dict)

else:
log_util.log(msg={'msg':f'Skipped processing {file_name}. File found in the destination folder. To replace or reprocess set `use_offline` flag to False', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

log_util.log(msg={'msg':f' ---- Exiting ----', 'mod_name':__file__, 'func_name':file_name}, use_dask=stage.options['use_dask'], eflogging=config.logging)

return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
return {'out_path': out_zarr, 'transect': transect, 'file_name': file_name, 'error': False}
except Exception as e:
return {'transect': transect, 'file_name': file_name, 'error': True, 'error_desc': e}
Loading