Source code for haddock.clis.cli_traceback

"""
Traces back PDB files from a HADDOCK run directory.

Given an input run directory, haddock3-traceback traces back each model to the
initial input molecules used, providing the rank of each intermediate model.

USAGE::

    haddock3-traceback -r <run_dir>

"""
import argparse
import sys
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from haddock import log
from haddock.libs import libcli
from haddock.libs.libontology import ModuleIO, PDBFile
from haddock.libs.libplots import make_traceback_plot
from haddock.modules import get_module_steps_folders


TRACK_FOLDER = "traceback"  # name of the traceback folder


def get_steps_without_pdbs(run_dir, all_steps):
    """
    Get the modules that do not produce PDB files.

    Parameters
    ----------
    run_dir : str or pathlib.Path
        Path to the run directory.
    all_steps : list
        List of all the steps in the run directory.

    Returns
    -------
    steps_without_pdbs : list
        List of steps that did not produce PDB files.
    """
    steps_without_pdbs = []
    for step in all_steps:
        if step.endswith("topoaa"):
            steps_without_pdbs.append(step)
        else:
            step_dir = Path(run_dir, step)
            if step_dir.is_dir():
                pdbs = list(step_dir.glob("*.pdb*"))
                if len(pdbs) == 0:
                    steps_without_pdbs.append(step)
    return steps_without_pdbs

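# Illustrative call (hypothetical step folders; real names depend on the
# workflow). `topoaa` steps are always excluded, and so is any step folder
# that contains no PDB files, such as an analysis module:
#
#   get_steps_without_pdbs("run1", ["0_topoaa", "1_rigidbody", "2_caprieval"])
#   # -> ["0_topoaa", "2_caprieval"]
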
def get_ori_names(
        n: int,
        pdbfile: PDBFile,
        max_topo_len: int,
        ) -> tuple[list, int]:
    """
    Get the original name(s) of the PDB file.

    Parameters
    ----------
    n : int
        Step number.
    pdbfile : PDBFile
        PDBFile object.
    max_topo_len : int
        Maximum length of the topologies found so far.

    Returns
    -------
    ori_names : list
        List of original names.
    max_topo_len : int
        Maximum length of the topologies found so far.
    """
    if n != 0:  # not the first step, ori_name should be defined
        ori_names = [pdbfile.ori_name]
    else:  # first step: use topology files instead of ori_name
        # topology can either be a list of topologies or a single topology
        if isinstance(pdbfile.topology, list):
            ori_names = [el.file_name for el in pdbfile.topology]
            if len(pdbfile.topology) > max_topo_len:
                max_topo_len = len(pdbfile.topology)
        else:
            ori_names = [pdbfile.topology.file_name]  # type: ignore
            max_topo_len = 1
    return ori_names, max_topo_len

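# Minimal sketch of the two branches, using SimpleNamespace stand-ins for the
# PDBFile and topology objects (hypothetical file names):
#
#   from types import SimpleNamespace
#   topos = [SimpleNamespace(file_name="mol1.psf"),
#            SimpleNamespace(file_name="mol2.psf")]
#   pdb = SimpleNamespace(ori_name="rigidbody_4.pdb", topology=topos)
#   get_ori_names(1, pdb, 0)  # -> (["rigidbody_4.pdb"], 0)
#   get_ori_names(0, pdb, 0)  # -> (["mol1.psf", "mol2.psf"], 2)
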
def traceback_dataframe(
        data_dict: dict,
        rank_dict: dict,
        sel_step: list,
        max_topo_len: int,
        ) -> pd.DataFrame:
    """
    Create traceback dataframe by combining ranks and data.

    Parameters
    ----------
    data_dict : dict
        Dictionary containing the data to be traced back.
    rank_dict : dict
        Dictionary containing the ranks of the data to be traced back.
    sel_step : list
        List of selected steps.
    max_topo_len : int
        Maximum length of the topologies.

    Returns
    -------
    df_ord : pandas.DataFrame
        Dataframe containing the traceback data.
    """
    # get last step of the workflow
    last_step = sel_step[-1]
    # data dict to dataframe
    df_data = pd.DataFrame.from_dict(data_dict, orient="index")
    df_data.reset_index(inplace=True)
    # assign columns
    data_cols = [el for el in reversed(sel_step)]
    data_cols.extend([f"00_topo{i + 1}" for i in range(max_topo_len)])
    df_data.columns = data_cols

    # same for the rank_dict
    df_ranks = pd.DataFrame.from_dict(rank_dict, orient="index")
    df_ranks.reset_index(inplace=True)
    ranks_col = [last_step]  # the key to merge the dataframes
    ranks_col.extend([f"{el}_rank" for el in reversed(sel_step)])
    df_ranks.columns = ranks_col

    # merging the data and ranks dataframes
    df_merged = pd.merge(df_data, df_ranks, on=last_step)
    ordered_cols = sorted(df_merged.columns)
    df_ord = df_merged[ordered_cols].copy()  # copy avoids SettingWithCopyWarning

    # last thing: substituting unk records with - in the last step
    unk_records = df_ord[last_step].str.startswith("unk")
    df_ord.loc[unk_records, last_step] = "-"
    return df_ord

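# Toy input shapes (hypothetical file names): each data_dict value lists a
# model's ancestors in reverse step order plus its topologies, and each
# rank_dict value holds one rank per step, also in reverse step order:
#
#   data_dict = {"flexref_1.pdb": ["rigidbody_4.pdb", "mol1.psf"]}
#   rank_dict = {"flexref_1.pdb": [1, 4]}
#   df = traceback_dataframe(data_dict, rank_dict,
#                            ["1_rigidbody", "2_flexref"], max_topo_len=1)
#   # df columns (sorted): 00_topo1, 1_rigidbody, 1_rigidbody_rank,
#   #                      2_flexref, 2_flexref_rank
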
def order_traceback_df(df_output, sel_step):
    """
    Order the traceback dataframe.

    Each step is ordered by rank.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Dataframe containing the traceback data.
    sel_step : list
        List of selected steps.

    Returns
    -------
    df_output : pandas.DataFrame
        Dataframe containing the ordered traceback data.
    """
    # loop over sel_step in reverse order
    sorted_list = []
    indexes = []
    for n in range(len(sel_step) - 1, -1, -1):
        rank_col = sel_step[n] + "_rank"
        # take only models with a rank
        df_last = df_output[df_output[rank_col] != "-"]
        # remove from df_last the indexes that are already in the dataframe
        df_last = df_last[~df_last.index.isin(indexes)]
        # sorting the dataframe by rank
        sorted_df_last = df_last.sort_values(by=rank_col)
        sorted_list.append(sorted_df_last)
        # concat the current indexes with the previous ones
        indexes = sorted_df_last.index.tolist() + indexes
    df_output = pd.concat(sorted_list)
    return df_output

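# For example (hypothetical ranks): with steps ["1_rigidbody", "2_flexref"],
# all models carrying a 2_flexref_rank are listed first, sorted by that rank;
# models that only reached 1_rigidbody follow, sorted by 1_rigidbody_rank.
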
def subset_traceback(
        traceback_df: pd.DataFrame,
        cons_filename: Path,
        ) -> pd.DataFrame:
    """
    Generate a subset of the traceback dataframe with the top 40 models.

    Parameters
    ----------
    traceback_df : pandas.DataFrame
        Dataframe containing the traceback data.
    cons_filename : pathlib.Path
        Name of the consensus file.

    Returns
    -------
    rank_data_subset : pandas.DataFrame
        Dataframe containing the subset of the traceback data.
    """
    red_traceback_df = traceback_df.head(40)
    rank_data = red_traceback_df.filter(regex="rank$")
    # the second-to-last column holds the models of the last step:
    # it defines the name of the models
    last_column = red_traceback_df.columns[-2]
    last_column_data = red_traceback_df[last_column]
    # concat the ranks with the model name
    rank_data = pd.concat([last_column_data, rank_data], axis=1)
    # the rank of the last column must be defined
    last_column_rank = last_column + "_rank"
    # copy avoids SettingWithCopyWarning
    rank_data_subset = rank_data[rank_data[last_column_rank] != "-"].copy()
    rank_columns = rank_data_subset.columns[1:].tolist()
    rank_data_subset[rank_columns] = rank_data_subset[rank_columns].apply(
        pd.to_numeric, errors="coerce"
        )
    rank_data_subset = rank_data_subset.sort_values(
        by=last_column_rank, ascending=True
        )
    # rename the columns
    rank_data_subset.columns = ["Model"] + rank_columns
    # sum the ranks
    rank_data_subset["Sum-of-Ranks"] = rank_data_subset[rank_columns].sum(axis=1)
    rank_data_subset.to_csv(cons_filename, index=False, sep="\t")
    return rank_data_subset

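# Example of the consensus subset written to cons_filename (hypothetical
# values; tab-separated on disk, sorted by the last step's rank):
#
#   Model          1_rigidbody_rank  2_flexref_rank  Sum-of-Ranks
#   flexref_1.pdb  4                 1               5
#   flexref_2.pdb  2                 2               4
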
# Command line interface parser
ap = argparse.ArgumentParser(
    prog="haddock3-traceback",
    description=__doc__,
    formatter_class=argparse.RawDescriptionHelpFormatter,
    )

libcli.add_rundir_arg(ap)

def load_args(ap):
    """Load argument parser args."""
    return ap.parse_args()

def cli(ap, main):
    """Command-line interface entry point."""
    cmd = vars(load_args(ap))
    main(**cmd)

def maincli():
    """Execute main client."""
    cli(ap, main)

def main(run_dir):
    """
    Traceback CLI.

    Parameters
    ----------
    run_dir : str or Path
        Path to the original run directory.
    """
    log.level = 20
    log.info(f"Running haddock3-traceback on {run_dir}")

    # Reading steps
    log.info("Reading input run directory")
    # get the module folders from the run_dir input
    all_steps = get_module_steps_folders(Path(run_dir))
    log.info(f"All steps: {', '.join(all_steps)}")
    ana_modules = get_steps_without_pdbs(run_dir, all_steps)
    log.info(f"Modules not to be analysed: {', '.join(ana_modules)}")
    sel_step = [st for st in all_steps if st not in ana_modules]
    # check if there are steps to trace back
    if len(sel_step) == 0:
        log.info("No steps to trace back. Exiting.")
        return
    else:
        log.info(f"Steps to trace back: {', '.join(sel_step)}")

    # creating traceback folder
    outdir = Path(run_dir, TRACK_FOLDER)
    try:
        outdir.mkdir(exist_ok=False)
        log.info(f"Created directory: {str(outdir.resolve())}")
    except FileExistsError:
        log.warning(f"Directory {str(outdir.resolve())} already exists.")

    data_dict: dict[Any, Any] = {}
    rank_dict: dict[Any, Any] = {}
    unk_idx, max_topo_len = 0, 0
    # this cycle goes through the steps in reverse order
    for n in range(len(sel_step) - 1, -1, -1):
        log.info(f"Tracing back step {sel_step[n]}")
        # correcting names in the dictionary: the ori_name must be
        # complemented with the step folder name
        for key in data_dict.keys():
            if data_dict[key][-1] != "-":
                data_dict[key][-1] = f"../{sel_step[n]}/{data_dict[key][-1]}"

        delta = len(sel_step) - n - 1  # how many steps have we gone back?

        # loading the .json file
        json_path = Path(run_dir, sel_step[n], "io.json")
        io = ModuleIO()
        io.load(json_path)

        # list all the values in the data_dict
        ls_values = [x for val in data_dict.values() for x in val]

        # getting and sorting the ranks for the current step folder
        ranks = [pdbfile.score for pdbfile in io.output]
        ranks_argsort = np.argsort(ranks)

        # iterating through the pdbfiles to fill data_dict and rank_dict
        for i, pdbfile in enumerate(io.output):
            rank = np.where(ranks_argsort == i)[0][0] + 1
            # getting the original names
            ori_names, max_topo_len = get_ori_names(n, pdbfile, max_topo_len)
            if n != len(sel_step) - 1:
                if str(pdbfile.rel_path) not in ls_values:
                    # this is the first step in which the pdbfile appears,
                    # meaning it was discarded in the subsequent steps.
                    # We need to add the pdbfile to the data_dict
                    keys = [f"unk{unk_idx}"]
                    data_dict[keys[0]] = ["-" for el in range(delta - 1)]
                    data_dict[keys[0]].append(str(pdbfile.rel_path))
                    rank_dict[keys[0]] = ["-" for el in range(delta)]
                    unk_idx += 1
                else:
                    # we've already seen this pdb before
                    idxs = [
                        i for i, el in enumerate(ls_values)
                        if el == str(pdbfile.rel_path)
                        ]
                    keys = [
                        list(data_dict.keys())[idx // delta] for idx in idxs
                        ]
                # assignment
                for el in ori_names:
                    for key in keys:
                        data_dict[key].append(el)
                for key in keys:
                    rank_dict[key].append(rank)
            else:  # last step of the workflow
                data_dict[str(pdbfile.rel_path)] = [on for on in ori_names]
                rank_dict[str(pdbfile.rel_path)] = [rank]

    # stripping away relative paths
    final_data_dict = {}
    for key in data_dict.keys():
        new_key = key.split("/")[-1]
        final_data_dict[new_key] = [el.split("/")[-1] for el in data_dict[key]]
    final_rank_dict = {}
    for key in rank_dict.keys():
        new_key = key.split("/")[-1]
        final_rank_dict[new_key] = rank_dict[key]

    # dumping the data into a dataframe
    df_output = traceback_dataframe(
        final_data_dict, final_rank_dict, sel_step, max_topo_len
        )
    # ordering the dataframe
    df_output = order_traceback_df(df_output, sel_step)
    # dumping the dataframe
    track_filename = Path(run_dir, TRACK_FOLDER, "traceback.tsv")
    log.info(
        f"Output dataframe {track_filename} "
        f"created with shape {df_output.shape}"
        )
    df_output.to_csv(track_filename, sep="\t", index=False)

    # taking (and writing) a subset of the dataframe
    consensus_filename = Path(run_dir, TRACK_FOLDER, "consensus.tsv")
    rank_data_subset = subset_traceback(df_output, consensus_filename)

    # plotting the traceback dataframe
    plot_filename = Path(run_dir, TRACK_FOLDER, "traceback.html")
    make_traceback_plot(rank_data_subset, plot_filename)
    return

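# Programmatic use, equivalent to running `haddock3-traceback -r run1` on the
# command line (assuming "run1" is a completed HADDOCK3 run directory):
#
#   main("run1")
#   # writes run1/traceback/traceback.tsv, consensus.tsv and traceback.html
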
if __name__ == "__main__":
    sys.exit(maincli())