diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index 310bfd96c..a73dfcc06 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -565,7 +565,7 @@ def getData(self, one=None): dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']] return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates() - def getOutputFiles(self): + def getOutputFiles(self, session_path=None): """ Return a data frame of output datasets found on disk. @@ -574,10 +574,11 @@ def getOutputFiles(self): pandas.DataFrame A dataset data frame of datasets on disk that were specified in signature['output_files']. """ - assert self.session_path + session_path = self.session_path if session_path is None else session_path + assert session_path # Next convert datasets to frame # Create dataframe of all ALF datasets - df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id']) + df = _make_datasets_df(session_path, hash_files=False).set_index(['eid', 'id']) # Filter outputs if len(self.signature['output_files']) == 0: return pd.DataFrame() @@ -760,13 +761,13 @@ def __init__(self, session_path, signature, one=None): """ super().__init__(session_path, signature, one=one) - def setUp(self, **_): + def setUp(self, check_hash=True, **_): """ Function to download necessary data to run tasks using ONE :return: """ df = super().getData() - self.one._check_filesystem(df, check_hash=False) + self.one._check_filesystem(df, check_hash=check_hash) def uploadData(self, outputs, version, **kwargs): """ @@ -957,7 +958,7 @@ def __init__(self, session_path, signatures, one=None): self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH) self.root_path = SDSC_ROOT_PATH - def setUp(self, task): + def setUp(self, task, **_): """Function to create symlinks to necessary data to run tasks.""" df = super().getData() @@ -973,8 +974,9 @@ def setUp(self, task): 
Path(self.root_path.joinpath(file_uuid))) except FileExistsError: pass - task.session_path = SDSC_TMP.joinpath(session_path) + assert self.getOutputFiles(session_path=task.session_path).shape[0] == 0, ( + "On SDSC patcher, output files should be distinct from input files to avoid overwriting") def uploadData(self, outputs, version, **kwargs): """ diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py index 10b70a729..43a38426b 100644 --- a/ibllib/oneibl/patcher.py +++ b/ibllib/oneibl/patcher.py @@ -661,11 +661,11 @@ def check_datasets(self, file_list): return exists - def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs): + def patch_dataset(self, file_list, dry=False, ftp=False, clobber=False, **kwargs): exists = self.check_datasets(file_list) - if len(exists) > 0 and not force: - _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to force set force=True') + if len(exists) > 0 and not clobber: + _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to overwrite set clobber=True') return response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs) diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index f1755f4eb..e9d225f75 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -377,7 +377,7 @@ def get_signatures(self, **kwargs): self.output_files = signature['output_files'] @abc.abstractmethod - def _run(self, overwrite=False): + def _run(self, overwrite=False, **kwargs): """Execute main task code. 
This method contains a task's principal data processing code and should implemented @@ -422,14 +422,14 @@ def setUp(self, **kwargs): # Attempts to download missing data using globus _logger.info('Not all input files found locally: attempting to re-download required files') self.data_handler = self.get_data_handler(location='serverglobus') - self.data_handler.setUp(task=self) + self.data_handler.setUp(task=self, **kwargs) # Double check we now have the required files to run the task # TODO in future should raise error if even after downloading don't have the correct files self.assert_expected_inputs(raise_error=False) return True else: self.data_handler = self.get_data_handler() - self.data_handler.setUp(task=self) + self.data_handler.setUp(task=self, **kwargs) self.get_signatures(**kwargs) self.assert_expected_inputs(raise_error=False) return True diff --git a/ibllib/tests/test_oneibl.py b/ibllib/tests/test_oneibl.py index c4acc23b4..2c207b507 100644 --- a/ibllib/tests/test_oneibl.py +++ b/ibllib/tests/test_oneibl.py @@ -23,6 +23,7 @@ from ibllib.oneibl import patcher, registration, data_handlers as handlers import ibllib.io.extractors.base from ibllib.pipes.behavior_tasks import ChoiceWorldTrialsBpod +from ibllib.pipes.tasks import Task from ibllib.tests import TEST_DB from ibllib.io import session_params @@ -704,6 +705,28 @@ def setUp(self): self.root_path.mkdir(), self.patch_path.mkdir() self.session_path = self.root_path.joinpath('KS005/2019-04-01/001') + def test_handler_overwrite(self): + + class DummyTask(Task): + signature = { + 'input_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)], + 'output_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)] + } + + def _run(self): + pass + + with mock.patch('ibllib.oneibl.data_handlers.SDSC_ROOT_PATH', self.root_path), \ + mock.patch('ibllib.oneibl.data_handlers.SDSC_PATCH_PATH', self.patch_path): + task = DummyTask(self.session_path, one=self.one, 
collection='raw_behavior_data', location='SDSC', on_error='raise') + # Add some files on disk to check they are symlinked by setUp method + for uid, rel_path in task.get_data_handler().getData().rel_path.items(): + filepath = self.session_path.joinpath(rel_path) + filepath.parent.mkdir(exist_ok=True, parents=True) + filepath.with_stem(f'{filepath.stem}.{uid}').touch() + with self.assertRaises(AssertionError): + task.run() + def test_handler(self): """Test for SDSCDataHandler.setUp and cleanUp methods.""" # Create a task in order to check how the signature files are symlinked by handler