add options for SDSC and EC2 patchers #937

Open
wants to merge 1 commit into develop
16 changes: 9 additions & 7 deletions ibllib/oneibl/data_handlers.py
@@ -565,7 +565,7 @@ def getData(self, one=None):
         dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
         return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

-    def getOutputFiles(self):
+    def getOutputFiles(self, session_path=None):
         """
         Return a data frame of output datasets found on disk.

@@ -574,10 +574,11 @@ def getOutputFiles(self):
         pandas.DataFrame
         A dataset data frame of datasets on disk that were specified in signature['output_files'].
         """
-        assert self.session_path
+        session_path = self.session_path if session_path is None else session_path
+        assert session_path
         # Next convert datasets to frame
         # Create dataframe of all ALF datasets
-        df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
+        df = _make_datasets_df(session_path, hash_files=False).set_index(['eid', 'id'])
         # Filter outputs
         if len(self.signature['output_files']) == 0:
             return pd.DataFrame()
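
The new session_path argument lets callers scan a location other than the handler's own session path for outputs. A minimal sketch of the intended use, with handler and path placeholders that are illustrative rather than taken from this diff:

    from pathlib import Path
    # `handler` stands for any configured DataHandler subclass instance (illustrative)
    outputs = handler.getOutputFiles()  # default: scan handler.session_path, as before
    tmp_outputs = handler.getOutputFiles(session_path=Path('/tmp/KS005/2019-04-01/001'))
    assert tmp_outputs.shape[0] == 0  # e.g. check that no outputs pre-exist at an alternative location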
@@ -760,13 +761,13 @@ def __init__(self, session_path, signature, one=None):
         """
         super().__init__(session_path, signature, one=one)

-    def setUp(self, **_):
+    def setUp(self, check_hash=True, **_):
         """
         Function to download necessary data to run tasks using ONE
         :return:
         """
         df = super().getData()
-        self.one._check_filesystem(df, check_hash=False)
+        self.one._check_filesystem(df, check_hash=check_hash)

     def uploadData(self, outputs, version, **kwargs):
         """
@@ -957,7 +958,7 @@ def __init__(self, session_path, signatures, one=None):
         self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
         self.root_path = SDSC_ROOT_PATH

-    def setUp(self, task):
+    def setUp(self, task, **_):
         """Function to create symlinks to necessary data to run tasks."""
         df = super().getData()

@@ -973,8 +974,9 @@ def setUp(self, task):
                         Path(self.root_path.joinpath(file_uuid)))
             except FileExistsError:
                 pass
-
         task.session_path = SDSC_TMP.joinpath(session_path)
+        assert self.getOutputFiles(session_path=task.session_path).shape[0] == 0, (
+            'On SDSC patcher, output files should be distinct from input files to avoid overwriting')
Review comment (Contributor): Minor typo here. Also, could you please add a comment about copying input files that are also output files in the future? Otherwise, if we encounter this assertion later, we may not remember the implications of breaking it.
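
One possible wording for the requested comment, as a sketch of intent rather than agreed text (the rationale about symlinked inputs is inferred from how this handler sets up data, not stated in the diff):

    # NB: inputs are symlinked in from the read-only SDSC file system, so a task output
    # sharing a name with an input would overwrite the original dataset. If a task must
    # modify an input in place, copy the file into the temporary session folder rather
    # than relying on the symlink created above.
    assert self.getOutputFiles(session_path=task.session_path).shape[0] == 0, (
        'On SDSC patcher, output files should be distinct from input files to avoid overwriting')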


     def uploadData(self, outputs, version, **kwargs):
         """
6 changes: 3 additions & 3 deletions ibllib/oneibl/patcher.py
@@ -661,11 +661,11 @@ def check_datasets(self, file_list):

         return exists

-    def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):
+    def patch_dataset(self, file_list, dry=False, ftp=False, clobber=False, **kwargs):

         exists = self.check_datasets(file_list)
-        if len(exists) > 0 and not force:
-            _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to force set force=True')
+        if len(exists) > 0 and not clobber:
+            _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to overwrite set clobber=True')
             return

         response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs)
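
An illustrative call site for the renamed keyword; the patcher instance and file list here are hypothetical:

    files = [Path('/mnt/s0/Data/KS005/2019-04-01/001/alf/_ibl_trials.table.pqt')]  # hypothetical
    patcher.patch_dataset(files, dry=True)      # logs an error and returns if the dataset already exists
    patcher.patch_dataset(files, clobber=True)  # explicitly overwrite existing datasets (was force=True)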
6 changes: 3 additions & 3 deletions ibllib/pipes/tasks.py
@@ -377,7 +377,7 @@ def get_signatures(self, **kwargs):
         self.output_files = signature['output_files']

     @abc.abstractmethod
-    def _run(self, overwrite=False):
+    def _run(self, overwrite=False, **kwargs):
         """Execute main task code.

         This method contains a task's principal data processing code and should be implemented
@@ -422,14 +422,14 @@ def setUp(self, **kwargs):
             # Attempts to download missing data using globus
             _logger.info('Not all input files found locally: attempting to re-download required files')
             self.data_handler = self.get_data_handler(location='serverglobus')
-            self.data_handler.setUp(task=self)
+            self.data_handler.setUp(task=self, **kwargs)
             # Double check we now have the required files to run the task
             # TODO in future should raise error if even after downloading don't have the correct files
             self.assert_expected_inputs(raise_error=False)
             return True
         else:
             self.data_handler = self.get_data_handler()
-            self.data_handler.setUp(task=self)
+            self.data_handler.setUp(task=self, **kwargs)
             self.get_signatures(**kwargs)
             self.assert_expected_inputs(raise_error=False)
             return True
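
With **kwargs now forwarded, handler options such as check_hash can be threaded through from Task.setUp down to the data handler; a sketch assuming a hypothetical server-run Task subclass:

    task = MyServerTask(session_path, one=one, location='server')  # hypothetical subclass
    task.setUp(check_hash=False)  # reaches ServerDataHandler.setUp via data_handler.setUp(task=self, **kwargs)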
23 changes: 23 additions & 0 deletions ibllib/tests/test_oneibl.py
Expand Up @@ -23,6 +23,7 @@
from ibllib.oneibl import patcher, registration, data_handlers as handlers
import ibllib.io.extractors.base
from ibllib.pipes.behavior_tasks import ChoiceWorldTrialsBpod
from ibllib.pipes.tasks import Task
from ibllib.tests import TEST_DB
from ibllib.io import session_params

@@ -704,6 +705,28 @@ def setUp(self):
         self.root_path.mkdir(), self.patch_path.mkdir()
         self.session_path = self.root_path.joinpath('KS005/2019-04-01/001')

+    def test_handler_overwrite(self):
+
+        class DummyTask(Task):
+            signature = {
+                'input_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)],
+                'output_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)]
+            }
+
+            def _run(self):
+                pass
+
+        with mock.patch('ibllib.oneibl.data_handlers.SDSC_ROOT_PATH', self.root_path), \
+                mock.patch('ibllib.oneibl.data_handlers.SDSC_PATCH_PATH', self.patch_path):
+            task = DummyTask(self.session_path, one=self.one, collection='raw_behavior_data', location='SDSC', on_error='raise')
+            # Add some files on disk to check they are symlinked by setUp method
+            for uid, rel_path in task.get_data_handler().getData().rel_path.items():
+                filepath = self.session_path.joinpath(rel_path)
+                filepath.parent.mkdir(exist_ok=True, parents=True)
+                filepath.with_stem(f'{filepath.stem}.{uid}').touch()
+            with self.assertRaises(AssertionError):
+                task.run()
+
     def test_handler(self):
         """Test for SDSCDataHandler.setUp and cleanUp methods."""
         # Create a task in order to check how the signature files are symlinked by handler