Source code for haddock.modules.topology.topocg

"""Create and manage CNS coarse-grained topology.

The ``[topocg]`` module is dedicated to the generation of CNS compatible
parameters and topologies (``.psf``) for each of the input structures.

It will:
- Convert an all atom model to a Martini coarse-grained model
- Detect missing atoms 
- Re-build them when missing
- Build and write out topologies (``.psf``) and coordinates (``.pdb``) files
- Write out a restrain file to convert back the CG model to all atoms

Only standard amino acids and nucleic acids are supported.

For more details about this module, please `refer to the haddock3 user manual
<https://www.bonvinlab.org/haddock3-user-manual/modules/topology.html#topocg-module>`_
"""

import operator
import os
import re
from functools import partial
from pathlib import Path

from haddock.core.defaults import MODULE_DEFAULT_YAML, cns_exec
from haddock.core.typing import FilePath, Optional, ParamDict, ParamMap, Union
from haddock.libs import libpdb
from haddock.libs.libcns import (
    generate_default_header,
    load_workflow_params,
    prepare_output,
    prepare_single_input,
    )
from haddock.libs.libaa2cg import (
    martinize,
    gen_cg_filename,
    gen_cg_tbl_backmapping_fname,
    )
from haddock.libs.libontology import Format, PDBFile, TopologyFile
from haddock.libs.libstructure import make_molecules
from haddock.libs.libsubprocess import CNSJob
from haddock.modules import get_engine
from haddock.modules.base_cns_module import BaseCNSModule


RECIPE_PATH = Path(__file__).resolve().parent
DEFAULT_CONFIG = Path(RECIPE_PATH, MODULE_DEFAULT_YAML)


