Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize some code #3

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 84 additions & 83 deletions ClassiCOL.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,43 @@
warnings.filterwarnings("ignore", category=DeprecationWarning)


def collect_fasta_files(base_path, extensions):
"""Helper function to collect files with given extensions from a directory."""
return [
os.path.join(root, file)
for root, _, files in os.walk(base_path)
for file in files
if file.endswith(tuple(extensions))
]


def crap_f(path, add_fasta): # done
print("making database")
crap = {}
files_own = []
for fastafile in os.walk(path + "/BoneDB"):
for i in fastafile[-1]:
if i.endswith(".txt") or i.endswith(".fasta") or i.endswith(".fa"):
files_own.append(path + "/BoneDB/" + i)
if add_fasta != None:
for fastafile in os.walk(add_fasta):
for i in fastafile[-1]:
if i.endswith(".txt") or i.endswith(".fasta") or i.endswith(".fa"):
files_own.append(add_fasta + "/" + i)
done = ""
for i in files_own:
print(i.split("/")[-1])
for record in SeqIO.parse(i, "fasta"):
if str(record.description) in done:
done = set()
extensions = [".txt", ".fasta", ".fa"]

# collect files from BondDB and given directory
files_own = collect_fasta_files(os.path.join(path, "BoneDB"), extensions)
if add_fasta:
files_own.extend(collect_fasta_files(add_fasta, extensions))

for file_path in files_own:
print(os.path.basename(file_path))
for record in SeqIO.parse(file_path, "fasta"):
description = str(record.description)
if description in done:
continue
if "J" in record.seq or "O" in record.seq:
print("skip", record.description)
if any(invalid_char in record.seq for invalid_char in ["J", "O"]):
print("skip", description)
continue

add = record.seq
while add in crap:
add = Seq(str(add) + "&") # if seq the same as relative
done += str(record.description)
crap[add] = record.description
add = Seq(str(add) + "&")

done.add(description)
crap[add] = description
return crap


Expand Down Expand Up @@ -156,13 +166,9 @@ def do_unimod(path, ptms): # done
return unimod_db, unimod


def find_mass(
peptide, AA_codes
): # sums the masses of the amino acid sequence inputed #done
mass = 0
for AA in peptide:
mass += AA_codes[AA]
return mass
def find_mass(peptide, AA_codes):
"""sums the masses of the amino acid sequence inputed"""
return sum(AA_codes[AA] for AA in peptide)


def make_matrix(codes, uni): # done
Expand All @@ -185,35 +191,35 @@ def make_matrix(codes, uni): # done
return reduced_matrix


def find_header_row(file_path, search_term="pep_seq"):
"""Find the header row in a file containing a specific search term."""
with open(file_path, "r") as file:
for idx, line in enumerate(file):
if search_term in line:
return idx
raise ValueError(f"Header containing '{search_term}' not found in file.")


def load_files_mascot(path, name_file): # done
print("open file {}".format(name_file))
found = False
header_row = 0
while found == False:
try:
df = pd.read_csv(name_file, header=header_row)
if "pep_seq" in df.columns:
found = True
else:
header_row += 1
except:
header_row += 1
if header_row > 1000:
break
df = df.fillna("")
# Find the header row containing 'pep_seq' and then read the CSV from that row
header_row = find_header_row(name_file, search_term="pep_seq")
df = pd.read_csv(name_file, header=header_row, skip_blank_lines=False).fillna("")

# Store charges and relevant columns
charges = df["pep_exp_z"].values
df = df[
["prot_desc", "pep_seq", "pep_var_mod", "pep_var_mod_pos", "pep_scan_title"]
]
df["pep_scan_title"] = [
num.replace("~", '"') for num in df["pep_scan_title"].values
]
df_4_uni = df

df2 = df
# Vectorized replacement of '~' with '"'
df["pep_scan_title"] = df["pep_scan_title"].str.replace("~", '"')

# Process unimod and protein data
unimod_db, unimod = do_unimod(path, df["pep_var_mod"].values)
protein = {}
unimod_db, unimod = do_unimod(path, df_4_uni["pep_var_mod"].values)
ids = {}

for p, a, m in unimod_db[["PTM", "AA", "mass"]].values:
if a == "N-term":
a = "!"
Expand All @@ -222,20 +228,16 @@ def load_files_mascot(path, name_file): # done
add = "?"
while add + a in AA_codes.keys():
add += "?"
if a == "!" or a == "*":
AA_codes[add + a] = float(m)
else:
AA_codes[add + a] = AA_codes[a] + float(m)
AA_codes[add + a] = float(m) if a in ["!", "*"] else AA_codes[a] + float(m)
ids[add + a] = p
adj_pep = []
for i, peptide in enumerate(df2["pep_seq"].values):
adjusted_pept = ""
for AA in peptide:
adjusted_pept += AA + "|"
adj_pep.append(adjusted_pept[:-1])
df2["adj"] = np.array(adj_pep)
df2["charge"] = charges
return df, df2, unimod_db, protein, ids, adj_pep

