# Source code for haddock.modules.analysis.alascan.scan

"""alascan module."""
import os
from pathlib import Path
import shutil

import numpy as np
import pandas as pd
import io
from contextlib import redirect_stdout

from haddock import log
from haddock.libs.libalign import get_atoms, load_coords
from haddock.libs.libplots import make_alascan_plot
from haddock.modules.analysis.caprieval.capri import CAPRI
from haddock.clis import cli_score

# Atoms kept (and renamed) when a residue is mutated: the backbone atoms
# plus CB, so the mutated side chain can be rebuilt by the scoring step.
ATOMS_TO_BE_MUTATED = ['C', 'N', 'CA', 'O', 'CB']

# Three-letter -> one-letter residue codes.  The first twenty entries are
# the standard amino acids; the remainder are modified residues mapped to
# the one-letter code of their parent amino acid.
RES_CODES = {
    "CYS": "C",
    "ASP": "D",
    "SER": "S",
    "GLN": "Q",
    "LYS": "K",
    "ILE": "I",
    "PRO": "P",
    "THR": "T",
    "PHE": "F",
    "ASN": "N",
    "GLY": "G",
    "HIS": "H",
    "LEU": "L",
    "ARG": "R",
    "TRP": "W",
    "ALA": "A",
    "VAL": "V",
    "GLU": "E",
    "TYR": "Y",
    "MET": "M",
    "ALY": "K",
    "ASH": "D",
    "CFE": "C",
    "CSP": "C",
    "CYC": "C",
    "CYF": "C",
    "CYM": "C",
    "DDZ": "A",
    "GLH": "E",
    "HLY": "P",
    "HY3": "P",
    "HYP": "P",
    "M3L": "K",
    "MLY": "K",
    "MLZ": "K",
    "MSE": "M",
    "NEP": "H",
    "PNS": "S",
    "PTR": "Y",
    "SEP": "S",
    "TOP": "T",
    "TYP": "Y",
    "TYS": "Y",
    }


def mutate(pdb_f, target_chain, target_resnum, mut_resname):
    """
    Mutate a residue in a PDB file into a different residue.

    For the target residue only the backbone and CB atoms
    (``ATOMS_TO_BE_MUTATED``) are kept, renamed to the new residue;
    its other side-chain atoms are dropped.  Non-ATOM records are not
    copied to the output.

    Parameters
    ----------
    pdb_f : pathlib.Path
        Path to the pdb file (``.name`` is used to build the output name).
    target_chain : str
        Chain of the residue to be mutated.
    target_resnum : int
        Residue number of the residue to be mutated.
    mut_resname : str
        Residue name of the residue to be mutated.

    Returns
    -------
    mut_pdb_fname : pathlib.Path
        Path to the mutated pdb file.

    Raises
    ------
    KeyError
        If the original or target residue name has no entry in RES_CODES.
    """
    out_lines = []
    wt_resname = ''
    with open(pdb_f, 'r') as fh:
        for pdb_line in fh:
            if not pdb_line.startswith('ATOM'):
                continue
            is_target = (
                pdb_line[21] == target_chain
                and int(pdb_line[22:26]) == target_resnum
                )
            if is_target:
                # remember the wild-type residue name from the first atom
                if not wt_resname:
                    wt_resname = pdb_line[17:20].strip()
                if pdb_line[12:16].strip() in ATOMS_TO_BE_MUTATED:
                    # rewrite the residue-name columns with the mutation
                    out_lines.append(
                        pdb_line[:17] + mut_resname + pdb_line[20:])
            else:
                out_lines.append(pdb_line)
    try:
        mut_id = (
            f'{RES_CODES[wt_resname]}{target_resnum}{RES_CODES[mut_resname]}')
    except KeyError:
        raise KeyError(f"Could not mutate {wt_resname} into {mut_resname}.")
    mut_pdb_fname = Path(
        pdb_f.name.replace('.pdb', f'-{target_chain}_{mut_id}.pdb'))
    with open(mut_pdb_fname, 'w') as fh:
        fh.write(''.join(out_lines))
    return mut_pdb_fname