[docs] def generate_topology( input_pdb: Path, output_path: str, recipe_str: str, defaults: ParamMap, mol_params: ParamMap, default_params_path: Optional[FilePath] = None, write_to_disk: Optional[bool] = True, force_field: str = "martini2", shape: bool = False, ) -> Union[Path, str]: """Generate a HADDOCK topology file from input_pdb.""" # generate params headers general_param = load_workflow_params(**defaults) input_mols_params = load_workflow_params(param_header="", **mol_params) general_param = general_param + input_mols_params # generate default headers link, trans_vec, tensor, scatter, axis, water_box = generate_default_header( path=default_params_path ) if not shape: # AA to CG cg_pdb_name = martinize(input_pdb, output_path, False) output = prepare_output( output_pdb_filename=f"{Path(cg_pdb_name).stem}_{force_field}{input_pdb.suffix}", output_psf_filename=f"{Path(cg_pdb_name).stem}_{force_field}.{Format.TOPOLOGY}", ) input_str = prepare_single_input(str(cg_pdb_name)) else: output = prepare_output( output_pdb_filename=f"{input_pdb.stem}{input_pdb.suffix}", output_psf_filename=f"{input_pdb.stem}.{Format.TOPOLOGY}", ) input_str = prepare_single_input(str(input_pdb)) inp_parts = ( general_param, input_str, output, link, trans_vec, tensor, scatter, axis, water_box, recipe_str, ) inp = "".join(inp_parts) if write_to_disk: output_inp_filename = Path(f"{input_pdb.stem}.{Format.CNS_INPUT}") output_inp_filename.write_text(inp) return output_inp_filename else: return inp
[docs] class HaddockModule(BaseCNSModule): """HADDOCK3 module to create CNS all-atom topologies.""" name = RECIPE_PATH.name def __init__( self, order: int, path: Path, initial_params: FilePath = DEFAULT_CONFIG ) -> None: cns_script = RECIPE_PATH / "cns" / "generate-topology.cns" super().__init__(order, path, initial_params, cns_script=cns_script)
[docs] @classmethod def confirm_installation(cls) -> None: """Confirm if module is installed.""" return
[docs] @staticmethod def get_md5(ensemble_f: FilePath) -> dict[int, str]: """Get MD5 hash of a multi-model PDB file.""" md5_dic: dict[int, str] = {} text = Path(ensemble_f).read_text() lines = text.split(os.linesep) REMARK_lines = (line for line in lines if line.startswith("REMARK")) remd5 = re.compile(r"^[a-f0-9]{32}$") for line in REMARK_lines: parts = line.strip().split() try: idx = parts.index("MODEL") except ValueError: # MODEL not in parts, this line can be ignored continue # check if there's a md5 hash in line for part in parts: group = remd5.fullmatch(part) if group: # the model num comes after the MODEL model_num = int(parts[idx + 1]) md5_dic[model_num] = group.string # md5 hash break return md5_dic
[docs] @staticmethod def get_ensemble_origin(ensemble_f: FilePath) -> dict[int, str]: """Try to find origin for each model in ensemble. Parameters ---------- ensemble_f : FilePath Path to a pdb file containing an ensemble. Returns ------- origin_dic : dict[int, str] Dictionary holding as keys the modelID and values its origin. """ origin_dic: dict[int, str] = {} text = Path(ensemble_f).read_text() lines = text.split(os.linesep) REMARK_lines = (line for line in lines if line.startswith("REMARK")) re_origin = re.compile( "REMARK\s+MODEL\s+(\d+)\s+(FROM|from|From)\s+(([\w_-]+\.?)+)" ) # noqa : E501 for line in REMARK_lines: if match := re_origin.search(line): model_num = int(match.group(1).strip()) original_path = match.group(3).strip() original_name = Path(original_path).stem origin_dic[model_num] = original_name return origin_dic
def _run(self) -> None: """Execute module.""" try: _molecules = self.previous_io.output molecules = [] for mol in _molecules: molecules.append(make_molecules([mol[key].rel_path for key in mol])) except Exception as e: self.finish_with_error(e) # extracts `input` key from params. The `input` keyword needs to # be treated separately mol_params: ParamDict = {} for k in list(self.params.keys()): if k.startswith("mol") and k[3:].isdigit(): mol_params[k] = self.params.pop(k) # to facilitate the for loop down the line, we create a list with the # keys of `mol_params` with inverted order (we will use .pop) mol_params_keys = list(mol_params.keys())[::-1] # limit is only useful when order == 0 if self.order == 0 and self.params["limit"]: mol_params_get = mol_params_keys.pop # `else` is used in any case where limit is False. else: mol_params_get = partial(operator.getitem, mol_params_keys, -1) # Pool of jobs to be executed by the CNS engine jobs: list[CNSJob] = [] models_dic: dict[int, list[Path]] = {} ens_dic: dict[int, dict[int, str]] = {} origi_ens_dic: dict[int, dict[int, str]] = {} # get the all-atom psf files in a list psf_files: dict[int, dict[int, str]] = {} shape_dic: dict[int, dict[bool]] = {} force_field = self.params["cgffversion"] for i, molecule in enumerate(molecules, start=1): #self.log(f"Molecule {i}: {molecule.with_parent}") shape_dic[i] = False # Copy the molecule to the step folder models_dic[i] = [] # Split models # these come already sorted splited_models = [ libpdb.split_ensemble(Path(mol.file_name), dest=Path.cwd(),)[0] for mol in molecule ] # check for shape if libpdb.check_mol_shape(splited_models[0]): shape_dic[i] = True # Get psf files for aa topology psf_files[i] = [ Path(mol.parent, f"{mol.stem}.{Format.TOPOLOGY}") for mol in splited_models ] # get the MD5 hash of each model ens_dic[i] = [ self.get_md5(mol.file_name) for mol in molecule ] origi_ens_dic[i] = [ self.get_ensemble_origin(mol.file_name) for mol in molecule ] # nice variable name, isn't it? :-) # molecule parameters are shared among models of the same molecule parameters_for_this_molecule = mol_params[mol_params_get()] for task_id, model in enumerate(splited_models): self.log(f"Sanitizing molecule {model.name}") models_dic[i].append(model) if self.params["ligand_top_fname"]: custom_top = self.params["ligand_top_fname"] self.log(f"Using custom topology {custom_top}") libpdb.sanitize( model, overwrite=True, custom_topology=custom_top, ) else: libpdb.sanitize(model, overwrite=True) # Prepare generation of topologies jobs topocg_input = generate_topology( model, self.path.resolve().parent, self.recipe_str, self.params, parameters_for_this_molecule, default_params_path=self.toppar_path, write_to_disk=self.params["debug"], force_field=force_field, shape=shape_dic[i] ) self.log("Topology CNS input created") # Add new job to the pool output_filename = Path(f"{model.stem}.{Format.CNS_OUTPUT}") err_fname = f"{model.stem}.cnserr" job = CNSJob( topocg_input, output_filename, err_fname, envvars=self.envvars, cns_exec=cns_exec, ) jobs.append(job) # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") Engine = get_engine(self.params["mode"], self.params) engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") # Check for generated output, fail it not all expected files # are found expected: dict[int, dict[int, PDBFile]] = {} for i in models_dic: expected[i] = {} md5_dic = ens_dic[i] for j, model in enumerate(models_dic[i]): if len(md5_dic[j]) == 0: md5_hash = None else: md5_hash = md5_dic[j] origin_name_model = model.stem # FIXME: _model_id not used ? try: _model_id = int(model.stem.split("_")[-2]) except ValueError: _model_id = 0 shape_mod = shape_dic[i] if not shape_mod: processed_pdb = gen_cg_filename( self.path.resolve().parent, origin_name_model, force_field=force_field, ) processed_topology = gen_cg_filename( self.path.resolve().parent, origin_name_model, force_field=force_field, ext=Format.TOPOLOGY, ) processed_cgtoaa_tbl = gen_cg_tbl_backmapping_fname( self.path.resolve().parent, origin_name_model, ).resolve() else: processed_pdb = Path(f"{origin_name_model}.{Format.PDB}") processed_topology = Path(f"{origin_name_model}.{Format.TOPOLOGY}") processed_cgtoaa_tbl = None topology = TopologyFile(processed_topology, path=".") psf_file_uniq = psf_files[i][j].as_posix().split("/") aa_topo_path = f"{psf_file_uniq[0]}/{psf_file_uniq[1]}" aa_topology = TopologyFile(psf_file_uniq[2], path=aa_topo_path) pdb = PDBFile( file_name=processed_pdb, topology=topology, aa_topology=aa_topology, cgtoaa_tbl=processed_cgtoaa_tbl, shape=shape_mod, path=".", md5=md5_hash, ) pdb.ori_name = model.stem expected[i][j] = pdb # Remove temporary `*_cg.pdb` files if not in debug mode if not self.params["debug"]: for tmp_cg in Path(self.path.resolve().parent).glob(f"{gen_cg_filename('', '*')}"): os.remove(tmp_cg) # Save module information self.output_models = list(expected.values()) # type: ignore self.export_io_models(faulty_tolerance=self.params["tolerance"])