forked from qiita-spots/qiita
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanalysis.py
1177 lines (1020 loc) · 44.3 KB
/
analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Objects for dealing with Qiita analyses
This module provides the implementation of the Analysis and Collection classes.
Classes
-------
- `Analysis` -- A Qiita Analysis class
- `Collection` -- A Qiita Collection class for grouping multiple analyses
"""
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from itertools import product
from os.path import join, exists
from os import mkdir
from collections import defaultdict
from biom import load_table
from biom.util import biom_open
from biom.exception import DisjointIDError
from re import sub
import pandas as pd
from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
from json import loads, dump
class Analysis(qdb.base.QiitaObject):
    """
    Analysis object to access to the Qiita Analysis information

    Attributes
    ----------
    owner
    name
    description
    samples
    data_types
    artifacts
    shared_with
    jobs
    pmid

    Methods
    -------
    has_access
    add_samples
    remove_samples
    share
    unshare
    build_files
    summary_data
    exists
    create
    delete
    add_artifact
    set_error
    """
    # Backing table for this object; interpolated into most SQL statements
    _table = "analysis"
    # Table mapping analyses to the portal(s) they belong to
    _portal_table = "analysis_portal"
    # Primary-key column name used when formatting DELETE/SELECT statements
    _analysis_id_column = 'analysis_id'
@classmethod
def iter(cls):
"""Iter over the analyses"""
with qdb.sql_connection.TRN:
sql = """SELECT DISTINCT analysis_id
FROM qiita.analysis
JOIN qiita.analysis_portal USING (analysis_id)
JOIN qiita.portal_type USING (portal_type_id)
WHERE portal = %s
ORDER BY analysis_id"""
qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
aids = qdb.sql_connection.TRN.execute_fetchflatten()
for aid in aids:
yield cls(aid)
@classmethod
def get_by_status(cls, status):
"""Returns all Analyses with given status
Parameters
----------
status : str
Status to search analyses for
Returns
-------
set of Analysis
All analyses in the database with the given status
"""
with qdb.sql_connection.TRN:
# Sandboxed analyses are the analyses that have not been started
# and hence they don't have an artifact yet
if status == 'sandbox':
sql = """SELECT DISTINCT analysis
FROM qiita.analysis
JOIN qiita.analysis_portal USING (analysis_id)
JOIN qiita.portal_type USING (portal_type_id)
WHERE portal = %s AND analysis_id NOT IN (
SELECT analysis_id
FROM qiita.analysis_artifact)"""
qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
else:
sql = """SELECT DISTINCT analysis_id
FROM qiita.analysis_artifact
JOIN qiita.artifact USING (artifact_id)
JOIN qiita.visibility USING (visibility_id)
JOIN qiita.analysis_portal USING (analysis_id)
JOIN qiita.portal_type USING (portal_type_id)
WHERE visibility = %s AND portal = %s"""
qdb.sql_connection.TRN.add(sql, [status, qiita_config.portal])
return set(
cls(aid)
for aid in qdb.sql_connection.TRN.execute_fetchflatten())
    @classmethod
    def create(cls, owner, name, description, from_default=False,
               merge_duplicated_sample_ids=False, categories=None):
        """Creates a new analysis on the database

        Parameters
        ----------
        owner : User object
            The analysis' owner
        name : str
            Name of the analysis
        description : str
            Description of the analysis
        from_default : bool, optional
            If True, use the default analysis to populate selected samples.
            Default False.
        merge_duplicated_sample_ids : bool, optional
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. False (default) prepends
            the artifact id
        categories : list of str, optional
            If not None, use _only_ these categories for the metaanalysis

        Returns
        -------
        qdb.analysis.Analysis
            The newly created analysis
        """
        with qdb.sql_connection.TRN:
            portal_id = qdb.util.convert_to_id(
                qiita_config.portal, 'portal_type', 'portal')
            # Create the row in the analysis table
            sql = """INSERT INTO qiita.{0}
                        (email, name, description)
                    VALUES (%s, %s, %s)
                    RETURNING analysis_id""".format(cls._table)
            qdb.sql_connection.TRN.add(
                sql, [owner.id, name, description])
            a_id = qdb.sql_connection.TRN.execute_fetchlast()
            if from_default:
                # Move samples into that new analysis: re-point the owner's
                # default-analysis sample rows at the freshly created id
                dflt_id = owner.default_analysis.id
                sql = """UPDATE qiita.analysis_sample
                         SET analysis_id = %s
                         WHERE analysis_id = %s"""
                qdb.sql_connection.TRN.add(sql, [a_id, dflt_id])
            # Add to both QIITA and given portal (if not QIITA)
            sql = """INSERT INTO qiita.analysis_portal
                        (analysis_id, portal_type_id)
                     VALUES (%s, %s)"""
            args = [[a_id, portal_id]]
            if qiita_config.portal != 'QIITA':
                qp_id = qdb.util.convert_to_id(
                    'QIITA', 'portal_type', 'portal')
                args.append([a_id, qp_id])
            qdb.sql_connection.TRN.add(sql, args, many=True)
            instance = cls(a_id)
            # Once the analysis is created, we can create the mapping file and
            # the initial set of artifacts; this is delegated to an internal
            # 'build_analysis_files' command of the Qiita plugin
            plugin = qdb.software.Software.from_name_and_version(
                'Qiita', 'alpha')
            cmd = plugin.get_command('build_analysis_files')
            params = qdb.software.Parameters.load(
                cmd, values_dict={
                    'analysis': a_id,
                    'merge_dup_sample_ids': merge_duplicated_sample_ids,
                    'categories': categories})
            job = qdb.processing_job.ProcessingJob.create(
                owner, params, True)
            sql = """INSERT INTO qiita.analysis_processing_job
                        (analysis_id, processing_job_id)
                     VALUES (%s, %s)"""
            qdb.sql_connection.TRN.add(sql, [a_id, job.id])
            # commit everything before the job is submitted so the worker
            # can see the new rows
            qdb.sql_connection.TRN.execute()
        # Doing the submission outside of the transaction
        job.submit()
        return instance
    @classmethod
    def delete(cls, _id):
        """Deletes an analysis

        Parameters
        ----------
        _id : int
            The analysis id

        Raises
        ------
        QiitaDBUnknownIDError
            If the analysis id doesn't exist
        """
        with qdb.sql_connection.TRN:
            # check if the analysis exist
            if not cls.exists(_id):
                raise qdb.exceptions.QiitaDBUnknownIDError(_id, "analysis")
            # Check if the analysis has any artifact; analyses with attached
            # artifacts cannot be deleted
            sql = """SELECT EXISTS(SELECT *
                                   FROM qiita.analysis_artifact
                                   WHERE analysis_id = %s)"""
            qdb.sql_connection.TRN.add(sql, [_id])
            if qdb.sql_connection.TRN.execute_fetchlast():
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                    "Can't delete analysis %d, has artifacts attached"
                    % _id)
            # Delete the dependent rows first (filepaths, portals, samples,
            # jobs) so the final DELETE on qiita.analysis does not violate
            # foreign-key constraints
            sql = "DELETE FROM qiita.analysis_filepath WHERE {0} = %s".format(
                cls._analysis_id_column)
            args = [_id]
            qdb.sql_connection.TRN.add(sql, args)
            sql = "DELETE FROM qiita.analysis_portal WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)
            sql = "DELETE FROM qiita.analysis_sample WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)
            sql = """DELETE FROM qiita.analysis_processing_job
                     WHERE {0} = %s""".format(cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)
            # TODO: issue #1176
            sql = """DELETE FROM qiita.{0} WHERE {1} = %s""".format(
                cls._table, cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)
            qdb.sql_connection.TRN.execute()
@classmethod
def exists(cls, analysis_id):
r"""Checks if the given analysis _id exists
Parameters
----------
analysis_id : int
The id of the analysis we are searching for
Returns
-------
bool
True if exists, false otherwise.
"""
with qdb.sql_connection.TRN:
sql = """SELECT EXISTS(
SELECT *
FROM qiita.{0}
JOIN qiita.analysis_portal USING (analysis_id)
JOIN qiita.portal_type USING (portal_type_id)
WHERE {1}=%s
AND portal=%s)""".format(cls._table,
cls._analysis_id_column)
qdb.sql_connection.TRN.add(sql, [analysis_id, qiita_config.portal])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def owner(self):
"""The owner of the analysis
Returns
-------
qiita_db.user.User
The owner of the Analysis
"""
with qdb.sql_connection.TRN:
sql = "SELECT email FROM qiita.{0} WHERE analysis_id = %s".format(
self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.user.User(qdb.sql_connection.TRN.execute_fetchlast())
@property
def name(self):
"""The name of the analysis
Returns
-------
str
Name of the Analysis
"""
with qdb.sql_connection.TRN:
sql = "SELECT name FROM qiita.{0} WHERE analysis_id = %s".format(
self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchlast()
    @property
    def _portals(self):
        """The portals used to create the analysis

        Returns
        -------
        list of str
            Names of the portals the analysis is attached to
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT portal
                     FROM qiita.analysis_portal
                     JOIN qiita.portal_type USING (portal_type_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()
@property
def timestamp(self):
"""The timestamp of the analysis
Returns
-------
datetime
Timestamp of the Analysis
"""
with qdb.sql_connection.TRN:
sql = """SELECT timestamp FROM qiita.{0}
WHERE analysis_id = %s""".format(self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def description(self):
"""Returns the description of the analysis"""
with qdb.sql_connection.TRN:
sql = """SELECT description FROM qiita.{0}
WHERE analysis_id = %s""".format(self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchlast()
    @description.setter
    def description(self, description):
        """Changes the description of the analysis

        Parameters
        ----------
        description : str
            New description for the analysis

        Raises
        ------
        QiitaDBStatusError
            Analysis is public
        """
        # NOTE(review): no status check is performed here, so the documented
        # QiitaDBStatusError does not appear to be raised by this setter --
        # confirm whether the guard is enforced by a caller.
        sql = """UPDATE qiita.{0} SET description = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [description, self._id])
@property
def samples(self):
"""The artifact and samples attached to the analysis
Returns
-------
dict
Format is {artifact_id: [sample_id, sample_id, ...]}
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id, array_agg(
sample_id ORDER BY sample_id)
FROM qiita.analysis_sample
WHERE analysis_id = %s
GROUP BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self._id])
return dict(qdb.sql_connection.TRN.execute_fetchindex())
@property
def data_types(self):
"""Returns all data types used in the analysis
Returns
-------
list of str
Data types in the analysis
"""
with qdb.sql_connection.TRN:
sql = """SELECT DISTINCT data_type
FROM qiita.data_type
JOIN qiita.artifact USING (data_type_id)
JOIN qiita.analysis_sample USING (artifact_id)
WHERE analysis_id = %s
ORDER BY data_type"""
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchflatten()
    @property
    def shared_with(self):
        """The users the analysis is shared with

        Returns
        -------
        list of qiita_db.user.User
            Users the analysis is shared with
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT email FROM qiita.analysis_users
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return [qdb.user.User(uid)
                    for uid in qdb.sql_connection.TRN.execute_fetchflatten()]
    @property
    def artifacts(self):
        """The artifacts attached to the analysis

        Returns
        -------
        list of qiita_db.artifact.Artifact
            Artifacts attached to this analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT artifact_id
                     FROM qiita.analysis_artifact
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            return [qdb.artifact.Artifact(aid)
                    for aid in qdb.sql_connection.TRN.execute_fetchflatten()]
@property
def mapping_file(self):
"""Returns the mapping file for the analysis
Returns
-------
int or None
The filepath id of the analysis mapping file or None
if not generated
"""
fp = [x['fp_id'] for x in qdb.util.retrieve_filepaths(
"analysis_filepath", "analysis_id", self._id)
if x['fp_type'] == 'plain_text']
if fp:
# returning the actual filepath id vs. an array
return fp[0]
else:
return None
@property
def metadata_categories(self):
"""Returns all metadata categories in the current analyses based
on the available studies
Returns
-------
dict of dict
a dict with study_id as the key & the values are another dict with
'sample' & 'prep' as keys and the metadata categories as values
"""
ST = qdb.metadata_template.sample_template.SampleTemplate
PT = qdb.metadata_template.prep_template.PrepTemplate
with qdb.sql_connection.TRN:
sql = """SELECT DISTINCT study_id, artifact_id
FROM qiita.analysis_sample
LEFT JOIN qiita.study_artifact USING (artifact_id)
WHERE analysis_id = %s"""
qdb.sql_connection.TRN.add(sql, [self._id])
metadata = defaultdict(dict)
for sid, aid in qdb.sql_connection.TRN.execute_fetchindex():
if sid not in metadata:
metadata[sid]['sample'] = set(ST(sid).categories)
metadata[sid]['prep'] = set()
for pt in qdb.artifact.Artifact(aid).prep_templates:
metadata[sid]['prep'] = metadata[sid]['prep'] | set(
PT(pt.id).categories)
return metadata
@property
def tgz(self):
"""Returns the tgz file of the analysis
Returns
-------
str or None
full filepath to the mapping file or None if not generated
"""
fp = [x['fp'] for x in qdb.util.retrieve_filepaths(
"analysis_filepath", "analysis_id", self._id)
if x['fp_type'] == 'tgz']
if fp:
# returning the actual path vs. an array
return fp[0]
else:
return None
@property
def jobs(self):
"""The jobs generating the initial artifacts for the analysis
Returns
-------
list of qiita_db.processing_job.Processing_job
Job ids for jobs in analysis. Empty list if no jobs attached.
"""
with qdb.sql_connection.TRN:
sql = """SELECT processing_job_id
FROM qiita.analysis_processing_job
WHERE analysis_id = %s"""
qdb.sql_connection.TRN.add(sql, [self._id])
return [qdb.processing_job.ProcessingJob(jid)
for jid in qdb.sql_connection.TRN.execute_fetchflatten()]
@property
def pmid(self):
"""Returns pmid attached to the analysis
Returns
-------
str or None
returns the PMID or None if none is attached
"""
with qdb.sql_connection.TRN:
sql = "SELECT pmid FROM qiita.{0} WHERE analysis_id = %s".format(
self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchlast()
    @pmid.setter
    def pmid(self, pmid):
        """adds pmid to the analysis

        Parameters
        ----------
        pmid: str
            pmid to set for the analysis

        Raises
        ------
        QiitaDBStatusError
            Analysis is public

        Notes
        -----
        An analysis should only ever have one PMID attached to it.
        """
        # NOTE(review): no status check is performed here, so the documented
        # QiitaDBStatusError does not appear to be raised by this setter --
        # confirm whether the guard is enforced by a caller.
        sql = """UPDATE qiita.{0} SET pmid = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [pmid, self._id])
@property
def can_be_publicized(self):
"""Returns whether the analysis can be made public
Returns
-------
bool
Whether the analysis can be publicized
list
A list of not public (private) artifacts
"""
# The analysis can be made public if all the artifacts used
# to get the samples from are public
with qdb.sql_connection.TRN:
non_public = []
sql = """SELECT DISTINCT artifact_id
FROM qiita.analysis_sample
WHERE analysis_id = %s
ORDER BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [self.id])
for aid in qdb.sql_connection.TRN.execute_fetchflatten():
if qdb.artifact.Artifact(aid).visibility != 'public':
non_public.append(aid)
return (non_public == [], non_public)
@property
def is_public(self):
"""Returns if the analysis is public
Returns
-------
bool
If the analysis is public
"""
with qdb.sql_connection.TRN:
# getting all root artifacts / command_id IS NULL
sql = """SELECT DISTINCT visibility
FROM qiita.analysis_artifact
LEFT JOIN qiita.artifact USING (artifact_id)
LEFT JOIN qiita.visibility USING (visibility_id)
WHERE analysis_id = %s AND command_id IS NULL"""
qdb.sql_connection.TRN.add(sql, [self.id])
visibilities = set(qdb.sql_connection.TRN.execute_fetchflatten())
return visibilities == {'public'}
def make_public(self):
"""Makes an analysis public
Raises
------
ValueError
If can_be_publicized is not true
"""
with qdb.sql_connection.TRN:
can_be_publicized, non_public = self.can_be_publicized
if not can_be_publicized:
raise ValueError('Not all artifacts that generated this '
'analysis are public: %s' % ', '.join(
map(str, non_public)))
# getting all root artifacts / command_id IS NULL
sql = """SELECT artifact_id
FROM qiita.analysis_artifact
LEFT JOIN qiita.artifact USING (artifact_id)
WHERE analysis_id = %s AND command_id IS NULL"""
qdb.sql_connection.TRN.add(sql, [self.id])
aids = qdb.sql_connection.TRN.execute_fetchflatten()
for aid in aids:
qdb.artifact.Artifact(aid).visibility = 'public'
def add_artifact(self, artifact):
"""Adds an artifact to the analysis
Parameters
----------
artifact : qiita_db.artifact.Artifact
The artifact to be added
"""
with qdb.sql_connection.TRN:
sql = """INSERT INTO qiita.analysis_artifact
(analysis_id, artifact_id)
SELECT %s, %s
WHERE NOT EXISTS(SELECT *
FROM qiita.analysis_artifact
WHERE analysis_id = %s
AND artifact_id = %s)"""
qdb.sql_connection.TRN.add(sql, [self.id, artifact.id,
self.id, artifact.id])
def set_error(self, error_msg):
"""Sets the analysis error
Parameters
----------
error_msg : str
The error message
"""
le = qdb.logger.LogEntry.create('Runtime', error_msg)
sql = """UPDATE qiita.analysis
SET logging_id = %s
WHERE analysis_id = %s"""
qdb.sql_connection.perform_as_transaction(sql, [le.id, self.id])
def has_access(self, user):
"""Returns whether the given user has access to the analysis
Parameters
----------
user : User object
User we are checking access for
Returns
-------
bool
Whether user has access to analysis or not
"""
with qdb.sql_connection.TRN:
# if admin or superuser, just return true
if user.level in {'superuser', 'admin'}:
return True
return self in Analysis.get_by_status('public') | \
user.private_analyses | user.shared_analyses
def can_edit(self, user):
"""Returns whether the given user can edit the analysis
Parameters
----------
user : User object
User we are checking edit permissions for
Returns
-------
bool
Whether user can edit the study or not
"""
# The analysis is editable only if the user is the owner, is in the
# shared list or the user is an admin
return (user.level in {'superuser', 'admin'} or self.owner == user or
user in self.shared_with)
def summary_data(self):
"""Return number of studies, artifacts, and samples selected
Returns
-------
dict
counts keyed to their relevant type
"""
with qdb.sql_connection.TRN:
sql = """SELECT
COUNT(DISTINCT study_id) as studies,
COUNT(DISTINCT artifact_id) as artifacts,
COUNT(DISTINCT sample_id) as samples
FROM qiita.study_artifact
JOIN qiita.analysis_sample USING (artifact_id)
WHERE analysis_id = %s"""
qdb.sql_connection.TRN.add(sql, [self._id])
return dict(qdb.sql_connection.TRN.execute_fetchindex()[0])
def share(self, user):
"""Share the analysis with another user
Parameters
----------
user: User object
The user to share the analysis with
"""
# Make sure the analysis is not already shared with the given user
if user.id == self.owner or user.id in self.shared_with:
return
sql = """INSERT INTO qiita.analysis_users (analysis_id, email)
VALUES (%s, %s)"""
qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])
def unshare(self, user):
"""Unshare the analysis with another user
Parameters
----------
user: User object
The user to unshare the analysis with
"""
sql = """DELETE FROM qiita.analysis_users
WHERE analysis_id = %s AND email = %s"""
qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])
def _lock_samples(self):
"""Only dflt analyses can have samples added/removed
Raises
------
qiita_db.exceptions.QiitaDBOperationNotPermittedError
If the analysis is not a default analysis
"""
with qdb.sql_connection.TRN:
sql = "SELECT dflt FROM qiita.analysis WHERE analysis_id = %s"
qdb.sql_connection.TRN.add(sql, [self.id])
if not qdb.sql_connection.TRN.execute_fetchlast():
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Can't add/remove samples from this analysis")
def add_samples(self, samples):
"""Adds samples to the analysis
Parameters
----------
samples : dictionary of lists
samples and the artifact id they come from in form
{artifact_id: [sample1, sample2, ...], ...}
"""
with qdb.sql_connection.TRN:
self._lock_samples()
for aid, samps in samples.items():
# get previously selected samples for aid and filter them out
sql = """SELECT sample_id
FROM qiita.analysis_sample
WHERE artifact_id = %s AND analysis_id = %s"""
qdb.sql_connection.TRN.add(sql, [aid, self._id])
prev_selected = qdb.sql_connection.TRN.execute_fetchflatten()
select = set(samps).difference(prev_selected)
sql = """INSERT INTO qiita.analysis_sample
(analysis_id, artifact_id, sample_id)
VALUES (%s, %s, %s)"""
args = [[self._id, aid, s] for s in select]
qdb.sql_connection.TRN.add(sql, args, many=True)
qdb.sql_connection.TRN.execute()
def remove_samples(self, artifacts=None, samples=None):
"""Removes samples from the analysis
Parameters
----------
artifacts : list, optional
Artifacts to remove, default None
samples : list, optional
sample ids to remove, default None
Notes
-----
- When only a list of samples given, the samples will be removed from
all artifacts it is associated with
- When only a list of artifacts is given, all samples associated with
that artifact are removed
- If both are passed, the given samples are removed from the given
artifacts
"""
with qdb.sql_connection.TRN:
self._lock_samples()
if artifacts and samples:
sql = """DELETE FROM qiita.analysis_sample
WHERE analysis_id = %s
AND artifact_id = %s
AND sample_id = %s"""
# Build the SQL arguments to remove the samples of the
# given artifacts.
args = [[self._id, a.id, s]
for a, s in product(artifacts, samples)]
elif artifacts:
sql = """DELETE FROM qiita.analysis_sample
WHERE analysis_id = %s AND artifact_id = %s"""
args = [[self._id, a.id] for a in artifacts]
elif samples:
sql = """DELETE FROM qiita.analysis_sample
WHERE analysis_id = %s AND sample_id = %s"""
args = [[self._id, s] for s in samples]
else:
raise IncompetentQiitaDeveloperError(
"Must provide list of samples and/or proc_data for "
"removal")
qdb.sql_connection.TRN.add(sql, args, many=True)
qdb.sql_connection.TRN.execute()
    def build_files(self, merge_duplicated_sample_ids, categories=None):
        """Builds biom and mapping files needed for analysis

        Parameters
        ----------
        merge_duplicated_sample_ids : bool
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. If false prepends
            the artifact id
        categories : set of str, optional
            If not None, use _only_ these categories for the metaanalysis

        Notes
        -----
        Creates biom tables for each requested data type
        Creates mapping file for requested samples
        """
        with qdb.sql_connection.TRN:
            # in practice we could retrieve samples in each of the following
            # calls but this will mean calling the DB multiple times and will
            # make testing much harder as we will need to have analyses at
            # different stages and possible errors.
            samples = self.samples
            # retrieving all info on artifacts to save SQL time
            bioms_info = qdb.util.get_artifacts_information(samples.keys())
            # figuring out if we are going to have duplicated samples, again
            # doing it here cause it's computationally cheaper
            # 1. merge samples per: data_type, reference used and
            # the command id
            # Note that grouped_samples is basically how many biom tables we
            # are going to create
            grouped_samples = {}
            # post_processing_cmds maps an artifact "algorithm" string to a
            # (merging_scheme, cmd) pair describing an operation to be
            # performed on the final merged BIOM. The order of operations
            # will be insertion order. Thus, in the case that multiple
            # post_processing_cmds are implemented, ensure proper order
            # before passing off to _build_biom_tables().
            post_processing_cmds = dict()
            for aid, asamples in samples.items():
                # find the artifact info, [0] there should be only one info
                ainfo = [bi for bi in bioms_info
                         if bi['artifact_id'] == aid][0]
                data_type = ainfo['data_type']
                # ainfo['algorithm'] is the original merging scheme
                label = "%s || %s" % (data_type, ainfo['algorithm'])
                if label not in grouped_samples:
                    aparams = qdb.artifact.Artifact(aid).processing_parameters
                    if aparams is not None:
                        cmd = aparams.command.post_processing_cmd
                        if cmd is not None:
                            # preserve label, in case it's needed; strip the
                            # ", BIOM: <hash>" suffix to recover the scheme
                            merging_scheme = sub(
                                ', BIOM: [0-9a-zA-Z-.]+', '',
                                ainfo['algorithm'])
                            post_processing_cmds[ainfo['algorithm']] = (
                                merging_scheme, cmd)
                    grouped_samples[label] = []
                grouped_samples[label].append((aid, asamples))
            # We need to negate merge_duplicated_sample_ids because in
            # _build_mapping_file is acually rename: merge yes == rename no
            rename_dup_samples = not merge_duplicated_sample_ids
            self._build_mapping_file(
                samples, rename_dup_samples, categories=categories)
            if post_processing_cmds:
                biom_files = self._build_biom_tables(
                    grouped_samples,
                    rename_dup_samples,
                    post_processing_cmds=post_processing_cmds)
            else:
                # preserve the legacy path
                biom_files = self._build_biom_tables(
                    grouped_samples,
                    rename_dup_samples)
            # if post_processing_cmds exists, biom_files will be a triplet,
            # instead of a pair; the final element in the tuple will be an
            # file path to the new phylogenetic tree.
            return biom_files
