Source code for haddock.libs.libutil

"""General utilities."""
import collections.abc
import contextlib
import re
import shutil
import subprocess
import sys
from copy import deepcopy
from functools import partial
from os import cpu_count
from pathlib import Path

from haddock import EmptyPath, log
from haddock.core.exceptions import SetupError
from haddock.core.typing import (
    PT,
    Any,
    AnyT,
    Callable,
    Container,
    FilePath,
    FilePathT,
    Generator,
    Iterable,
    Optional,
    ParamDict,
    ParamMap,
    ParamMapT,
    Union,
)
from haddock.gear.greetings import get_goodbye_help


# Pre-configured `subprocess.run`: the command is interpreted by the shell
# (`shell=True`), a non-zero exit status raises `subprocess.CalledProcessError`
# (`check=True`), and the command's standard output is discarded
# (`stdout=subprocess.DEVNULL`).
check_subprocess = partial(
    subprocess.run,
    shell=True,
    check=True,
    stdout=subprocess.DEVNULL,
)


def get_result_or_same_in_list(
    function: Callable[[PT], AnyT], value: PT
) -> Union[AnyT, list[PT]]:
    """
    Apply `function` to `value`, falling back to ``[value]``.

    Parameters
    ----------
    function : callable
        Called with `value` as its single argument.

    value : anything
        The object handed to `function`.

    Returns
    -------
    object
        Whatever `function` returned when that result is truthy,
        otherwise a one-element list holding `value`.
    """
    # `or` yields its left operand when truthy and the fallback otherwise.
    return function(value) or [value]
def make_list_if_string(item: Union[str, list[str]]) -> list[str]:
    """
    Wrap a lone string in a list.

    Parameters
    ----------
    item : str or list of str
        The value to normalise.

    Returns
    -------
    list
        ``[item]`` when `item` is a string; otherwise `item` itself,
        returned untouched (no copy is made).
    """
    return [item] if isinstance(item, str) else item
def transform_to_list(
    item: Union[Iterable[AnyT], AnyT]
) -> Union[list[AnyT], tuple[AnyT, ...]]:
    """
    Normalise `item` to a list (or leave tuples and lists alone).

    Conversion rules:

    * list or tuple: returned as is, the very same object;
    * set: a new list with the set's members;
    * dict: a new list with the dictionary's keys;
    * anything else (strings included): a one-element list holding
      `item`.
    """
    # Sequences that are already acceptable pass straight through.
    if isinstance(item, (list, tuple)):
        return item

    # Iterating a dict yields its keys, so `list()` covers both cases.
    if isinstance(item, (set, dict)):
        return list(item)

    return [item]
def copy_files_to_dir(paths: Iterable[FilePath], directory: FilePath) -> None:
    """
    Copy every file in `paths` into `directory`.

    Each file is copied with :py:func:`shutil.copy`.

    Parameters
    ----------
    paths : iterable of paths
        The files to copy.

    directory : path
        Destination handed to :py:func:`shutil.copy` for each file.
    """
    for source in paths:
        shutil.copy(source, directory)
def remove_folder(folder: FilePath) -> None:
    """
    Delete `folder` and its contents, if it is present.

    A warning is logged before the removal. Nothing happens when the
    path does not exist.

    Parameters
    ----------
    folder : str or Path
        Path to the folder to remove.
    """
    if not Path(folder).exists():
        return

    log.warning(f"{folder} exists and it will be REMOVED!")
    shutil.rmtree(folder)
def remove_dict_keys(d: ParamMap, keys: Container[str]) -> ParamDict:
    """
    Build a copy of `d` that leaves out the given `keys`.

    Parameters
    ----------
    d : dict
        The source mapping. It is not modified.

    keys : container of str
        Keys to leave out of the result.

    Returns
    -------
    dict
        A new dictionary with deep copies of the values whose key is
        not in `keys`.
    """
    kept = {}
    for name, value in d.items():
        if name in keys:
            continue
        # Deep copy so the result shares no mutable state with `d`.
        kept[name] = deepcopy(value)
    return kept
