"""Describe the Haddock3 ontology used for communicating between modules."""
import datetime
import itertools
from enum import Enum
from os import linesep
from pathlib import Path
from typing import List, Any
import jsonpickle
from haddock.core.defaults import MODULE_IO_FILE
from haddock.core.typing import FilePath, Optional, TypeVar, Union
# Not-a-number placeholder; used below as the default `score` of a PDBFile.
NaN = float("nan")
# Configure jsonpickle once at import time: use its "json" backend and emit
# key-sorted output indented by 4 spaces.
jsonpickle.set_preferred_backend("json")
jsonpickle.set_encoder_options("json", sort_keys=True, indent=4)
[docs]
class Persistent:
    """Record describing a file written to disk by the workflow.

    Parameters
    ----------
    file_name : FilePath
        Name (or path) of the file. Only its last path component is stored
        in ``file_name`` and ``full_name``.
    file_type : Format
        Kind of file being described.
    path : FilePath
        Directory holding the file. Stored resolved to an absolute path.
    md5 : str, optional
        Checksum associated with the file, stored as given.
    restr_fname : FilePath, optional
        Restraint file associated with this file, stored as given.
    """

    def __init__(
        self,
        file_name: FilePath,
        file_type: Format,
        path: FilePath = ".",
        md5: Optional[str] = None,
        restr_fname: Optional[FilePath] = None,
    ) -> None:
        # Creation timestamp, e.g. "2024-01-31 12:00:00" (local, naive time).
        self.created = datetime.datetime.now().isoformat(
            sep=" ",
            timespec="seconds",
        )
        base_name = Path(file_name).name
        parent_dir = Path(path).resolve()

        self.file_name = base_name
        self.file_type = file_type
        self.path = str(parent_dir)
        # `full_name` joins the *unresolved* `path` with the bare file name.
        self.full_name = str(Path(path, base_name))
        # Location expressed relative to a sibling directory: one level up,
        # then into the directory named like the resolved `path`.
        # NOTE(review): this uses `file_name` exactly as passed in (not the
        # stripped base name) - confirm this is intended.
        self.rel_path = Path("..", parent_dir.name, file_name)
        self.md5 = md5
        self.restr_fname = restr_fname

    def __repr__(self) -> str:
        location = Path(self.path, self.file_name)
        return f"[{self.file_type}|{self.created}]{location}"

    def is_present(self) -> bool:
        """Check if the persistent file exists on disk.

        The check resolves ``rel_path``, so the outcome depends on the
        current working directory of the process.
        """
        target = self.rel_path.resolve()
        return target.exists()
[docs]
class TopologyFile(Persistent):
    """Persistent record for a CNS-generated topology file.

    Parameters
    ----------
    file_name : FilePath
        Name of the topology file.
    path : FilePath
        Directory holding the file.
    """

    def __init__(self, file_name: FilePath, path: FilePath = ".") -> None:
        # The file type is fixed; md5 and restr_fname keep their defaults.
        super().__init__(
            file_name=file_name,
            file_type=Format.TOPOLOGY,
            path=path,
        )
