"""Detect known/common CNS errors.
Inspired from:
https://github.com/haddocking/haddock25/blob/main/tools/check-error-messages.sh
"""
import gzip
from io import BufferedReader
from pathlib import Path
from haddock.core.exceptions import KnownCNSError
from haddock.core.typing import FilePath, Optional, Union
# Catalogue of the CNS failures we know how to explain.
#   key   -> text fragment looked for in the lines of a cns.cnserr file
#   value -> advice displayed to the user when that fragment is found
KNOWN_ERRORS = {
    "CHAIN LENGTH FOR SYMMETRY RESTRAINTS DOES NOT MATCH":
        "Mismatch between chain length for symmetry restraints. Check your "
        "input molecules and symmetry restraints.",
    "NCS-restraints error encountered: Improperly defined non-crystallographic symmetry":  # noqa : E501
        "Improperly defined non-crystallographic symmetry (NCS). Check "
        "your symmetry restraints definition.",
    "error in SYMMETRY potential, check NOE table":
        "Check your symmetry restraints definition.",
    "exceeded allocation for NOE-restraints":
        "Too many distance restraints defined. Try to reduce this number "
        "by checking your definition of active and passive residues. "
        "Make sure to filter those for solvent accessibility. Or "
        "alternatively increase the nres parameter in the noe statements "
        "in the relevant CNS scripts.",
    "SELRPN error encountered: parsing error":
        "Check your restraint files.",
    "PARSER error encountered: Encountered too many parsing errors":
        "Encountered too many parsing errors. Check your input molecules "
        "and symmetry restraints.",
    "XMREAD error encountered:  sectioning of map incompatible with resolution":  # noqa : E501
        "Check your EM map resolution and sectioning.",
    "ALLHP error encountered: not enough memory available":
        "Too many distance restraints defined. Try to reduce this number "
        "by checking your definition of active and passive residues. "
        "Make sure to filter those for solvent accessibility. Try to "
        "decrease the size of your system where possible.",
    "error encountered: missing SCATter definition for SELEcted atoms":
        "Unsupported atoms/molecules for cryo-EM restraints.",
    "ROTMAT error encountered: rotation vector has zero length":
        "Check your input parameters and restraints. Possibly try turning "
        "off the sampling of 180 degrees rotation.",
    }
[docs]
def find_cns_errors(cns_out_fpath: FilePath) -> Optional[KnownCNSError]:
    """Detect if a known CNS error is in a cns.cnserr file.

    The file is opened in binary mode (through gzip when its suffix is
    ``.gz``), scanned with `_find_cns_errors` against `KNOWN_ERRORS`, and
    closed again before returning.

    Parameters
    ----------
    cns_out_fpath : FilePath -> Union[str, Path]
        Path to the cns.cnserr file to check.

    Returns
    -------
    Optional[KnownCNSError]
        The exception raised for the known CNS error that was detected,
        with its hint on how to solve it. When no known error is found,
        a generic KnownCNSError pointing the user to the file is returned
        instead.
    """
    # Check for file extension to open it the appropriate way
    opener = gzip.open if Path(cns_out_fpath).suffix == ".gz" else open
    # The context manager guarantees the handle is closed on every path
    with opener(cns_out_fpath, "rb") as file_handle:
        try:
            _find_cns_errors(
                file_handle,
                KNOWN_ERRORS,
                filepath=cns_out_fpath,
                )
        except KnownCNSError as err:
            # A known error was recognised: hand it back to the caller
            return err
    # Nothing recognised: return a generic error pointing to the file
    return KnownCNSError(
        "An unfortunate CNS error occured at exection time...",
        f"Manually check the file `{cns_out_fpath}` to understand why!",
        cns_out_fpath,
        )
def _find_cns_errors(
        file_handle: Union[gzip.GzipFile, BufferedReader],
        known_errors: dict[str, str],
        chunk_size: int = 4096,
        filepath: FilePath = "",
        ) -> None:
    """Backward reading and detect first known CNS error in file.
    Parameters
    ----------
    file_handle: Union[gzip.GzipFile, BufferedReader]
        An opened file in read bytes mode.
    known_errors : dict[str, str]
        Dict of known errors and their hints
    chunk_size : int, optional
        Check size (in bytes) to read the file backwards, by default 4096
    filepath : FilePath -> Union[str, Path]
        Path to the cns.cnserr file currently checked.
    Raises
    ------
    KnownCNSError
        An exception for known CNS errors, with its hint on how to solve it!
    """
    # Find file size
    file_handle.seek(0, 2)
    size = file_handle.tell()
    buffer = b''
    # Set the number of lines to parse
    # initiated with high value to read all lines the first time
    # updated to the number of lines that were already parsed
    # after each chunk iteration, so we do not parse the same lines
    parsed_lines = 99999
    for i in range(size - 1, -1, -chunk_size):
        # Go to location in file
        file_handle.seek(max(i - chunk_size, 0))
        # Read next chunk
        chunk = file_handle.read(min(chunk_size, i + 1))
        # Increment buffer
        buffer = chunk + buffer
        lines = buffer.split(b'\n')
        # Read lines
        for line in reversed(lines[-len(lines):parsed_lines]):
            decoded_line = line.decode('utf-8', errors='replace')
            # Loop over known errors
            for error_string, hint in known_errors.items():
                # Check if this error is known
                if error_string in decoded_line:
                    # return the cause
                    raise KnownCNSError(
                        error_string,
                        hint,
                        filepath,
                        )
        # Update number of parsed lines so we do not check them again
        parsed_lines = -len(lines)
[docs]
def find_all_cns_errors(
        directory_path: FilePath,
        ) -> dict[str, dict[str, Union[list[FilePath], KnownCNSError]]]:
    """Find all errors in a directory.

    Every `.cnserr` and `.cnserr.gz` file found directly in the directory
    is checked with `find_cns_errors`, and the detected errors are grouped
    by their `cns_error` attribute.

    Parameters
    ----------
    directory_path : FilePath
        Path to the directory to be checked.

    Returns
    -------
    all_errors : dict[str, dict[str, Union[list[FilePath], KnownCNSError]]]
        Dictionary containing all errors found in this directory.
        Each value holds two entries: "files", the list of `filepath`
        attributes of the errors of that type, and "error", the first
        error of that type that was detected.
    """
    all_errors: dict[str, dict[str, Union[list[FilePath], KnownCNSError]]] = {}
    directory = Path(directory_path)
    # Gather list of all `.cnserr` and `.cnserr.gz` files present in directory
    # (compressed ones first; each group sorted so results are reproducible)
    all_cns_out_files = sorted(directory.glob("*.cnserr.gz"))
    all_cns_out_files += sorted(directory.glob("*.cnserr"))
    # Loop over all .cnserr files
    for fpath in all_cns_out_files:
        # Try to detect an error
        if (detected_error := find_cns_errors(fpath)):
            # Hold data if an error is present in that file
            error_type = all_errors.setdefault(
                detected_error.cns_error,
                {"files": [], "error": detected_error},
                )
            error_type["files"].append(detected_error.filepath)
    return all_errors