def parse_ncores(
    n: Optional[Union[int, str]] = None,
    njobs: Optional[int] = None,
    max_cpus: Optional[Union[bool, int]] = None,
) -> int:
    """
    Check the number of cores according to HADDOCK3 architecture.

    Parameters
    ----------
    n : int or str
        The desired number of cores. If `None` is given, returns the
        maximum number of cores allowed, see `max_cpus`.

    njobs : int
        The number of jobs to execute. Optional. When given, the number
        of cores is additionally capped by `njobs`.

    max_cpus : bool or int
        The maximum number of CPUs allowed. `None` or `False` selects
        the available CPUs minus one (at least one); `True` selects all
        available CPUs; an integer is used as given.

    Raises
    ------
    SetupError
        If `n` is not positive or not convertable to `int`.

    TypeError
        If `max_cpus` is neither `None`, a boolean, nor an integer.

    Returns
    -------
    int
        A correct number of cores according to specifications.
    """
    # `os.cpu_count()` returns `None` when the number of CPUs cannot be
    # determined; fall back to a single core instead of failing on
    # `None - 1`.
    available_cpus = cpu_count() or 1

    if max_cpus is None or max_cpus is False:
        max_cpus = max(available_cpus - 1, 1)
    elif max_cpus is True:
        max_cpus = available_cpus
    elif not isinstance(max_cpus, int):
        raise TypeError(f"`max_cpus` not of valid type: {type(max_cpus)}")

    if n is None:
        return max_cpus

    try:
        n = int(n)
    except (TypeError, ValueError) as err:
        _msg = f"`n` must be `int` or `int`-convertable `str`: {n!r} given."
        raise SetupError(_msg) from err

    if n < 1:
        _msg = f"`n` is not positive, this is not possible: {n!r}"
        raise SetupError(_msg)

    if njobs is not None:
        ncores = min(n, njobs, max_cpus)
        log.info(
            f"Selected {ncores} cores to process {njobs} jobs, with {max_cpus} "
            "maximum available cores."
        )
        return ncores

    log.debug(f"`njobs` not specified, evaluating initial value {n}...")
    ncores = min(n, max_cpus)
    log.debug(f"Selected {ncores} for a maximum of {max_cpus} CPUs")
    return ncores
def non_negative_int(
    n: Any,
    exception: type[Exception] = ValueError,
    emsg: str = "`n` do not satisfies",
) -> int:
    """
    Convert `n` to `int` and return it when it is zero or greater.

    Parameters
    ----------
    n : int-convertable
        Something that can be converted to int.

    exception : Exception
        The Exception to raise in case the converted `n` is negative.

    emsg : str
        The error message to give to `exception`. May contain a
        ``{}`` placeholder, which is filled with `n`.

    Returns
    -------
    int
        The converted, non-negative value.

    Raises
    ------
    ValueError, TypeError
        If `n` cannot be converted to `int`; the error raised by
        ``int(n)`` propagates unchanged.

    exception
        If the converted value is negative.
    """
    number = int(n)

    if number < 0:
        # don't change to f-strings, .format has a purpose
        raise exception(emsg.format(n))

    return number
def recursive_dict_update(d: ParamMapT, u: ParamMap) -> ParamMapT:
    """
    Update dictionary `d` according to `u` recursively.

    https://stackoverflow.com/questions/3232943

    Values of `u` that are mappings are merged key by key into the
    corresponding value of `d`. Any other value of `u` replaces the
    value in `d`. When `u` holds a mapping for a key whose value in `d`
    is missing or is not a mutable mapping (a number or `None`, for
    example), that value is replaced by a new dictionary built from
    `u`.

    Parameters
    ----------
    d : dict
        The base dictionary.

    u : dict
        The dictionary with the values to apply on top of `d`.

    Returns
    -------
    dict
        A new dict object with updated key: values. The original
        dictionaries are not modified.
    """
    def _recurse(d_: ParamMapT, u_: ParamMap) -> ParamMapT:
        for k, v in u_.items():
            if isinstance(v, collections.abc.Mapping):
                current = d_.get(k)
                # A non-mapping value cannot be merged into; start from
                # an empty dict so the mapping from `u` overrides it
                # instead of failing on item assignment.
                if not isinstance(current, collections.abc.MutableMapping):
                    current = {}
                d_[k] = _recurse(current, v)
            else:
                d_[k] = deepcopy(v)  # in case these are also lists
        return d_

    new = deepcopy(d)
    _recurse(new, u)
    return new
