-
Notifications
You must be signed in to change notification settings - Fork 21
/
pubGroundMutations
executable file
·181 lines (156 loc) · 6.29 KB
/
pubGroundMutations
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
# load default python packages
from __future__ import print_function
import logging, optparse, sys, glob, gzip
from os.path import join, basename, isfile, dirname, abspath
from collections import defaultdict, Counter
# add <scriptDir>/lib/ to package search path
sys.path.insert(0, join(dirname(abspath(__file__)), "lib"))
import pubGeneric, maxCommon, pubConf, maxbio
# ==== FUNCTIONS =====
def iterCommonFiles(inDirs, mask="*.tab.gz"):
"""
yield lists of filenames that have the same name in all indirs
Doesn't really check if all dirs are identical.
"""
fmask = join(inDirs[0], mask)
for fname in glob.glob(fmask):
base = basename(fname)
flist = []
for i in inDirs:
dirFname = join(i, base)
assert(isfile(dirFname))
flist.append(dirFname)
yield flist
def parseUniProt(inDir):
" parse uniprot fa files and return as acc -> list of (isoformId, sequence) "
faName = join(inDir, "uniprot.9606.var.fa.gz")
seqs = maxbio.parseFastaAsDict(gzip.open(faName))
result = defaultdict(list)
for seqId, seq in seqs.iteritems():
acc = seqId.split("-")[0]
result[acc].append( (seqId, seq) )
return result
def indexMuts(mutAnnots):
mutCounts = Counter()
mutToAnnots = defaultdict(list)
for mut in mutAnnots:
mutName = (int(mut.pos), mut.wtRes)
#mutName = "".join((mut.wtRes, mut.pos, mut.mutRes))
mutCounts.update([mutName])
mutToAnnots[mutName].append(mut)
extId = mut.externalId
pmid = mut.pmid
return extId, pmid, mutCounts, mutToAnnots
def indexGenes(upAnnots):
""" given rows from annotation file, return extId, counts of uniprot accession and
dict with uniprot accession to list of annotations """
upCounts = Counter()
accToAnnots = defaultdict(list)
#print geneAnnots
assert(len(upAnnots)>0)
extId = None
for up in upAnnots:
#print "UP", up
upCounts.update([up.uniProtAcc])
accToAnnots[up.uniProtAcc].append(up)
extId = up.externalId
return extId, upCounts, accToAnnots
def findSeqs(seqs, accs, pos, res):
" return all uniprot accessions from accs with at least one variant with residue at pos "
matchSeqs = []
for acc in accs:
isoSeqs = seqs[acc]
for seqId, seq in isoSeqs:
if pos < len(seq) and seq[pos]==res:
matchSeqs.append(seqId)
break
return matchSeqs
def someSnippets(idCounts, idToAnnots, isGene=False):
""" create a list of descriptions for all ids in idCounts
also return a dict with id -> name
"""
descs = []
names = {}
for id, count in idCounts.iteritems():
annots = idToAnnots[id]
annot = annots[0]
if isGene:
name = annot.word
else:
name = annot.wtRes+annot.pos+annot.mutRes
names[id] = name
descs.append("%s (%s, %d times): %s" % (id, name, count, annot.snippet))
return descs, names
def main(args, options):
if options.test:
import doctest
doctest.testmod()
sys.exit(0)
pubGeneric.setupLogging("", options)
annotDir, outFname = args
seqs = parseUniProt(pubConf.dbRefDir)
mutDir = join(annotDir, "mutations")
geneDir = join(annotDir, "genes")
refDir = pubConf.dbRefDir
ofh = open(outFname, "w")
resolvCount = 0
mutCount = 0
for fNames in iterCommonFiles([geneDir, mutDir]):
logging.debug( fNames )
print(fNames)
for fileId, rows in maxCommon.iterTsvJoin(fNames, groupFieldNumber=0, useChars=13):
articleId = str(fileId)[:10]
geneAnnots = rows[0]
mutAnnots = rows[1]
extId1, pmid, mutCounts, mutToAnnots = indexMuts(mutAnnots)
extId2, upCounts, upToAnnots = indexGenes(geneAnnots)
assert(extId1==extId2)
extId = extId1
print()
print("Article: %s PMID %s article %s file %s" % (extId, pmid, articleId, fileId))
#print extId, mutCounts.most_common()
print("- genes:")
geneDescs, geneNames = someSnippets(upCounts, upToAnnots, isGene=True)
print("\n".join(geneDescs))
print("- mutations:")
mutDescs, mutNames = someSnippets(mutCounts, mutToAnnots)
print("\n".join(mutDescs))
mutCount += len(set(mutNames))
#print extId, upCounts.most_common()
#print mutAnnots
rows = []
outLines = []
for posInfo, count in mutCounts.iteritems():
pos, origAa = posInfo
zeroPos = pos-1 # python is 0-based
mutName = str(pos)+origAa
matchSeqs = findSeqs(seqs, upCounts, zeroPos, origAa)
if len(matchSeqs)==0:
#rows.append(["NOMATCH", str(articleId), extId, mutName])
pass
else:
geneDesc = ",".join(["%s/%s" % (s, geneNames[s.split("-")[0]]) for s in matchSeqs])
rows.append([str(articleId), extId, mutName, geneDesc])
for uniprotId in matchSeqs:
outLines.append("\t".join([pmid, uniprotId+"/"+str(pos)]))
print("Resolved mutations:")
print("\n".join(["\t".join(row) for row in rows]))
resolvCount += len(rows)
if len(outLines)!=0:
ofh.write("\n".join(outLines)+"\n")
#for row in r1:
#print row
#print rows
print("Total mutation descriptions found: %d" % mutCount)
print("Total resolved mutations found: %d" % resolvCount)
# === COMMAND LINE INTERFACE, OPTIONS AND HELP ===
parser = optparse.OptionParser("""usage: %prog [options] annotDir - read geneSearcher and mutation_finderoutput, guess the gene for each mutation, and place onto uniprot sequences""")
parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="show more debug messages")
parser.add_option("-t", "--test", dest="test", action="store_true", help="run tests")
(options, args) = parser.parse_args()
if args==[] and not options.test:
parser.print_help()
exit(1)
main(args, options)