def add_delta_to_bfactor(pdb_f, df_scan):
    """Add delta scores as b-factors.

    Every ATOM line gets its b-factor column (columns 61-66) replaced by
    the residue's ``delta_score`` normalised to 0-100 over the whole scan;
    residues absent from ``df_scan`` get 0.00.

    Fix: when all ``delta_score`` values are equal (e.g. a single scanned
    residue) the original ``(delta - min) / (max - min)`` divided by zero
    and wrote ``nan`` into the PDB; now 0.00 is written instead.

    Parameters
    ----------
    pdb_f : str
        Path to the pdb file.
    df_scan : pandas.DataFrame
        Dataframe with the scan results for the model
        (columns ``chain``, ``res``, ``delta_score``).

    Returns
    -------
    pdb_f : str
        Path to the pdb file with the b-factors added.
    """
    tmp_pdb_f = pdb_f.replace('.pdb', '_bfactor.pdb')
    max_b, min_b = df_scan["delta_score"].max(), df_scan["delta_score"].min()
    b_range = max_b - min_b
    out_pdb_l = []
    with open(pdb_f, 'r') as fh:
        for line in fh.readlines():
            if line.startswith('ATOM'):
                chain = line[21]
                resnum = int(line[22:26])
                norm_delta = 0.0
                # extracting all the elements of df_scan such that
                # chain = chain and res = resnum
                df_scan_subset = df_scan.loc[
                    (df_scan["chain"] == chain) &
                    (df_scan["res"] == resnum)
                    ]
                # guard b_range > 0: avoids 0/0 -> nan when all deltas
                # are identical
                if df_scan_subset.shape[0] > 0 and b_range > 0:
                    delta = df_scan_subset["delta_score"].values[0]
                    norm_delta = 100 * (delta - min_b) / b_range
                delta_str = f"{norm_delta:.2f}".rjust(6, " ")
                line = line[:60] + delta_str + line[66:]
            out_pdb_l.append(line)
    with open(tmp_pdb_f, 'w') as out_fh:
        out_fh.write(''.join(out_pdb_l))
    # move tmp_pdb_f to pdb_f
    os.rename(tmp_pdb_f, pdb_f)
    return pdb_f
def get_score_string(pdb_f, run_dir):
    """Get score output from cli_score.main.

    Runs the scoring CLI with its stdout redirected into a buffer and
    returns the captured text split into lines.

    Parameters
    ----------
    pdb_f : str
        Path to the pdb file.
    run_dir : str
        Path to the run directory.

    Returns
    -------
    out : list
        List of strings with the score output.
    """
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        cli_score.main(pdb_f, run_dir, full=True)
    return buffer.getvalue().split(os.linesep)
def calc_score(pdb_f, run_dir):
    """Calculate the score of a model.

    Parses the captured output of the scoring CLI for the HADDOCK score
    and its energy components.

    Parameters
    ----------
    pdb_f : str
        Path to the pdb file.
    run_dir : str
        Path to the run directory.

    Returns
    -------
    score : float
        Haddock score.
    vdw : float
        Van der Waals energy.
    elec : float
        Electrostatic energy.
    desolv : float
        Desolvation energy.
    bsa : float
        Buried surface area.
    """
    for line in get_score_string(pdb_f, run_dir):
        if line.startswith("> HADDOCK-score (emscoring)"):
            score = float(line.split()[-1])
        elif line.startswith("> vdw"):
            # all four components come on one 'vdw=..,elec=..,...' line
            vdw = float(line.split("vdw=")[1].split(",")[0])
            elec = float(line.split("elec=")[1].split(",")[0])
            desolv = float(line.split("desolv=")[1].split(",")[0])
            bsa = float(line.split("bsa=")[1].split(",")[0])
    return score, vdw, elec, desolv, bsa
def add_zscores(df_scan_clt, column='delta_score'):
    """Add z-scores to the dataframe.

    The z-score is computed over *column* with the population standard
    deviation (``numpy`` default, ddof=0); when the column is constant
    the z-score is set to 0.0 for every row.

    Parameters
    ----------
    df_scan_clt : pandas.DataFrame
        Dataframe with the scan results for the model.
    column : str
        Column to calculate the z-score.

    Returns
    -------
    df_scan_clt : pandas.DataFrame
        Dataframe with the z-scores added (modified in place).
    """
    values = df_scan_clt[column]
    spread = np.std(values)
    if spread > 0.0:
        df_scan_clt['z_score'] = (values - np.mean(values)) / spread
    else:
        df_scan_clt['z_score'] = 0.0
    return df_scan_clt