# Efficient peptide adjustment using join() for string concatenation
df["adj"] = df["pep_seq"].apply(lambda x: "|".join(x))

# Add charge column
df["charge"] = charges

return df, df, unimod_db, protein, ids, df["adj"].values


def load_files_maxquant(path, name_file): # done
Expand Down Expand Up @@ -1011,18 +1013,20 @@ def massblast(
AA_codes,
uncertain,
): # done
done = []
db_str = str(db)
t0 = time.time()
done = set()
final_result = []
done_title = []
found_peptide = []
done_title = set()
found_peptide = set()
for ilocs, p in enumerate(peptides):
if title[ilocs] in done_title:
continue
if p + PTMs[ilocs] in remember_peptides.keys():
if p + PTMs[ilocs] in remember_peptides:
find = p + PTMs[ilocs]
if animal not in remember_peptides[find]:
continue # peptide already tested and not found in test animal
done_title.append(title[ilocs])
done_title.add(title[ilocs])
if len(final_result) > 0 and cap == True:
start_temp = [
num for num in final_result if p in num and PTMs[ilocs] in num
Expand Down Expand Up @@ -1056,7 +1060,7 @@ def massblast(
cap == False and p in found_peptide
): # If you are only interested in the peptide, than you need only not all possibles
continue
done.append((p, PTMs[ilocs]))
done.add((p, PTMs[ilocs]))
ptm_masses, PTMs_insearch = ptm_mass(PTMs[ilocs], unimod_db)
p_mass = find_mass(p, AA_codes) + ptm_masses
p_adj = p_adjs[ilocs]
Expand All @@ -1077,19 +1081,16 @@ def massblast(
exact_match_with_uncertainty += u
# Add all the exact matches, als oif X,B or Z in sequence. Only 1 uncertain allowed in output
added_seq = False
search_result = re.search(exact_match_with_uncertainty, db_str)
if (
str(re.search(exact_match_with_uncertainty, str(db))) != "None"
search_result
and len(
[
num
for num in re.search(exact_match_with_uncertainty, str(db)).group()
if num not in AA_codes.keys()
]
[num for num in search_result.group() if num not in AA_codes.keys()]
)
<= 1
): # match all exact matches en remind the location
match = re.search(exact_match_with_uncertainty, str(db)).group()
location = re.finditer(exact_match_with_uncertainty, str(db))
match = search_result.group()
location = re.finditer(exact_match_with_uncertainty, db_str)
locs = []
for i in location:
locs.append(i.span())
Expand All @@ -1109,9 +1110,9 @@ def massblast(
charge[ilocs],
)
)
found_peptide.append(p)
found_peptide.add(p)
else:
found_peptide.append(p)
found_peptide.add(p)
final_result.append(
(
p,
Expand All @@ -1128,9 +1129,9 @@ def massblast(
# if isobaric switch was found, no need to check this again for another animal
if p in remember_good:
for testing in remember_good[p]:
if testing[1] in str(db) and testing[-1] == PTMs[ilocs]:
match = re.search(testing[1], str(db)).group()
location = re.finditer(testing[1], str(db))
if testing[1] in db_str and testing[-1] == PTMs[ilocs]:
match = re.search(testing[1], db_str).group()
location = re.finditer(testing[1], db_str)
locs = []
for i in location:
locs.append(i.span())
Expand All @@ -1150,7 +1151,7 @@ def massblast(
)
)
added_seq = True
found_peptide.append(p)
found_peptide.add(p)
# start isoblast
if added_seq == False: # if no exact match, than start isobaric switches
final_addition = False
Expand Down Expand Up @@ -1225,7 +1226,7 @@ def massblast(
)
)
final_addition = True # this could give a problem
found_peptide.append(p)
found_peptide.add(p)
if p in remember_good:
remember_good[p] = remember_good[p] + [
(
Expand Down Expand Up @@ -1254,7 +1255,8 @@ def massblast(
PTMs[ilocs],
)
]

t1 = time.time()
print("*****isoblast time: ", t1 - t0)
return final_result, remember_good, remember_bad


Expand Down Expand Up @@ -2727,7 +2729,6 @@ def rescore(path, file_loc):
if fixed_mod != None:
for AA, f in fixed_mod:
AA_codes[AA] = AA_codes[AA] + f

if Search_engine == "mascot":
df, df2, unimod_db, protein, ids, adj_pep = load_files_mascot(
path, test_file
Expand Down