# Source code for haddock.libs.libparallel

"""Module in charge of parallelizing the execution of tasks."""
import math
from multiprocessing import Process

from haddock import log
from haddock.core.typing import (
    AnyT,
    FilePath,
    Generator,
    Optional,
    Sequence,
    SupportsRunT,
    Union,
    )
from haddock.libs.libutil import parse_ncores


def split_tasks(lst: Sequence[AnyT], n: int) -> Generator[Sequence[AnyT], None, None]:
    """
    Split tasks into N-sized chunks.

    Parameters
    ----------
    lst : Sequence
        The tasks to be split.

    n : int
        The number of chunks to produce (at most).

    Yields
    ------
    Sequence
        Consecutive slices of `lst`, each of size ``ceil(len(lst) / n)``.
    """
    # Guard the empty case: ceil(0 / n) == 0 would otherwise be used as
    # the range() step and raise "range() arg 3 must not be zero".
    if not lst:
        return
    # chunk size, kept in its own name instead of overwriting `n`
    chunk_size = math.ceil(len(lst) / n)
    for start in range(0, len(lst), chunk_size):
        yield lst[start:start + chunk_size]
def get_index_list(nmodels, ncores):
    """
    Optimal distribution of models among cores.

    Parameters
    ----------
    nmodels : int
        Number of models to be distributed.

    ncores : int
        Number of cores to be used.

    Returns
    -------
    index_list : list
        List of model indexes to be used for the parallel scanning.
        It has ``ncores + 1`` entries and always starts at 0.

    Raises
    ------
    ValueError
        If `nmodels` or `ncores` is lower than 1.
    """
    if nmodels < 1:
        # fixed: the message previously contained a stray closing paren
        raise ValueError(f"nmodels ({nmodels}) must be greater than 0")
    if ncores < 1:
        raise ValueError(f"ncores ({ncores}) must be greater than 0")

    # base number of models per core, and how many cores get one extra model
    spc, rem = divmod(nmodels, ncores)

    # cumulative boundaries: the first `rem` cores receive spc + 1 models
    index_list = [0]
    for core in range(ncores):
        step = spc + 1 if core < rem else spc
        index_list.append(index_list[-1] + step)
    return index_list
class Worker(Process):
    """Work on tasks."""

    def __init__(self, tasks: Sequence[SupportsRunT]) -> None:
        """Hold the chunk of tasks this process will execute."""
        super().__init__()
        self.tasks = tasks
        log.debug(f"Worker ready with {len(self.tasks)} tasks")

    def run(self) -> None:
        """Execute tasks."""
        # runs in the child process; each task exposes a `run()` method
        for job in self.tasks:
            job.run()
        log.debug(f"{self.name} executed")
class Scheduler:
    """Schedules tasks to run in multiprocessing."""

    def __init__(
            self,
            tasks: list[SupportsRunT],
            ncores: Optional[int] = None,
            max_cpus: bool = False) -> None:
        """
        Schedule tasks to a defined number of processes.

        Parameters
        ----------
        tasks : list
            The list of tasks to execute. Tasks must have method `run()`.

        ncores : None or int
            The number of cores to use. If `None` is given uses the maximum
            number of CPUs allowed by `libs.libututil.parse_ncores` function.

        max_cpus : bool
            Passed through to `parse_ncores` to allow using all CPUs.
        """
        self.max_cpus = max_cpus
        self.num_tasks = len(tasks)
        self.num_processes = ncores  # the setter parses/validates the value

        # Sort the tasks by input_file name and its length,
        # so we know that 2 comes before 10
        task_name_dic: dict[int, tuple[FilePath, int]] = {}
        for i, t in enumerate(tasks):
            try:
                task_name_dic[i] = (t.input_file, len(str(t.input_file)))
            except AttributeError:
                # If this is not a CNS job it will not have
                # input_file, use the output instead
                task_name_dic[i] = (t.output, len(str(t.output)))

        # BUG FIX: the previous key `(x[0], x[1])` sorted by the enumeration
        # index first, so the sort never reordered anything.  Sorting by
        # (name length, name) gives the intended "2 before 10" ordering.
        sorted_task_list: list[SupportsRunT] = []
        for idx, _name_info in sorted(
                task_name_dic.items(),
                key=lambda item: (item[1][1], str(item[1][0])),
                ):
            sorted_task_list.append(tasks[idx])

        job_list = split_tasks(sorted_task_list, self.num_processes)
        self.worker_list = [Worker(jobs) for jobs in job_list]

        log.info(f"Using {self.num_processes} cores")
        log.debug(f"{self.num_tasks} tasks ready.")

    @property
    def num_processes(self) -> int:
        """Number of processors to use."""  # noqa: D401
        return self._ncores

    @num_processes.setter
    def num_processes(self, n: Union[str, int, None]) -> None:
        # clamp the requested count against the machine and the job count
        self._ncores = parse_ncores(
            n,
            njobs=self.num_tasks,
            max_cpus=self.max_cpus,
            )
        log.debug(f"Scheduler configured for {self._ncores} cpu cores.")

    def run(self) -> None:
        """Run tasks in parallel."""
        try:
            # start every worker first so they execute concurrently
            for worker in self.worker_list:
                worker.start()

            completed = 1
            since_last_log = 1
            # Log progress roughly every 10% of the tasks.
            # BUG FIX: `max(1, int(self.num_tasks) / 10)` produced a float
            # (e.g. 2.5) that the integer counter never equals, so the
            # progress messages were silently skipped for most task counts.
            nlog = max(1, self.num_tasks // 10)

            for worker in self.worker_list:
                # Wait for the worker to finish its chunk of tasks
                worker.join()
                for _task in worker.tasks:
                    per = (completed / float(self.num_tasks)) * 100
                    completed += 1
                    if since_last_log == nlog:
                        log.info(f'>> completed {per:.0f}% ')
                        since_last_log = 1
                    else:
                        since_last_log += 1

            log.info(f"{self.num_tasks} tasks finished")

        except KeyboardInterrupt as err:
            # Controlled break if the user hits Ctrl+C during a CNS run.
            self.terminate()
            # re-raise so libs.libworkflow.Step (or whoever instantiated the
            # Scheduler) can handle the interruption
            raise err

    def terminate(self) -> None:
        """Terminate tasks in a controlled way."""
        for worker in self.worker_list:
            worker.terminate()
        log.info("The workers terminated in a controlled way")