
Commit 426268c

Multiple inputs add default workflow (#3371)
* add rna_copy_counts
* RNA -> Calculate RNA
* v != '*'
* v != '*': fix conditional
* prep job only display if success
* allowing for multiple inputs in workflow
* fix error
* just one element
* rollback add_default_workflow
* add full workflow case
* do not add ancestors
* single command
1 parent 57b84cf commit 426268c

File tree

2 files changed: +91 -15

qiita_db/metadata_template/prep_template.py
qiita_db/metadata_template/test/test_prep_template.py

qiita_db/metadata_template/prep_template.py (+51 -15)
@@ -794,16 +794,25 @@ def _get_predecessors(workflow, node):
             # recursive method to get predecessors of a given node
             pred = []
 
-            for pnode in workflow.graph.predecessors(node):
+            parents = list(workflow.graph.predecessors(node))
+            for pnode in parents:
                 pred = _get_predecessors(workflow, pnode)
                 cxns = {x[0]: x[2]
                         for x in workflow.graph.get_edge_data(
                             pnode, node)['connections'].connections}
                 data = [pnode, node, cxns]
                 if pred is None:
-                    pred = [data]
-                else:
-                    pred.append(data)
+                    pred = []
+
+                # making sure that if the node has extra parents they are
+                # generated first
+                parents.remove(pnode)
+                if parents:
+                    for pnode in parents:
+                        # [-1] just adding the parent and not its ancestors
+                        pred.extend([_get_predecessors(workflow, pnode)[-1]])
+
+                pred.append(data)
             return pred
 
         # Note: we are going to use the final BIOMs to figure out which
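The gist of this hunk is ordering: when a node has more than one parent, every parent must appear in the predecessor list before the node itself, so the jobs feeding a multi-input step are created first. Below is a minimal standalone sketch of that traversal, assuming (as in Qiita) the workflow graph is a networkx DiGraph; the node names and the simplified (parent, child) tuples are illustrative only, the real list carries connection metadata.

import networkx as nx

def get_predecessors(graph, node):
    # collect the (parent, child) edges that have to be instantiated,
    # ancestors first, ending with the edge into `node`
    pred = []
    parents = list(graph.predecessors(node))
    for pnode in parents:
        pred = get_predecessors(graph, pnode)
        data = (pnode, node)
        # any extra parents of `node` are emitted before `node` itself;
        # [-1] adds just that parent's own edge, not its ancestors
        # (removing pnode here also ends the outer loop after the first
        # parent, mirroring the original's list mutation)
        parents.remove(pnode)
        for extra in parents:
            pred.append(get_predecessors(graph, extra)[-1])
        pred.append(data)
    return pred

g = nx.DiGraph([('sl_1', 'otu_1'), ('sl_2', 'otu_2'),
                ('otu_1', 'merge'), ('otu_2', 'merge')])
print(get_predecessors(g, 'merge'))
# [('sl_1', 'otu_1'), ('sl_2', 'otu_2'), ('otu_1', 'merge')]
# -> everything both parents need exists before the multi-input step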
@@ -894,8 +903,9 @@ def _get_predecessors(workflow, node):
         # let's just keep one, let's give it preference to the one with the
         # most total_conditions_satisfied
         _, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
+        GH = wk.graph
         missing_artifacts = dict()
-        for node, degree in wk.graph.out_degree():
+        for node, degree in GH.out_degree():
             if degree != 0:
                 continue
             mscheme = _get_node_info(wk, node)
@@ -920,7 +930,7 @@ def _get_predecessors(workflow, node):
                 icxns = {y: x for x, y in cxns.items()}
                 reqp = {x: icxns[y[1][0]]
                         for x, y in cdp_cmd.required_parameters.items()}
-                cmds_to_create.append([cdp_cmd, params, reqp])
+                cmds_to_create.append([cdp, cdp_cmd, params, reqp])
 
                 info = _get_node_info(wk, pnode)
                 if info in merging_schemes:
@@ -942,7 +952,7 @@ def _get_predecessors(workflow, node):
                             'be applied')
                     reqp[x] = wkartifact_type
 
-            cmds_to_create.append([pdp_cmd, params, reqp])
+            cmds_to_create.append([pdp, pdp_cmd, params, reqp])
 
         if starting_job is not None:
             init_artifacts = {
@@ -953,14 +963,16 @@ def _get_predecessors(workflow, node):
         cmds_to_create.reverse()
         current_job = None
         loop_starting_job = starting_job
-        for i, (cmd, params, rp) in enumerate(cmds_to_create):
+        previous_dps = dict()
+        for i, (dp, cmd, params, rp) in enumerate(cmds_to_create):
             if loop_starting_job is not None:
                 previous_job = loop_starting_job
                 loop_starting_job = None
             else:
                 previous_job = current_job
+
+            req_params = dict()
             if previous_job is None:
-                req_params = dict()
                 for iname, dname in rp.items():
                     if dname not in init_artifacts:
                         msg = (f'Missing Artifact type: "{dname}" in '
@@ -970,12 +982,35 @@ def _get_predecessors(workflow, node):
                         # raises option c.
                         raise ValueError(msg)
                     req_params[iname] = init_artifacts[dname]
+                if len(dp.command.required_parameters) > 1:
+                    for pn in GH.predecessors(node):
+                        info = _get_node_info(wk, pn)
+                        n, cnx, _ = GH.get_edge_data(
+                            pn, node)['connections'].connections[0]
+                        if info not in merging_schemes or \
+                                n not in merging_schemes[info]:
+                            msg = ('This workflow contains a step with '
+                                   'multiple inputs so it cannot be '
+                                   'completed automatically, please add '
+                                   'the commands by hand.')
+                            raise ValueError(msg)
+                        req_params[cnx] = merging_schemes[info][n]
             else:
-                req_params = dict()
-                connections = dict()
-                for iname, dname in rp.items():
-                    req_params[iname] = f'{previous_job.id}{dname}'
-                    connections[dname] = iname
+                if len(dp.command.required_parameters) == 1:
+                    cxns = dict()
+                    for iname, dname in rp.items():
+                        req_params[iname] = f'{previous_job.id}{dname}'
+                        cxns[dname] = iname
+                    connections = {previous_job: cxns}
+                else:
+                    connections = dict()
+                    for pn in GH.predecessors(node):
+                        pndp = pn.default_parameter
+                        n, cnx, _ = GH.get_edge_data(
+                            pn, node)['connections'].connections[0]
+                        _job = previous_dps[pndp.id]
+                        req_params[cnx] = f'{_job.id}{n}'
+                        connections[_job] = {n: cnx}
             params.update(req_params)
             job_params = qdb.software.Parameters.load(
                 cmd, values_dict=params)
@@ -997,8 +1032,9 @@ def _get_predecessors(workflow, node):
             else:
                 current_job = workflow.add(
                     job_params, req_params=req_params,
-                    connections={previous_job: connections})
+                    connections=connections)
             previous_jobs[current_job] = params
+            previous_dps[dp.id] = current_job
 
         return workflow
 
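Taken together, the changes in this file thread the default parameter set (dp) through cmds_to_create and record each created job in previous_dps, so a multi-input step can be wired to all of its parent jobs instead of only previous_job. A rough sketch of the two shapes `connections` can now take when handed to workflow.add; the Job class is a stand-in, and the ids, output names, and input names are illustrative, not Qiita's:

class Job:
    def __init__(self, id_):
        self.id = id_

job_a = Job('6d368e16')   # e.g. one 'Pick closed-reference OTUs' job
job_b = Job('8a7a8461')   # e.g. the other parent job

# single-input step (old behavior, now wrapped in a dict keyed by job):
#   {parent_job: {parent_output_name: child_input_name}}
connections = {job_a: {'OTU table': 'biom_table'}}

# multi-input step (new): one entry per parent job, each parent resolved
# through previous_dps[default_parameter_set_id] -> created job
previous_dps = {1: job_a, 2: job_b}
connections = {
    previous_dps[1]: {'OTU table': 'req_artifact_1'},
    previous_dps[2]: {'OTU table': 'req_artifact_2'},
}

# required parameters point at '<job id><output name>' placeholders,
# matching the f'{_job.id}{n}' strings built in the diff above
req_params = {inp: f'{job.id}{out}'
              for job, m in connections.items() for out, inp in m.items()}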

qiita_db/metadata_template/test/test_prep_template.py (+40)
@@ -1477,6 +1477,46 @@ def test_artifact_setter(self):
                                     "the parameters are the same as jobs"):
             pt.add_default_workflow(qdb.user.User('[email protected]'))
 
+        # Then, let's clean up again and add a new command/step with 2
+        # BIOM input artifacts
+        for pj in wk.graph.nodes:
+            pj._set_error('Killed')
+        cmd = qdb.software.Command.create(
+            qdb.software.Software(1), "Multiple BIOM as inputs", "", {
+                'req_artifact_1': ['artifact:["BIOM"]', None],
+                'req_artifact_2': ['artifact:["BIOM"]', None],
+            }, outputs={'MB-output': 'BIOM'})
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        # creating the new node for the cmd and linking its two inputs to
+        # the two parent outputs
+        sql = f"""
+            INSERT INTO qiita.default_workflow_node (
+                default_workflow_id, default_parameter_set_id)
+            VALUES (1, {cmd_dp.id});
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+            VALUES (8, 10);
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+            VALUES (9, 10);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+            VALUES (6, 3, 99);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+            VALUES (7, 3, 100)
+        """
+        qdb.sql_connection.perform_as_transaction(sql)
+        wk = pt.add_default_workflow(qdb.user.User('[email protected]'))
+        self.assertEqual(len(wk.graph.nodes), 6)
+        self.assertEqual(len(wk.graph.edges), 5)
+        self.assertCountEqual(
+            [x.command.name for x in wk.graph.nodes],
+            # we should have 2 split libraries and 3 closed reference
+            ['Split libraries FASTQ', 'Split libraries FASTQ',
+             'Pick closed-reference OTUs', 'Pick closed-reference OTUs',
+             'Pick closed-reference OTUs', 'Multiple BIOM as inputs'])
+
         # now let's test that an error is raised when there is no valid initial
         # input data; this moves the data type from FASTQ to taxa_summary for
         # the default_workflow_id = 1
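For reference, a tiny sketch (not part of the test) of the default-workflow topology those INSERTs produce, using the node ids from the SQL: nodes 8 and 9 converge on the new multi-input node 10, the single-sink, two-parent shape the new out_degree/predecessors logic walks:

import networkx as nx

g = nx.DiGraph([(8, 10), (9, 10)])
assert [n for n, d in g.out_degree() if d == 0] == [10]  # single sink
assert sorted(g.predecessors(10)) == [8, 9]              # two parents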