[docs]
class PDBFile(Persistent):
    """Represent a PDB file.

    Instances are ordered and compared by their ``score`` only, which lets
    a list of models be sorted directly.

    Parameters
    ----------
    file_name : Path or str
        Name of the PDB file.
    topology : TopologyFile or list of TopologyFile, optional
        Topology (or topologies) associated with the structure.
    path : Path or str
        Directory holding the file.
    score : float
        Score of the model; defaults to NaN.
    md5 : str, optional
        Checksum associated with the file.
    restr_fname : Path or str, optional
        Restraint file associated with the model.
    unw_energies : dict of str to float, optional
        Energy terms stored alongside the model.
    ligand_top_fname : Path or str, optional
        Ligand topology file name, stored as given.
    ligand_param_fname : Path or str, optional
        Ligand parameter file name, stored as given.
    aa_topology : TopologyFile or list of TopologyFile, optional
        Stored as given.
    cgtoaa_tbl : str or list of str, optional
        Stored as given.
    shape : bool or list of bool, optional
        Stored as given.
    """

    def __init__(
        self,
        file_name: Union[Path, str],
        topology: Optional[Union[List[TopologyFile], TopologyFile]] = None,
        path: Union[Path, str] = ".",
        score: float = NaN,
        md5: Optional[str] = None,
        restr_fname: Optional[Union[Path, str]] = None,
        unw_energies: Optional[dict[str, float]] = None,
        ligand_top_fname: Optional[Union[Path, str]] = None,
        ligand_param_fname: Optional[Union[Path, str]] = None,
        aa_topology: Optional[Union[List[TopologyFile], TopologyFile]] = None,
        cgtoaa_tbl: Optional[Union[List[str], str]] = None,
        shape: Optional[Union[List[bool], bool]] = None,
    ) -> None:
        super().__init__(
            file_name,
            Format.PDB,
            path=path,
            md5=md5,
            restr_fname=restr_fname,
        )
        self.topology = topology
        self.score: Optional[float] = score
        # The attributes below start unset; presumably they are filled in by
        # later workflow steps (clustering, seeding, ...) - verify in callers.
        self.ori_name: Optional[str] = None
        self.clt_id: Union[str, int, None] = None
        self.clt_rank: Optional[int] = None
        self.clt_model_rank: Optional[int] = None
        self.unw_energies: Optional[dict[str, float]] = unw_energies
        self.seed: Optional[int] = None
        self.ligand_top_fname = ligand_top_fname
        self.ligand_param_fname = ligand_param_fname
        self.aa_topology: Optional[Union[List[TopologyFile], TopologyFile]] = (
            aa_topology
        )
        self.cgtoaa_tbl: Optional[Union[List[str], str]] = cgtoaa_tbl
        self.shape: Optional[Union[List[bool], bool]] = shape
        # NOTE(review): `len` mirrors `score`; its purpose is not visible in
        # this module. Kept as-is because serialized data may rely on it.
        self.len = score

    # Comparisons are duck-typed on a `score` attribute. Operands without one
    # yield NotImplemented so that Python falls back to its default handling
    # (`==` -> False, ordering -> TypeError) instead of an AttributeError.

    def __lt__(self, other: "PDBFile") -> bool:
        if not hasattr(other, "score"):
            return NotImplemented
        return self.score < other.score

    def __gt__(self, other: "PDBFile") -> bool:
        if not hasattr(other, "score"):
            return NotImplemented
        return self.score > other.score

    def __eq__(self, other: object) -> bool:
        if not hasattr(other, "score"):
            return NotImplemented
        return self.score == other.score

    def __hash__(self) -> int:
        # Identity-based hash: two models with equal scores compare equal but
        # hash differently, so sets/dicts treat every instance as distinct.
        return id(self)
[docs]
class RMSDFile(Persistent):
    """Persistent record for an RMSD matrix file.

    Parameters
    ----------
    file_name : FilePath
        Name of the matrix file.
    npairs : int
        Number of pairs, stored as given on the instance.
    path : FilePath
        Directory holding the file.
    """

    def __init__(
        self,
        file_name: FilePath,
        npairs: int,
        path: FilePath = ".",
    ) -> None:
        super().__init__(
            file_name=file_name,
            file_type=Format.MATRIX,
            path=path,
        )
        self.npairs = npairs

    def __hash__(self) -> int:
        # Hash by object identity.
        return id(self)
