
Commit 1f566ab

Fix ticket-based job state transition for analysis jobs
1 parent 86da184 · commit 1f566ab

File tree

2 files changed (+161 −62):
  api/placer.py
  tests/integration_tests/python/test_jobs.py


api/placer.py  +39 −34

@@ -331,6 +331,8 @@ def finalize(self):
             job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
             job = Job.get(job_ticket['job'])
             success = job_ticket['success']
+        elif self.context.get('job_id'):
+            job = Job.get(self.context.get('job_id'))

         if self.metadata is not None:
             bid = bson.ObjectId(self.id_)
@@ -350,23 +352,11 @@ def finalize(self):
             hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)

         if job_ticket is not None:
-            if success:
-                Queue.mutate(job, {
-                    'state': 'complete',
-                    'profile': {
-                        'elapsed': job_ticket['elapsed']
-                    }
-                })
-            else:
-                Queue.mutate(job, {
-                    'state': 'failed',
-                    'profile': {
-                        'elapsed': job_ticket['elapsed']
-                    }
-                })
+            Queue.mutate(job, {'state': 'complete' if success else 'failed',
+                               'profile': {'elapsed': job_ticket['elapsed']}})
+            job = Job.get(job.id_)

-        if self.context.get('job_id'):
-            job = Job.get(self.context.get('job_id'))
+        if job is not None:
             job.saved_files = [f['name'] for f in self.saved]
             job.produced_metadata = self.metadata
             job.save()
@@ -725,8 +715,7 @@ def finalize(self):

 class AnalysisJobPlacer(Placer):
     def check(self):
-        if self.id_ is None:
-            raise Exception('Must specify a target analysis')
+        self.requireTarget()

         # Check that required state exists
         if self.context.get('job_id'):
@@ -743,27 +732,43 @@ def process_file_field(self, field, file_attrs):
                 file_attrs.update(file_md)
                 break

+        if self.context.get('job_ticket_id'):
+            job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
+
+            if not job_ticket['success']:
+                file_attrs['from_failed_job'] = True
+
         file_attrs['created'] = file_attrs['modified']
         self.save_file(field)
         self.saved.append(file_attrs)

     def finalize(self):
-        # Search the sessions table for analysis, replace file field
-        if self.saved:
-            q = {'_id': self.id_}
-            u = {'$push': {'files': {'$each': self.saved}}}
-            job_id = self.context.get('job_id')
-            if job_id:
-                # If the original job failed, update the analysis with the job that succeeded
-                u['$set'] = {'job': job_id}
-
-                # Update the job with saved files list
-                job = Job.get(job_id)
-                job.saved_files = [f['name'] for f in self.saved]
-                job.save()
-
-            config.db.analyses.update_one(q, u)
-            return self.saved
+        job = None
+        job_ticket = None
+        success = True
+
+        if self.context.get('job_ticket_id'):
+            job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
+            job = Job.get(job_ticket['job'])
+            success = job_ticket['success']
+        elif self.context.get('job_id'):
+            job = Job.get(self.context.get('job_id'))
+
+        # Replace analysis files (and job in case it's re-run)
+        query = {'_id': self.id_}
+        update = {'$set': {'files': self.saved, 'job': job.id_}}
+        config.db.analyses.update_one(query, update)
+
+        if job_ticket is not None:
+            Queue.mutate(job, {'state': 'complete' if success else 'failed',
+                               'profile': {'elapsed': job_ticket['elapsed']}})
+            job = Job.get(job.id_)
+
+        if job is not None:
+            job.saved_files = [f['name'] for f in self.saved]
+            job.save()
+
+        return self.saved


 class GearPlacer(Placer):
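
Both finalize() hunks apply the same ticket-driven pattern. Pulled out of the diff for readability, it amounts to roughly the sketch below; resolve_and_finish_job is an illustrative name only (the real logic stays inline in the placers), and Job, JobTicket, and Queue are the jobs-module classes api/placer.py already imports.

def resolve_and_finish_job(context, saved_files):
    # Resolve the job either via its completion ticket or directly by id.
    job = None
    job_ticket = None
    success = True

    if context.get('job_ticket_id'):
        job_ticket = JobTicket.get(context.get('job_ticket_id'))
        job = Job.get(job_ticket['job'])
        success = job_ticket['success']
    elif context.get('job_id'):
        job = Job.get(context.get('job_id'))

    if job_ticket is not None:
        # The ticket, not the upload itself, decides the terminal state.
        Queue.mutate(job, {'state': 'complete' if success else 'failed',
                           'profile': {'elapsed': job_ticket['elapsed']}})
        job = Job.get(job.id_)  # re-fetch so the save below sees the new state

    if job is not None:
        job.saved_files = [f['name'] for f in saved_files]
        job.save()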

tests/integration_tests/python/test_jobs.py  +122 −28

@@ -362,7 +362,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
     project = data_builder.create_project()
     session = data_builder.create_session()
     acquisition = data_builder.create_acquisition()
-    assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok
+    r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
+    assert r.ok

     # create rule for text files
     r = as_admin.post('/projects/' + project + '/rules', json={
@@ -391,8 +392,7 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
     })
     assert r.ok
     job = r.json()['_id']
-
-    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})
+    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

     # prepare completion (send success status before engine upload)
     r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': False, 'elapsed': -1})
@@ -403,36 +403,36 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
     assert job_ticket['success'] == False

     # engine upload
-    metadata = {
-        'project':{
-            'label': 'engine project',
-            'info': {'test': 'p'}
-        },
-        'session':{
-            'label': 'engine session',
-            'subject': {'code': 'engine subject'},
-            'info': {'test': 's'}
-        },
-        'acquisition':{
-            'label': 'engine acquisition',
-            'timestamp': '2016-06-20T21:57:36+00:00',
-            'info': {'test': 'a'},
-            'files': [{
-                'name': 'result.txt',
-                'type': 'text',
-                'info': {'test': 'f0'}
-            }]
-        }
-    }
-
-    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})
-
     r = as_drone.post('/engine',
         params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']},
-        files=file_form('result.txt', meta=metadata)
+        files=file_form('result.txt', meta={
+            'project': {
+                'label': 'engine project',
+                'info': {'test': 'p'}
+            },
+            'session': {
+                'label': 'engine session',
+                'subject': {'code': 'engine subject'},
+                'info': {'test': 's'}
+            },
+            'acquisition': {
+                'label': 'engine acquisition',
+                'timestamp': '2016-06-20T21:57:36+00:00',
+                'info': {'test': 'a'},
+                'files': [{
+                    'name': 'result.txt',
+                    'type': 'text',
+                    'info': {'test': 'f0'}
+                }]
+            }
+        })
     )
     assert r.ok

+    # verify job was transitioned to failed state
+    job_doc = as_admin.get('/jobs/' + job).json()
+    assert job_doc['state'] == 'failed'
+
     # verify metadata wasn't applied
     acq = as_admin.get('/acquisitions/' + acquisition).json()
     assert 'test' not in acq.get('info', {})
@@ -471,6 +471,100 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
     jobs = [j for j in api_db.jobs.find({'gear_id': gear2})]
     assert len(jobs) == 1

+
+def test_job_state_transition_from_ticket(data_builder, default_payload, as_admin, as_drone, api_db, file_form):
+    # create gear
+    gear_doc = default_payload['gear']['gear']
+    gear_doc['inputs'] = {'dicom': {'base': 'file'}}
+    gear = data_builder.create_gear(gear=gear_doc)
+
+    # create acq with file (for input)
+    acquisition = data_builder.create_acquisition()
+    r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
+    assert r.ok
+
+    # create job
+    r = as_admin.post('/jobs/add', json={
+        'gear_id': gear,
+        'config': {},
+        'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}},
+        'destination': {'type': 'acquisition', 'id': acquisition}
+    })
+    assert r.ok
+    job = r.json()['_id']
+    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
+
+    # prepare completion (send success status before engine upload)
+    r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
+    assert r.ok
+    job_ticket = r.json()['ticket']
+
+    # engine upload (should trigger state transition based on ticket)
+    r = as_drone.post('/engine',
+        params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
+        files=file_form('result.txt', meta={
+            'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
+        })
+    )
+    assert r.ok
+
+    # verify job was transitioned to complete state
+    job_doc = as_admin.get('/jobs/' + job).json()
+    assert job_doc['state'] == 'complete'
+
+    # test with success: False
+    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
+    api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
+    r = as_drone.post('/engine',
+        params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
+        files=file_form('result.txt', meta={
+            'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
+        })
+    )
+    assert r.ok
+    job_doc = as_admin.get('/jobs/' + job).json()
+    assert job_doc['state'] == 'failed'
+
+    # create session, analysis and job
+    session = data_builder.create_session()
+    r = as_admin.post('/sessions/' + session + '/analyses', json={
+        'label': 'online',
+        'job': {'gear_id': gear,
+                'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}}}
+    })
+    assert r.ok
+    analysis = r.json()['_id']
+
+    r = as_admin.get('/analyses/' + analysis)
+    assert r.ok
+    job = r.json().get('job')
+    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
+
+    # prepare completion (send success status before engine upload)
+    r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
+    assert r.ok
+    job_ticket = r.json()['ticket']
+
+    r = as_drone.post('/engine',
+        params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
+        files=file_form('result.txt', meta={'type': 'text'}))
+    assert r.ok
+
+    # verify job was transitioned to complete state
+    job_doc = as_admin.get('/jobs/' + job).json()
+    assert job_doc['state'] == 'complete'
+
+    # test with success: False
+    api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
+    api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
+    r = as_drone.post('/engine',
+        params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
+        files=file_form('result.txt', meta={'type': 'text'}))
+    assert r.ok
+    job_doc = as_admin.get('/jobs/' + job).json()
+    assert job_doc['state'] == 'failed'
+
+
 def test_analysis_job_creation_errors(data_builder, default_payload, as_admin, file_form):
     project = data_builder.create_project()
     session = data_builder.create_session()
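
For reference, the job ticket the tests manipulate through api_db.job_tickets is read by the placers for only three fields. The shape below is inferred from this diff (the stored document presumably carries more fields, such as its _id), so treat it as an illustration rather than a schema; the id value is a hypothetical placeholder.

# Fields of a job ticket as used by this change (illustrative values only).
job_ticket = {
    'job': '5a1b2c3d4e5f6a7b8c9d0e1f',  # hypothetical job id; this job is transitioned when the engine upload lands
    'success': True,                    # set via /jobs/<id>/prepare-complete; drives 'complete' vs 'failed'
    'elapsed': 3,                       # runtime reported at prepare-complete; copied into the job's profile.elapsed
}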
