"""Run subprocess jobs."""
import os
import shlex
import subprocess
from contextlib import suppress
from pathlib import Path
from haddock.core.defaults import cns_exec as global_cns_exec
from haddock.core.exceptions import CNSRunningError, JobRunningError
from haddock.core.typing import Any, FilePath, Optional, ParamDict
from haddock.libs.libio import gzip_files
[docs]class BaseJob:
"""Base class for a subprocess job."""
def __init__(self, input_: Path, output: Path, executable: Path,
*args: Any) -> None:
self.input = input_
self.output = output
self.executable = executable
self.args = args
self.cmd: str
[docs] def run(self) -> bytes:
"""Execute job in subprocess."""
self.make_cmd()
with open(self.output, 'w') as outf:
p = subprocess.Popen(
shlex.split(self.cmd),
stdout=outf,
close_fds=True,
)
out, error = p.communicate()
p.kill()
if error:
raise JobRunningError(error)
return out
[docs] def make_cmd(self) -> None:
"""Execute subprocess job."""
raise NotImplementedError()
[docs]class Job(BaseJob):
"""
Instantiate a standard job.
Runs with the following scheme:
$ cmd ARGS INPUT
"""
[docs] def make_cmd(self) -> None:
"""Execute subprocess job."""
self.cmd = " ".join([
os.fspath(self.executable),
''.join(map(str, self.args)), # empty string if no args
os.fspath(self.input),
])
return
[docs]class CNSJob:
"""A CNS job script."""
def __init__(
self,
input_file: FilePath,
output_file: FilePath,
envvars: Optional[ParamDict] = None,
cns_exec: Optional[FilePath] = None,
) -> None:
"""
CNS subprocess.
To execute the job, call the `.run()` method.
Parameters
----------
input_file : str or pathlib.Path
The path to the .inp CNS file.
output_file : str or pathlib.Path
The path to the .out CNS file, where the standard output
will be saved.
envvars : dict
A dictionary containing the environment variables needed for
the CNSJob. These will be passed to subprocess.Popen.env
argument.
"""
self.input_file = input_file
self.output_file = output_file
self.envvars = envvars
self.cns_exec = cns_exec
def __repr__(self) -> str:
return (
f"CNSJob({self.input_file}, {self.output_file}, "
f"envvars={self.envvars}, cns_exec={self.cns_exec})"
)
def __str__(self) -> str:
return repr(self)
@property
def envvars(self) -> ParamDict:
"""CNS environment vars."""
return self._envvars
@envvars.setter
def envvars(self, envvars: Optional[ParamDict]) -> None:
"""CNS environment vars."""
self._envvars = envvars or {}
if not isinstance(self._envvars, dict):
raise ValueError('`envvars` must be a dictionary.')
for k, v in self._envvars.items():
if isinstance(v, Path):
self._envvars[k] = str(v)
@property
def cns_exec(self) -> FilePath:
"""CNS executable path."""
return self._cns_exec
@cns_exec.setter
def cns_exec(self, cns_exec_path: Optional[FilePath]) -> None:
if not cns_exec_path:
cns_exec_path = global_cns_exec # global cns_exec
if not os.access(cns_exec_path, mode=os.X_OK):
raise ValueError(
f'{str(cns_exec_path)!r} binary file not found, '
'or is not executable.'
)
self._cns_exec = cns_exec_path
[docs] def run(
self,
compress_inp: bool = False,
compress_out: bool = True,
compress_seed: bool = False,
) -> bytes:
"""
Run this CNS job script.
Parameters
----------
compress_inp : bool
Compress the *.inp file to '.gz' after the run. Defaults to
``False``.
compress_out : bool
Compress the *.out file to '.gz' after the run. Defaults to
``True``.
compress_seed : bool
Compress the *.seed file to '.gz' after the run. Defaults to
``False``.
"""
with open(self.input_file) as inp, \
open(self.output_file, 'w+') as outf:
p = subprocess.Popen(
self.cns_exec,
stdin=inp,
stdout=outf,
stderr=subprocess.PIPE,
close_fds=True,
env=self.envvars,
)
out, error = p.communicate()
p.kill()
if compress_inp:
gzip_files(self.input_file, remove_original=True)
if compress_out:
gzip_files(self.output_file, remove_original=True)
if compress_seed:
with suppress(FileNotFoundError):
gzip_files(
Path(Path(self.output_file).stem).with_suffix('.seed'),
remove_original=True)
if error:
raise CNSRunningError(error)
return out