[docs]
class ModuleIO:
    """Intercommunicating modules and exchange input/output information.

    Attributes
    ----------
    input : list
        Entries registered as input of the module.
    output : list
        Entries registered as output of the module. Each entry is either a
        persistent object or a dictionary whose values are persistent
        objects.
    """

    def __init__(self) -> None:
        self.input: List[Any] = []
        self.output: List[Any] = []

    def add(self, persistent, mode: str = "i") -> None:
        """Add a given entry (or list of entries) as input or output.

        Parameters
        ----------
        persistent : object or list
            Entry to register; a list is unpacked into individual entries.
        mode : str
            ``"i"`` registers the entry as input; any other value registers
            it as output.
        """
        holder = self.input if mode == "i" else self.output
        self._add(persistent, holder)

    def _add(self, persistent, holder: list) -> None:
        """Increment list holder with additional entries."""
        if isinstance(persistent, list):
            holder.extend(persistent)
        else:
            holder.append(persistent)

    def save(
        self,
        path: FilePath = ".",
        filename: FilePath = MODULE_IO_FILE,
    ) -> Path:
        """Save Input/Output needed files by this module to disk.

        Parameters
        ----------
        path : FilePath
            Directory in which the file is written.
        filename : FilePath
            Name of the file to write.

        Returns
        -------
        Path
            Location of the written file.
        """
        fpath = Path(path, filename)
        to_save = {"input": self.input, "output": self.output}
        with open(fpath, "w") as output_handler:
            output_handler.write(jsonpickle.encode(to_save))
        return fpath

    def load(self, filename: FilePath) -> None:
        """Load the content of a given IO filename.

        Replaces ``input`` and ``output`` with the lists stored in the file.
        """
        with open(filename, "r") as json_file:
            # SECURITY: jsonpickle.decode can instantiate arbitrary classes
            # named in the file; only load files produced by a trusted run.
            content = jsonpickle.decode(json_file.read())
        self.input = content["input"]  # type: ignore
        self.output = content["output"]  # type: ignore

    def retrieve_models(
        self,
        crossdock: bool = False,
        individualize: bool = False,
    ) -> list[Union[PDBFile, list[PDBFile]]]:
        """Retrieve the PDBobjects to be used in the module.

        Parameters
        ----------
        crossdock : bool
            Combine every model of every dictionary entry with every model
            of the others (cartesian product) instead of pairing them by
            position.
        individualize : bool
            Return all models of all dictionary entries as one flat list.
            Takes precedence over `crossdock`.

        Returns
        -------
        list
            When `output` holds no dictionary entry: its entries of PDB
            type. Otherwise: tuples of models (pairwise or cross-docked),
            or a flat list of models when `individualize` is set.

        Raises
        ------
        Exception
            If pairwise combination is requested and the dictionary entries
            do not all hold the same number of models.
        """
        # Get the models generated in previous step
        model_list: list[PDBFile] = []
        # Models of each dictionary entry, keyed by its position in `output`.
        input_dic: dict[int, list[PDBFile]] = {}
        for i, element in enumerate(self.output):
            if isinstance(element, dict):
                input_dic[i] = list(element.values())
            elif element.file_type == Format.PDB:  # type: ignore
                model_list.append(element)  # type: ignore

        if not input_dic:
            return model_list

        ensembles = input_dic.values()
        if individualize:
            return list(itertools.chain(*ensembles))
        if crossdock:
            return list(itertools.product(*ensembles))

        # Pairwise combination: every ensemble must provide the same number
        # of models, otherwise zip() would silently drop the extra ones.
        if len({len(sub) for sub in ensembles}) > 1:
            _msg = (
                "Different number of models in molecules,"
                " cannot prepare pairwise complexes."
            )
            raise Exception(_msg)
        return list(zip(*ensembles))

    def check_faulty(self) -> float:
        """Check how many of the output exists.

        Entries that are missing on disk are removed from `output` as a
        side effect.

        Returns
        -------
        float
            Percentage (0-100) of expected output files that are missing.

        Raises
        ------
        Exception
            If `output` holds no expected file at all.
        """
        total = 0.0
        present = 0.0
        for element in self.output:
            if isinstance(element, dict):
                total += len(element)
                present += sum(j.is_present() for j in element.values())
            else:
                total += 1
                if element.is_present():
                    present += 1

        if total == 0:
            _msg = "No expected output was passed to ModuleIO"
            raise Exception(_msg)

        faulty_per = (1 - (present / total)) * 100

        # added this method here to avoid modifying all calls in the
        # modules' run method. We can think about restructure this part
        # in the future.
        self.remove_missing()

        return faulty_per

    def remove_missing(self) -> None:
        """Remove missing structure from `output`.

        Dictionary entries are kept, but the items whose file is missing
        are deleted from them in place. Any other entry is dropped when its
        file is missing.
        """
        # Build a new list rather than deleting while iterating.
        kept: list[Any] = []
        for element in self.output:
            if isinstance(element, dict):
                # Collect keys first: a dict cannot shrink during iteration.
                missing = [
                    key for key, value in element.items()
                    if not value.is_present()
                ]
                for key in missing:
                    element.pop(key)
                kept.append(element)
            elif element.is_present():
                kept.append(element)
        self.output = kept

    def __repr__(self) -> str:
        return f"Input: {self.input}{linesep}Output: {self.output}"
# Either a PDBFile record or a plain filesystem path.
PDBPath = Union[PDBFile, Path]
# Type variable constrained by the same union as `PDBPath`.
PDBPathT = TypeVar("PDBPathT", bound=PDBPath)
"""
Generic type variable for PDBFile or Path.
If the first annotated variable is PDBFile,
the second annotated variable will be PDBFile instead of Path,vice versa.
"""