"""
Traces back PDB files from a HADDOCK run directory.
Given an input run directory, haddock3-traceback traces back each model to the
initial input molecules used, providing the rank of each intermediate model.
USAGE::
haddock3-traceback -r <run_dir>
"""
import argparse
import sys
from pathlib import Path
import numpy as np
import pandas as pd
from haddock import log
from haddock.core.defaults import TRACEBACK_FOLDER
from haddock.core.typing import Any, FilePath
from haddock.libs import libcli
from haddock.libs.libontology import ModuleIO, PDBFile
from haddock.libs.libplots import make_traceback_plot
from haddock.modules import get_module_steps_folders
[docs]
def get_steps_without_pdbs(run_dir, all_steps):
"""
Get the modules that do not produce PDB files.
Parameters
----------
run_dir : str or pathlib.Path
Path to the run directory.
all_steps : list
List of all the steps in the run directory.
Returns
-------
steps_without_pdbs : list
List of steps that did not produce PDB files.
"""
steps_without_pdbs = []
for step in all_steps:
if step.endswith("topoaa"):
steps_without_pdbs.append(step)
else:
step_dir = Path(run_dir, step)
if step_dir.is_dir():
pdbs = list(step_dir.glob("*.pdb*"))
if len(pdbs) == 0:
steps_without_pdbs.append(step)
return steps_without_pdbs
[docs]
def get_ori_names(n: int, pdbfile: PDBFile, max_topo_len: int) -> tuple[list, int]:
"""
Get the original name(s) of the PDB file.
Parameters
----------
n : int
Step number.
pdbfile : PDBFile
PDBFile object.
max_topo_len : int
Maximum length of the topologies found so far.
Returns
-------
ori_names : list
List of original names.
max_topo_len : int
Maximum length of the topologies found so far.
"""
if n != 0: # not the first step, ori_name should be defined
ori_names = [pdbfile.ori_name]
else: # first step, we get topology files instead of ori_name
# topology can either be a list of topologies or a single
# topology
if isinstance(pdbfile.topology, list):
ori_names = [el.file_name for el in pdbfile.topology]
if len(pdbfile.topology) > max_topo_len:
max_topo_len = len(pdbfile.topology)
else:
ori_names = [pdbfile.topology.file_name] # type: ignore
max_topo_len = 1
return ori_names, max_topo_len
[docs]
def traceback_dataframe(
data_dict: dict, rank_dict: dict, sel_step: list, max_topo_len: int
) -> pd.DataFrame:
"""
Create traceback dataframe by combining together ranks and data.
Parameters
----------
data_dict : dict
Dictionary containing the data to be traced back.
rank_dict : dict
Dictionary containing the ranks of the data to be traced back.
sel_step : list
List of selected steps.
max_topo_len : int
Maximum length of the topologies.
Returns
-------
df_ord : pandas.DataFrame
Dataframe containing the traceback data.
"""
# get last step of the workflow
last_step = sel_step[-1]
# data dict to dataframe
df_data = pd.DataFrame.from_dict(data_dict, orient="index")
df_data.reset_index(inplace=True)
# assign columns
data_cols = [el for el in reversed(sel_step)]
data_cols.extend([f"00_topo{i+1}" for i in range(max_topo_len)])
df_data.columns = data_cols
# same for the rank_dict
df_ranks = pd.DataFrame.from_dict(rank_dict, orient="index")
df_ranks.reset_index(inplace=True)
ranks_col = [last_step] # the key to merge the dataframes
ranks_col.extend([f"{el}_rank" for el in reversed(sel_step)])
df_ranks.columns = ranks_col
# merging the data and ranks dataframes
df_merged = pd.merge(df_data, df_ranks, on=last_step)
ordered_cols = sorted(df_merged.columns)
df_ord = df_merged[ordered_cols]
# last thing: substituting unk records with - in the last step
unk_records = df_ord[f"{last_step}"].str.startswith("unk")
df_ord.loc[unk_records, last_step] = "-"
return df_ord
[docs]
def order_traceback_df(df_output, sel_step):
"""
Order the traceback dataframe. Each step is ordered by rank.
Parameters
----------
df_output : pandas.DataFrame
Dataframe containing the traceback data.
sel_step : list
List of selected steps.
Returns
-------
df_output : pandas.DataFrame
Dataframe containing the ordered traceback data.
"""
# loop over sel_step in reverse order
sorted_list = []
indexes = []
for n in range(len(sel_step) - 1, -1, -1):
rank_col = sel_step[n] + "_rank"
# take only models with a rank
df_last = df_output[df_output[rank_col] != "-"]
# remove from df_last the indexes that are already in the dataframe
df_last = df_last[~df_last.index.isin(indexes)]
# sorting the dataframe by rank
sorted_df_last = df_last.sort_values(by=rank_col)
sorted_list.append(sorted_df_last)
# concat the current indexes with the previous ones
indexes = sorted_df_last.index.tolist() + indexes
df_output = pd.concat(sorted_list)
return df_output
[docs]
def subset_traceback(traceback_df: pd.DataFrame, cons_filename: Path) -> pd.DataFrame:
"""
Generate a subset the traceback dataframe with the top 40 models.
Parameters
----------
traceback_df : pandas.DataFrame
Dataframe containing the traceback data.
cons_filename : pathlib.Path
name of the consensus file.
Returns
-------
rank_data_subset : pandas.DataFrame
Dataframe containing the subset of the traceback data.
"""
red_traceback_df = traceback_df.head(40)
rank_data = red_traceback_df.filter(regex='rank$')
# get the last column: this will define the name of the models
last_column = red_traceback_df.columns[-2]
last_column_data = red_traceback_df[last_column]
# concat the ranks with the model name
rank_data = pd.concat([last_column_data, rank_data], axis=1)
# the rank of the last column must be defined
last_column_rank = last_column + "_rank"
# copy avoids SettingWithCopyWarning
rank_data_subset = rank_data[rank_data[last_column_rank] != '-'].copy()
rank_columns = rank_data_subset.columns[1:].tolist()
rank_data_subset[rank_columns] = rank_data_subset[rank_columns].apply(pd.to_numeric, errors='coerce')
rank_data_subset = rank_data_subset.sort_values(by=last_column_rank, ascending=True)
# rename the columns
rank_data_subset.columns = ["Model"] + rank_columns
# sum the ranks
rank_data_subset["Sum-of-Ranks"] = rank_data_subset[rank_columns].sum(axis=1)
rank_data_subset.to_csv(cons_filename, index=False, sep="\t")
return rank_data_subset
# Command line interface parser
ap = argparse.ArgumentParser(
prog="haddock3-traceback",
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
libcli.add_rundir_arg(ap)
[docs]
def load_args(ap):
"""Load argument parser args."""
return ap.parse_args()
[docs]
def cli(ap, main):
"""Command-line interface entry point."""
cmd = vars(load_args(ap))
main(**cmd)
[docs]
def maincli():
"""Execute main client."""
cli(ap, main)
[docs]
def main(run_dir: FilePath, offline: bool = False) -> None:
"""
Traceback CLI.
Parameters
----------
run_dir : str or Path
Path to the original run directory.
"""
log.level = 20
log.info(f"Running haddock3-traceback on {run_dir}")
# Reading steps
log.info("Reading input run directory")
# get the module folders from the run_dir input
all_steps = get_module_steps_folders(Path(run_dir))
log.info(f"All_steps: {', '.join(all_steps)}")
ana_modules = get_steps_without_pdbs(run_dir, all_steps)
log.info(f"Modules not to be analysed: {', '.join(ana_modules)}")
sel_step = [st for st in all_steps if st not in ana_modules]
# check if there are steps to traceback
if len(sel_step) == 0:
log.info("No steps to trace back. Exiting.")
return
else:
log.info(f"Steps to trace back: {', '.join(sel_step)}")
# creating traceback folder
outdir = Path(run_dir, TRACEBACK_FOLDER)
try:
outdir.mkdir(exist_ok=False)
log.info(f"Created directory: {str(outdir.resolve())}")
except FileExistsError:
log.warning(f"Directory {str(outdir.resolve())} already exists.")
data_dict: dict[Any, Any] = {}
rank_dict: dict[Any, Any] = {}
unk_idx, max_topo_len = 0, 0
# this cycle goes through the steps in reverse order
for n in range(len(sel_step) - 1, -1, -1):
log.info(f"Tracing back step {sel_step[n]}")
# correcting names in the dictionary. The ori_name must be complemented
# with the step folder name
for key in data_dict.keys():
if data_dict[key][-1] != "-":
data_dict[key][-1] = f"../{sel_step[n]}/{data_dict[key][-1]}"
delta = len(sel_step) - n - 1 # how many steps have we gone back?
# loading the .json file
json_path = Path(run_dir, sel_step[n], "io.json")
io = ModuleIO()
io.load(json_path)
# list all the values in the data_dict
ls_values = [x for val in data_dict.values() for x in val]
# getting and sorting the ranks for the current step folder
ranks = [pdbfile.score for pdbfile in io.output]
ranks_argsort = np.argsort(ranks)
# iterating through the pdbfiles to fill data_dict and rank_dict
for i, pdbfile in enumerate(io.output):
rank = np.where(ranks_argsort == i)[0][0] + 1
# getting the original names
ori_names, max_topo_len = get_ori_names(n, pdbfile, max_topo_len)
if n != len(sel_step) - 1:
if str(pdbfile.rel_path) not in ls_values:
# this is the first step in which the pdbfile appears.
# This means that it was discarded for the subsequent steps
# We need to add the pdbfile to the data_dict
keys = [f"unk{unk_idx}"]
data_dict[keys[0]] = ["-" for el in range(delta - 1)]
data_dict[keys[0]].append(str(pdbfile.rel_path))
rank_dict[keys[0]] = ["-" for el in range(delta)]
unk_idx += 1
else:
# we've already seen this pdb before.
idxs = [i for i, el in enumerate(ls_values) if el==str(pdbfile.rel_path)]
keys = [list(data_dict.keys())[idx // delta] for idx in idxs]
# assignment
for el in ori_names:
for key in keys:
data_dict[key].append(el)
for key in keys:
rank_dict[key].append(rank)
else: # last step of the workflow
data_dict[str(pdbfile.rel_path)] = [on for on in ori_names]
rank_dict[str(pdbfile.rel_path)] = [rank]
# print(f"rank_dict {rank_dict}")
# print(f"data_dict {data_dict}, maxtopo {max_topo_len}")
# stripping away relative paths
final_data_dict = {}
for key in data_dict.keys():
new_key = key.split("/")[-1]
final_data_dict[new_key] = [el.split("/")[-1] for el in data_dict[key]]
final_rank_dict = {}
for key in rank_dict.keys():
new_key = key.split("/")[-1]
final_rank_dict[new_key] = rank_dict[key]
# dumping the data into a dataframe
df_output = traceback_dataframe(
final_data_dict, final_rank_dict, sel_step, max_topo_len
)
# ordering the dataframe
df_output = order_traceback_df(df_output, sel_step)
# dumping the dataframe
track_filename = Path(run_dir, TRACEBACK_FOLDER, "traceback.tsv")
log.info(
f"Output dataframe {track_filename} " f"created with shape {df_output.shape}"
)
df_output.to_csv(track_filename, sep="\t", index=False)
# taking (and writing) a subset of the dataframe
consensus_filename = Path(run_dir, TRACEBACK_FOLDER, "consensus.tsv")
rank_data_subset = subset_traceback(df_output, consensus_filename)
# plotting the traceback dataframe
plot_filename = Path(run_dir, TRACEBACK_FOLDER, "traceback.html")
make_traceback_plot(rank_data_subset, plot_filename, offline=offline)
return
if __name__ == "__main__":
sys.exit(maincli())