def alascan_cluster_analysis(models):
    """Perform cluster analysis on the alascan data.

    Reads the per-model ``scan_<model>.csv`` files (written by Scan.run),
    accumulates the delta values per cluster and per residue, then writes
    one averaged ``scan_clt_<id>.csv`` file per cluster.

    Parameters
    ----------
    models : list
        List of models (each with ``clt_id`` and ``file_name`` attributes).

    Returns
    -------
    clt_scan : dict
        Accumulated (summed, not averaged) per-cluster, per-residue data.
    """
    clt_scan = {}   # cluster id -> {ident -> accumulated deltas}
    cl_pops = {}    # cluster id -> number of models in the cluster
    for native in models:
        cl_id = native.clt_id
        # unclustered models have cl_id = None
        if cl_id is None:
            cl_id = "-"
        if cl_id not in clt_scan:
            clt_scan[cl_id] = {}
            cl_pops[cl_id] = 1
        else:
            cl_pops[cl_id] += 1
        # read the scan file
        # NOTE(review): rstrip('.pdb') strips a trailing *character set*,
        # not the suffix; a model name ending in p/d/b loses extra chars.
        # It matches the writer in Scan.run, so reader and writer agree,
        # but both should eventually use removesuffix('.pdb') together.
        alascan_fname = f"scan_{native.file_name.rstrip('.pdb')}.csv"
        #alascan_fname = Path(path, alascan_fname)
        df_scan = pd.read_csv(alascan_fname, sep="\t", comment="#")
        # loop over the scan file
        for row_idx in range(df_scan.shape[0]):
            row = df_scan.iloc[row_idx]
            chain = row['chain']
            res = row['res']
            ori_resname = row['ori_resname']
            delta_score = row['delta_score']
            delta_vdw = row['delta_vdw']
            delta_elec = row['delta_elec']
            delta_desolv = row['delta_desolv']
            delta_bsa = row['delta_bsa']
            # add to the cluster data with the ident logic
            ident = f"{chain}-{res}-{ori_resname}"
            if ident not in clt_scan[cl_id]:
                # first time this residue is seen for this cluster
                clt_scan[cl_id][ident] = {
                    'delta_score': delta_score,
                    'delta_vdw': delta_vdw,
                    'delta_elec': delta_elec,
                    'delta_desolv': delta_desolv,
                    'delta_bsa': delta_bsa,
                    'frac_pr': 1
                    }
            else:
                # accumulate; frac_pr counts models featuring this residue
                clt_scan[cl_id][ident]['delta_score'] += delta_score
                clt_scan[cl_id][ident]['delta_vdw'] += delta_vdw
                clt_scan[cl_id][ident]['delta_elec'] += delta_elec
                clt_scan[cl_id][ident]['delta_desolv'] += delta_desolv
                clt_scan[cl_id][ident]['delta_bsa'] += delta_bsa
                clt_scan[cl_id][ident]['frac_pr'] += 1
    # now average the data
    for cl_id in clt_scan:
        scan_clt_filename = f"scan_clt_{cl_id}.csv"
        log.info(f"Writing {scan_clt_filename}")
        clt_data = []
        for ident in clt_scan[cl_id]:
            chain = ident.split("-")[0]
            resnum = int(ident.split("-")[1])
            resname = ident.split("-")[2]
            frac_pr = clt_scan[cl_id][ident]["frac_pr"]
            # averages over the models where the residue appeared, plus the
            # fraction of cluster models featuring the residue
            clt_data.append([chain, resnum, resname, ident,
                             clt_scan[cl_id][ident]['delta_score'] / frac_pr,
                             clt_scan[cl_id][ident]['delta_vdw'] / frac_pr,
                             clt_scan[cl_id][ident]['delta_elec'] / frac_pr,
                             clt_scan[cl_id][ident]['delta_desolv'] / frac_pr,
                             clt_scan[cl_id][ident]['delta_bsa'] / frac_pr,
                             clt_scan[cl_id][ident]['frac_pr'] / cl_pops[cl_id]
                             ]
                            )
        df_cols = ['chain', 'resnum', 'resname', 'full_resname',
                   'delta_score', 'delta_vdw', 'delta_elec',
                   'delta_desolv', 'delta_bsa', 'frac_pres']
        df_scan_clt = pd.DataFrame(clt_data, columns=df_cols)
        # adding clt-based Z score
        df_scan_clt = add_zscores(df_scan_clt, 'delta_score')
        df_scan_clt.sort_values(by=['chain', 'resnum'], inplace=True)
        df_scan_clt.to_csv(
            scan_clt_filename,
            index=False,
            float_format='%.2f',
            sep="\t")
        # add comment header on top of the freshly written csv
        fl_content = open(scan_clt_filename, 'r').read()
        with open(scan_clt_filename, 'w') as f:
            f.write(f"#######################################################################{os.linesep}")  # noqa E501
            f.write(f"# `alascan` cluster results for cluster {cl_id}{os.linesep}")  # noqa E501
            f.write(f"#{os.linesep}")
            f.write(f"# z_score is calculated with respect to the mean values of all residues{os.linesep}")  # noqa E501
            f.write(f"#######################################################################{os.linesep}")  # noqa E501
            f.write(fl_content)
    return clt_scan