def _build_biom_tables(self,
grouped_samples,
rename_dup_samples=False,
post_processing_cmds=None):
"""Build tables and add them to the analysis"""
with qdb.sql_connection.TRN:
# creating per analysis output folder
_, base_fp = qdb.util.get_mountpoint(self._table)[0]
base_fp = join(base_fp, 'analysis_%d' % self.id)
if not exists(base_fp):
mkdir(base_fp)
biom_files = []
for label, tables in grouped_samples.items():
data_type, algorithm = [
line.strip() for line in label.split('||')]
new_table = None
artifact_ids = []
for aid, samples in tables:
artifact = qdb.artifact.Artifact(aid)
artifact_ids.append(str(aid))
# the next loop is assuming that an artifact can have only
# one biom, which is a safe assumption until we generate
# artifacts from multiple bioms and even then we might
# only have one biom
biom_table_fp = None
for x in artifact.filepaths:
if x['fp_type'] == 'biom':
biom_table_fp = x['fp']
break
if not biom_table_fp:
raise RuntimeError(
"Artifact %s does not have a biom table associated"
% aid)
# loading the found biom table
biom_table = load_table(biom_table_fp)
# filtering samples to keep those selected by the user
biom_table_samples = set(biom_table.ids())
selected_samples = biom_table_samples.intersection(samples)
biom_table.filter(selected_samples, axis='sample',
inplace=True)
if len(biom_table.ids()) == 0:
continue
if rename_dup_samples:
ids_map = {_id: "%d.%s" % (aid, _id)
for _id in biom_table.ids()}
biom_table.update_ids(ids_map, 'sample', True, True)
if new_table is None:
new_table = biom_table
else:
try:
new_table = new_table.concat([biom_table])
except DisjointIDError:
new_table = new_table.merge(biom_table)
if not new_table or len(new_table.ids()) == 0:
# if we get to this point the only reason for failure is
# rarefaction
raise RuntimeError("All samples filtered out from "
"analysis due to rarefaction level")
# write out the file