Source code for haddock.clis.cli_traceback

"""
Traces back PDB files from a HADDOCK run directory.

Given an input run directory, haddock3-traceback traces back each model to the
initial input molecules used, providing the rank of each intermediate model.

USAGE::

    haddock3-traceback -r <run_dir>

"""
import argparse
import sys
from pathlib import Path

import numpy as np
import pandas as pd

from haddock import log
from haddock.core.defaults import TRACEBACK_FOLDER
from haddock.core.typing import Any, FilePath
from haddock.libs import libcli
from haddock.libs.libontology import ModuleIO, PDBFile
from haddock.libs.libplots import make_traceback_plot
from haddock.modules import get_module_steps_folders


[docs] def get_steps_without_pdbs(run_dir, all_steps): """ Get the modules that do not produce PDB files. Parameters ---------- run_dir : str or pathlib.Path Path to the run directory. all_steps : list List of all the steps in the run directory. Returns ------- steps_without_pdbs : list List of steps that did not produce PDB files. """ steps_without_pdbs = [] for step in all_steps: if step.endswith("topoaa"): steps_without_pdbs.append(step) else: step_dir = Path(run_dir, step) if step_dir.is_dir(): pdbs = list(step_dir.glob("*.pdb*")) if len(pdbs) == 0: steps_without_pdbs.append(step) return steps_without_pdbs
[docs] def get_ori_names(n: int, pdbfile: PDBFile, max_topo_len: int) -> tuple[list, int]: """ Get the original name(s) of the PDB file. Parameters ---------- n : int Step number. pdbfile : PDBFile PDBFile object. max_topo_len : int Maximum length of the topologies found so far. Returns ------- ori_names : list List of original names. max_topo_len : int Maximum length of the topologies found so far. """ if n != 0: # not the first step, ori_name should be defined ori_names = [pdbfile.ori_name] else: # first step, we get topology files instead of ori_name # topology can either be a list of topologies or a single # topology if isinstance(pdbfile.topology, list): ori_names = [el.file_name for el in pdbfile.topology] if len(pdbfile.topology) > max_topo_len: max_topo_len = len(pdbfile.topology) else: ori_names = [pdbfile.topology.file_name] # type: ignore max_topo_len = 1 return ori_names, max_topo_len
[docs] def traceback_dataframe( data_dict: dict, rank_dict: dict, sel_step: list, max_topo_len: int ) -> pd.DataFrame: """ Create traceback dataframe by combining together ranks and data. Parameters ---------- data_dict : dict Dictionary containing the data to be traced back. rank_dict : dict Dictionary containing the ranks of the data to be traced back. sel_step : list List of selected steps. max_topo_len : int Maximum length of the topologies. Returns ------- df_ord : pandas.DataFrame Dataframe containing the traceback data. """ # get last step of the workflow last_step = sel_step[-1] # data dict to dataframe df_data = pd.DataFrame.from_dict(data_dict, orient="index") df_data.reset_index(inplace=True) # assign columns data_cols = [el for el in reversed(sel_step)] data_cols.extend([f"00_topo{i+1}" for i in range(max_topo_len)]) df_data.columns = data_cols # same for the rank_dict df_ranks = pd.DataFrame.from_dict(rank_dict, orient="index") df_ranks.reset_index(inplace=True) ranks_col = [last_step] # the key to merge the dataframes ranks_col.extend([f"{el}_rank" for el in reversed(sel_step)]) df_ranks.columns = ranks_col # merging the data and ranks dataframes df_merged = pd.merge(df_data, df_ranks, on=last_step) ordered_cols = sorted(df_merged.columns) df_ord = df_merged[ordered_cols] # last thing: substituting unk records with - in the last step unk_records = df_ord[f"{last_step}"].str.startswith("unk") df_ord.loc[unk_records, last_step] = "-" return df_ord
[docs] def order_traceback_df(df_output, sel_step): """ Order the traceback dataframe. Each step is ordered by rank. Parameters ---------- df_output : pandas.DataFrame Dataframe containing the traceback data. sel_step : list List of selected steps. Returns ------- df_output : pandas.DataFrame Dataframe containing the ordered traceback data. """ # loop over sel_step in reverse order sorted_list = [] indexes = [] for n in range(len(sel_step) - 1, -1, -1): rank_col = sel_step[n] + "_rank" # take only models with a rank df_last = df_output[df_output[rank_col] != "-"] # remove from df_last the indexes that are already in the dataframe df_last = df_last[~df_last.index.isin(indexes)] # sorting the dataframe by rank sorted_df_last = df_last.sort_values(by=rank_col) sorted_list.append(sorted_df_last) # concat the current indexes with the previous ones indexes = sorted_df_last.index.tolist() + indexes df_output = pd.concat(sorted_list) return df_output
[docs] def subset_traceback(traceback_df: pd.DataFrame, cons_filename: Path) -> pd.DataFrame: """ Generate a subset the traceback dataframe with the top 40 models. Parameters ---------- traceback_df : pandas.DataFrame Dataframe containing the traceback data. cons_filename : pathlib.Path name of the consensus file. Returns ------- rank_data_subset : pandas.DataFrame Dataframe containing the subset of the traceback data. """ red_traceback_df = traceback_df.head(40) rank_data = red_traceback_df.filter(regex='rank$') # get the last column: this will define the name of the models last_column = red_traceback_df.columns[-2] last_column_data = red_traceback_df[last_column] # concat the ranks with the model name rank_data = pd.concat([last_column_data, rank_data], axis=1) # the rank of the last column must be defined last_column_rank = last_column + "_rank" # copy avoids SettingWithCopyWarning rank_data_subset = rank_data[rank_data[last_column_rank] != '-'].copy() rank_columns = rank_data_subset.columns[1:].tolist() rank_data_subset[rank_columns] = rank_data_subset[rank_columns].apply(pd.to_numeric, errors='coerce') rank_data_subset = rank_data_subset.sort_values(by=last_column_rank, ascending=True) # rename the columns rank_data_subset.columns = ["Model"] + rank_columns # sum the ranks rank_data_subset["Sum-of-Ranks"] = rank_data_subset[rank_columns].sum(axis=1) rank_data_subset.to_csv(cons_filename, index=False, sep="\t") return rank_data_subset
# Command line interface parser ap = argparse.ArgumentParser( prog="haddock3-traceback", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) libcli.add_rundir_arg(ap)
[docs] def load_args(ap): """Load argument parser args.""" return ap.parse_args()
[docs] def cli(ap, main): """Command-line interface entry point.""" cmd = vars(load_args(ap)) main(**cmd)
[docs] def maincli(): """Execute main client.""" cli(ap, main)
[docs] def main(run_dir: FilePath, offline: bool = False) -> None: """ Traceback CLI. Parameters ---------- run_dir : str or Path Path to the original run directory. """ log.level = 20 log.info(f"Running haddock3-traceback on {run_dir}") # Reading steps log.info("Reading input run directory") # get the module folders from the run_dir input all_steps = get_module_steps_folders(Path(run_dir)) log.info(f"All_steps: {', '.join(all_steps)}") ana_modules = get_steps_without_pdbs(run_dir, all_steps) log.info(f"Modules not to be analysed: {', '.join(ana_modules)}") sel_step = [st for st in all_steps if st not in ana_modules] # check if there are steps to traceback if len(sel_step) == 0: log.info("No steps to trace back. Exiting.") return else: log.info(f"Steps to trace back: {', '.join(sel_step)}") # creating traceback folder outdir = Path(run_dir, TRACEBACK_FOLDER) try: outdir.mkdir(exist_ok=False) log.info(f"Created directory: {str(outdir.resolve())}") except FileExistsError: log.warning(f"Directory {str(outdir.resolve())} already exists.") data_dict: dict[Any, Any] = {} rank_dict: dict[Any, Any] = {} unk_idx, max_topo_len = 0, 0 # this cycle goes through the steps in reverse order for n in range(len(sel_step) - 1, -1, -1): log.info(f"Tracing back step {sel_step[n]}") # correcting names in the dictionary. The ori_name must be complemented # with the step folder name for key in data_dict.keys(): if data_dict[key][-1] != "-": data_dict[key][-1] = f"../{sel_step[n]}/{data_dict[key][-1]}" delta = len(sel_step) - n - 1 # how many steps have we gone back? # loading the .json file json_path = Path(run_dir, sel_step[n], "io.json") io = ModuleIO() io.load(json_path) # list all the values in the data_dict ls_values = [x for val in data_dict.values() for x in val] # getting and sorting the ranks for the current step folder ranks = [pdbfile.score for pdbfile in io.output] ranks_argsort = np.argsort(ranks) # iterating through the pdbfiles to fill data_dict and rank_dict for i, pdbfile in enumerate(io.output): rank = np.where(ranks_argsort == i)[0][0] + 1 # getting the original names ori_names, max_topo_len = get_ori_names(n, pdbfile, max_topo_len) if n != len(sel_step) - 1: if str(pdbfile.rel_path) not in ls_values: # this is the first step in which the pdbfile appears. # This means that it was discarded for the subsequent steps # We need to add the pdbfile to the data_dict keys = [f"unk{unk_idx}"] data_dict[keys[0]] = ["-" for el in range(delta - 1)] data_dict[keys[0]].append(str(pdbfile.rel_path)) rank_dict[keys[0]] = ["-" for el in range(delta)] unk_idx += 1 else: # we've already seen this pdb before. idxs = [i for i, el in enumerate(ls_values) if el==str(pdbfile.rel_path)] keys = [list(data_dict.keys())[idx // delta] for idx in idxs] # assignment for el in ori_names: for key in keys: data_dict[key].append(el) for key in keys: rank_dict[key].append(rank) else: # last step of the workflow data_dict[str(pdbfile.rel_path)] = [on for on in ori_names] rank_dict[str(pdbfile.rel_path)] = [rank] # print(f"rank_dict {rank_dict}") # print(f"data_dict {data_dict}, maxtopo {max_topo_len}") # stripping away relative paths final_data_dict = {} for key in data_dict.keys(): new_key = key.split("/")[-1] final_data_dict[new_key] = [el.split("/")[-1] for el in data_dict[key]] final_rank_dict = {} for key in rank_dict.keys(): new_key = key.split("/")[-1] final_rank_dict[new_key] = rank_dict[key] # dumping the data into a dataframe df_output = traceback_dataframe( final_data_dict, final_rank_dict, sel_step, max_topo_len ) # ordering the dataframe df_output = order_traceback_df(df_output, sel_step) # dumping the dataframe track_filename = Path(run_dir, TRACEBACK_FOLDER, "traceback.tsv") log.info( f"Output dataframe {track_filename} " f"created with shape {df_output.shape}" ) df_output.to_csv(track_filename, sep="\t", index=False) # taking (and writing) a subset of the dataframe consensus_filename = Path(run_dir, TRACEBACK_FOLDER, "consensus.tsv") rank_data_subset = subset_traceback(df_output, consensus_filename) # plotting the traceback dataframe plot_filename = Path(run_dir, TRACEBACK_FOLDER, "traceback.html") make_traceback_plot(rank_data_subset, plot_filename, offline=offline) return
if __name__ == "__main__": sys.exit(maincli())