def generate_alascan_output(models, path):
    """Generate the alascan output files.

    Each model is copied to ``<name>_alascan.pdb``, the per-residue
    ``delta_score`` values from its scan csv are written into the b-factor
    column, and the model attributes are redirected to the annotated copy.

    Parameters
    ----------
    models : list
        List of models.
    path : str
        Path to the run directory.

    Returns
    -------
    models_to_export : list
        The input models, updated to point at the annotated copies.
    """
    models_to_export = []
    for model in models:
        basename = model.file_name.rstrip('.pdb')
        annotated_name = f"{basename}_alascan.pdb"
        shutil.copy(Path(model.path, model.file_name), Path(annotated_name))
        # add delta_score as a bfactor to the model copy
        scan_df = pd.read_csv(f"scan_{basename}.csv", sep="\t", comment="#")
        add_delta_to_bfactor(annotated_name, scan_df)
        # changing attributes so downstream modules pick up the copy
        model.ori_name = model.file_name
        model.file_name = annotated_name
        model.full_name = annotated_name
        model.rel_path = Path('..', Path(path).name, annotated_name)
        model.path = str(Path(".").resolve())
        models_to_export.append(model)
    return models_to_export
def create_alascan_plots(clt_alascan, scan_residue, offline = False):
    """Create the alascan plots.

    For every cluster id in *clt_alascan*, reads the corresponding
    ``scan_clt_<id>.csv`` file (skipping missing ones with a warning)
    and renders the interactive alascan plot; plotting errors are
    logged, never raised.
    """
    for clt_id in clt_alascan:
        scan_clt_filename = f"scan_clt_{clt_id}.csv"
        if not os.path.exists(scan_clt_filename):
            log.warning(f"Could not find {scan_clt_filename}")
            continue
        df_scan_clt = pd.read_csv(scan_clt_filename, sep="\t", comment="#")
        try:
            # plot the data
            make_alascan_plot(
                df_scan_clt,
                clt_id,
                scan_residue,
                offline=offline,
                )
        except Exception as e:
            log.warning(
                "Could not create interactive plot. The following error"
                f" occurred {e}"
                )
    return
class ScanJob:
    """A Job dedicated to the parallel alanine scanning of models."""

    def __init__(self, output, params, scan_obj):
        """Initialise a ScanJob wrapping one Scan object."""
        log.info(f"core {scan_obj.core}, initialising Scan...")
        self.scan_obj = scan_obj
        self.output = output
        self.params = params

    def run(self):
        """Run this ScanJob."""
        log.info(f"core {self.scan_obj.core}, running Scan...")
        # delegate the actual scanning and the summary file to Scan
        self.scan_obj.run()
        self.scan_obj.output()
        return