def get_number_from_path_stem(path: FilePath) -> int:
    """
    Extract the last number found in the stem of a path.

    Examples
    --------
    >>> get_number_from_path_stem('src/file_1.pdb')
    1

    >>> get_number_from_path_stem('src/file_3.pdb')
    3

    >>> get_number_from_path_stem('file_1231.pdb')
    1231

    >>> get_number_from_path_stem('src/file11')
    11

    >>> get_number_from_path_stem('src/file_1234_1.pdb')
    1

    Parameters
    ----------
    path : str or Path obj
        The path to evaluate.

    Returns
    -------
    int
        The last run of digits in the path's stem, as an integer.

    Raises
    ------
    IndexError
        If the stem contains no digits.
    """
    digit_groups = re.findall(r"\d+", Path(path).stem)
    # Plain indexing on purpose: an empty match list raises `IndexError`.
    return int(digit_groups[-1])
def sort_numbered_paths(*paths: FilePathT) -> list[FilePathT]:
    """
    Sort input paths by their tail number.

    The sort key is given by :py:func:`get_number_from_path_stem`.
    When that function raises `IndexError` (a stem has no number), the
    paths are sorted alphabetically by stem instead.

    Parameters
    ----------
    *paths : str or pathlib.Path
        Paths to files, given as separate positional arguments.

    Returns
    -------
    list
        The sorted pathlist. The original types are not modified.
        If strings are given, strings are returned; if Paths are given,
        Paths are returned.

    Raises
    ------
    TypeError
        If computing the sort keys raises `TypeError`; the original
        error is logged and a `TypeError` with a hint about argument
        packing is raised.
    """
    def _stem(item: FilePathT) -> str:
        return Path(item).stem

    try:
        ordered = sorted(paths, key=get_number_from_path_stem)
    except TypeError as err:
        log.exception(err)
        emsg = (
            "Mind the packing *argument, input should be strings or Paths, "
            "not a list."
        )
        raise TypeError(emsg)
    except IndexError:
        # No number available for at least one path: alphabetical fallback.
        ordered = sorted(paths, key=_stem)

    return ordered
@contextlib.contextmanager
def log_error_and_exit() -> Generator[None, None, None]:
    """
    Log any `Exception` raised in the managed block and exit.

    When the body of the ``with`` statement raises an `Exception`, the
    error is logged, a goodbye/help message is logged, and the program
    terminates through ``sys.exit(1)``. The block is left normally when
    no exception is raised.
    """
    try:
        yield
    except Exception as exc:
        notice = (
            "An error has occurred, see log file. "
            "And contact the developers if needed."
        )
        log.exception(exc)
        log.error(exc)
        log.error(notice)
        log.info(get_goodbye_help())
        sys.exit(1)
def extract_keys_recursive(config: ParamMap) -> Generator[str, None, None]:
    """
    Yield the keys of `config` whose values are not mappings.

    Values that are mappings are walked recursively; the key that holds
    such a mapping is not yielded itself. Keys come out in iteration
    order, depth first.

    Parameters
    ----------
    config : dict
        The (possibly nested) mapping to inspect.

    Yields
    ------
    str
        Each key whose value is not a mapping.
    """
    for key, entry in config.items():
        if not isinstance(entry, collections.abc.Mapping):
            yield key
            continue
        yield from extract_keys_recursive(entry)
def recursive_convert_paths_to_strings(params: ParamMapT) -> ParamMapT:
    """
    Convert paths to strings recursively over a dictionary.

    `Path` and `EmptyPath` objects are converted when they are a value
    of the dictionary, a value of a nested mapping, or a direct item of
    a list or tuple value. Items nested deeper inside lists or tuples
    are left untouched.

    Parameters
    ----------
    params : dictionary
        The parameters to convert. It is not modified.

    Returns
    -------
    dictionary
        A copy of the original dictionary with paths converted to
        strings.
    """
    path_types = (Path, EmptyPath)

    params = deepcopy(params)
    # Only values of existing keys are reassigned in the loop, which is
    # safe while iterating over the dictionary.
    for param, value in params.items():
        if isinstance(value, path_types):
            params[param] = str(value)

        elif isinstance(value, collections.abc.Mapping):
            params[param] = recursive_convert_paths_to_strings(value)

        elif isinstance(value, list):
            # `value` belongs to the deep copy, so editing it in place
            # does not touch the caller's data.
            for i, v in enumerate(value):
                if isinstance(v, path_types):
                    value[i] = str(v)

        elif isinstance(value, tuple):
            # Tuples do not support item assignment: rebuild the tuple,
            # and only when there is something to convert so that
            # path-free tuples are kept as they are.
            if any(isinstance(v, path_types) for v in value):
                params[param] = tuple(
                    str(v) if isinstance(v, path_types) else v
                    for v in value
                )

    return params