class Scan:
    """Scan class.

    Performs the alanine (or other residue) scan for a list of models:
    rescores each native model, mutates every interface residue to the
    scan residue, rescores the mutant, and writes one csv per model.
    """

    def __init__(
            self,
            model_list,
            output_name,
            core,
            path,
            **params,
            ):
        """Initialise Scan class.

        Parameters
        ----------
        model_list : list
            Models to scan.
        output_name : str
            Name of the summary file written by :meth:`output`.
        core : int
            Index of this worker (used for log messages and scratch dirs).
        path : str
            Directory where the per-model scan csv files are written.
        **params : dict
            Expects a ``params`` key holding the module parameters
            (``scan_residue``, ``int_cutoff``, ``resdic_*`` filters).
        """
        self.model_list = model_list
        self.output_name = output_name
        self.core = core
        self.path = path
        self.scan_res = params['params']['scan_residue']
        self.int_cutoff = params["params"]["int_cutoff"]
        # initialising resdic
        # NOTE(review): this guard is always true here, since
        # params['params'] was already accessed unconditionally above.
        if "params" in params.keys():
            self.filter_resdic = {
                key[-1]: value for key, value in params["params"].items()
                if key.startswith("resdic")
                }

    def run(self):
        """Run alascan calculations."""
        for native in self.model_list:
            # here we rescore the native model for consistency, as the score
            # attribute could come from any module in principle
            sc_dir = f"haddock3-score-{self.core}"
            n_score, n_vdw, n_elec, n_des, n_bsa = calc_score(
                native.rel_path, run_dir=sc_dir)
            scan_data = []
            # check if the user wants to mutate only some residues
            if self.filter_resdic != {'_': []}:
                interface = self.filter_resdic
            else:
                interface = CAPRI.identify_interface(
                    native.rel_path, cutoff=self.int_cutoff)
            atoms = get_atoms(native.rel_path)
            coords, chain_ranges = load_coords(
                native.rel_path, atoms, add_resname=True
                )
            # map "chain-resid" -> residue name (first atom seen wins)
            resname_dict = {}
            for chain, resid, _atom, resname in coords.keys():
                key = f"{chain}-{resid}"
                if key not in resname_dict:
                    resname_dict[key] = resname
            # self.log(f'Mutating interface of {native.file_name}...')
            # self.log(f"Interface: {interface}")
            for chain in interface:
                for res in interface[chain]:
                    ori_resname = resname_dict[f"{chain}-{res}"]
                    end_resname = self.scan_res
                    if ori_resname == self.scan_res:
                        # we do not re-score equal residues (e.g. ALA = ALA)
                        # NOTE(review): with the current layout these
                        # copies are never read, since the deltas and the
                        # scan_data.append below live in the else branch;
                        # confirm intended behaviour against upstream.
                        c_score = n_score
                        c_vdw = n_vdw
                        c_elec = n_elec
                        c_des = n_des
                        c_bsa = n_bsa
                    else:
                        try:
                            mut_pdb_name = mutate(native.rel_path,
                                                  chain,
                                                  res,
                                                  end_resname)
                        except KeyError:
                            # residue (or target) has no one-letter code
                            continue
                        # now we score the mutated model
                        c_score, c_vdw, c_elec, c_des, c_bsa = calc_score(
                            mut_pdb_name, run_dir=sc_dir)
                        # now the deltas (wildtype - mutant)
                        delta_score = n_score - c_score
                        delta_vdw = n_vdw - c_vdw
                        delta_elec = n_elec - c_elec
                        delta_desolv = n_des - c_des
                        delta_bsa = n_bsa - c_bsa
                        scan_data.append([chain, res, ori_resname,
                                          end_resname, c_score, c_vdw,
                                          c_elec, c_des, c_bsa,
                                          delta_score, delta_vdw,
                                          delta_elec, delta_desolv,
                                          delta_bsa])
                        # the mutant pdb is only needed for scoring
                        os.remove(mut_pdb_name)
            # write output
            df_columns = ['chain', 'res', 'ori_resname', 'end_resname',
                          'score', 'vdw', 'elec', 'desolv', 'bsa',
                          'delta_score', 'delta_vdw', 'delta_elec',
                          'delta_desolv', 'delta_bsa']
            self.df_scan = pd.DataFrame(scan_data, columns=df_columns)
            # NOTE(review): rstrip('.pdb') strips a trailing character
            # set, not the suffix; kept as-is because the readers
            # (alascan_cluster_analysis, generate_alascan_output) build
            # the same name the same way.
            alascan_fname = Path(
                self.path, f"scan_{native.file_name.rstrip('.pdb')}.csv")
            # add zscore
            self.df_scan = add_zscores(self.df_scan, 'delta_score')
            self.df_scan.to_csv(
                alascan_fname,
                index=False,
                float_format='%.2f',
                sep="\t"
                )
            # prepend a comment header to the freshly written csv
            fl_content = open(alascan_fname, 'r').read()
            with open(alascan_fname, 'w') as f:
                f.write(f"##########################################################{os.linesep}")  # noqa E501
                f.write(f"# `alascan` results for {native.file_name}{os.linesep}")  # noqa E501
                f.write(f"#{os.linesep}")
                f.write(f"# native score = {n_score}{os.linesep}")
                f.write(f"#{os.linesep}")
                f.write(f"# z_score is calculated with respect to the other residues")  # noqa E501
                f.write(f"{os.linesep}")
                f.write(f"##########################################################{os.linesep}")  # noqa E501
                f.write(fl_content)

    def output(self):
        """Write down unique contacts to file."""
        output_fname = Path(self.path, self.output_name)
        with open(output_fname, "w") as out_fh:
            out_fh.write(
                f"core {self.core} wrote alascan data "
                f"for {len(self.model_list)} models{os.linesep}"
                )