import re
import requests
import subprocess
from pathlib import Path
from typing import List
from acheck.config import config
from acheck.checking.check_interface import Check, ToolObjectsMeta
from acheck.checking.error import Error, ErrorType, Sequence, Fix, FixCode, ErrorLevel
from acheck.utils import filehelper as fh
from acheck.utils.annotationhelper import get_plan, time_to_line_number, read_annotation
import logging
logger = logging.getLogger(__name__)
def _parse_stdout(stdout):
messages = []
msg_buffer = []
is_msg = False
for line in stdout.decode("utf-8").splitlines():
if "Checking next happening" in line:
is_msg = True
elif "Bad plan description!" in line:
is_msg = True
if line == "" or line == "\n":
is_msg = False
if is_msg:
msg_buffer.append(line)
else:
if len(msg_buffer) > 0:
messages.append(msg_buffer)
msg_buffer = []
if len(msg_buffer) > 0:
messages.append(msg_buffer)
return messages
def _parse_stderr(stderr):
messages = []
error_buffer = ""
for line in stderr.decode("utf-8").splitlines():
if "Problem in domain definition" in line:
messages.append(line)
elif "Parser failed to read file" in line:
messages.append(line)
else:
error_buffer += line
messages.append(error_buffer)
return messages
def _use_validator(domain, problem, plan, validator_path, timeout, logs):
process_stdout = ""
process_stderr = f"Validator timeout after {timeout} seconds"
try:
completed_process = subprocess.run([validator_path, "-v", domain, problem, plan], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, timeout=timeout)
process_stdout = completed_process.stdout
process_stderr = completed_process.stderr
except subprocess.TimeoutExpired:
logs.append(f"Full output from plan validator:\nValidator timeout after {timeout} seconds")
raise
except Exception:
logs.append(
f"An error occurred during plan validation.\nMake sure that the path to the validator executable is set correctly.\nPath: '{validator_path}'")
raise
return process_stdout, process_stderr
def _plan_validation(annotation: Path, domain: Path, problem: Path, tool_meta: ToolObjectsMeta,
add_finished_as_last_line: bool,
check_id,
line_limit, logs: List[str],
timeout: int = 5):
plan = [x["plan"] for x in tool_meta.annotations if x["file"] == annotation][0]
plan_lines = get_plan(annotation, line_limit, add_finished_as_last_line)
fh.write_lines(plan, plan_lines)
validator = tool_meta.validator
annotation_lines = read_annotation(annotation, line_limit)
stdout, stderr = _use_validator(domain, problem, plan, validator, timeout, logs)
error_list = []
messages = _parse_stdout(stdout)
logs.append(stdout.decode('utf-8'))
logs.append(stderr.decode('utf-8'))
critical_messages = _parse_stderr(stderr)
for critical_msg in critical_messages:
if "domain" in critical_msg:
error_list.append(
Error(
file_name=domain,
fixes=[Fix(fix_code=FixCode.Alert, correct_string="Domain description is invalid.")],
error_type=ErrorType.IllegalDomainDescription,
check_id=check_id,
)
)
if "failed to read file" in critical_msg:
error_list.append(
Error(
file_name=domain,
fixes=[Fix(fix_code=FixCode.Alert, correct_string="Problem description is invalid.")],
error_type=ErrorType.IllegalProblemDescription,
check_id=check_id
)
)
if "Error:" in critical_msg:
error_list.append(
Error(
file_name=domain,
fixes=[Fix(fix_code=FixCode.Alert, correct_string="There was an error during plan validation. "
"Make sure that 'ActionCheck' and 'WorldObjectCheck' are enabled. "
"For more information, check the log.")],
error_type=ErrorType.PlanValidationError,
check_id=check_id
)
)
warn_messages = []
error_messages = []
for msg_block in messages:
warnings = [msg_block for line in msg_block if "WARNING" in line]
errors = [msg_block for line in msg_block if ("failed" in line) or ("Bad plan description!" in line)]
if errors:
error_messages.append(errors[0])
if warnings:
warn_messages.append(warnings[0])
for error_block in error_messages:
if "Bad plan description!" in error_block:
error_list.append(
Error(
file_name=domain,
error_type=ErrorType.PlanValidationError,
check_id=check_id,
fixes=[Fix(fix_code=FixCode.Alert, correct_string=" ".join(error_block))]
)
)
else:
error_list.append(
Error(
file_name=annotation,
error_type=ErrorType.PlanValidationError,
line_number=time_to_line_number(annotation, _get_time_from_block(error_block), line_limit),
incorrect_sequence=Sequence(0, annotation_lines[
time_to_line_number(annotation, _get_time_from_block(error_block), line_limit) - 1]),
check_id=check_id,
fixes=[Fix(fix_code=FixCode.Alert, correct_string=" ".join(error_block))]
)
)
return error_list
def _get_time_from_block(msg_block) -> float:
m = re.search(r"(time [0-9]*)(.)?[0-9]*", msg_block[0])
if m:
time = m.group().split(" ")[1].strip(")")
return float(time)
[docs]class PlanValidationCheck(Check):
"""Do a plan validation based on the given annotation
:Options:
- timeout:
Time after which the validator terminates if it does not receive a response
"""
timeout = config.load("Validator","timeout")
[docs] def run(self, annotation_file, domain_file, problem_file, line_limit=-1) -> List[Error]:
self.logs.clear()
return _plan_validation(annotation=annotation_file,
domain=domain_file,
problem=problem_file,
tool_meta=self.tool_meta,
add_finished_as_last_line=False,
check_id=self.id,
line_limit=line_limit,
logs=self.logs,
timeout=self.options.get("timeout", self.timeout))
[docs]class PDDLSyntaxCheck(Check):
"""Checks the model files for pddl syntax errors
:Options:
- timeout:
Time after which the validator terminates if it does not receive a response
"""
timeout = config.load("DomainProblemCheck","timeout")
[docs] def run(self, annotation: Path, domain: Path, problem: Path, line_limit: int = -1) -> List[Error]:
self.logs.clear()
domain_data = fh.read_file(domain)
problem_data = fh.read_file(problem)
data = {'domain': domain_data,
'problem': problem_data}
response = requests.post('http://solver.planning.domains/solve', verify=False, json=data,
timeout=self.options.get("timeout", self.timeout)).json()
if response.get("status") == "ok" and response.get("result").get("parse_status") == "ok":
output = response["result"]["output"]
self.logs.append(f"Full output from 'http://solver.planning.domains/solve':\n{output}")
if isinstance(response.get("result"), str):
raise Exception(response.get("result"))
if response.get("status") == "error" and response.get("result").get("parse_status") == "err":
err_msg_verbose = response["result"]["error"]
err_msg = response["result"]["output"]
if "problem.pddl" in err_msg.split(" ")[0]:
self.logs.append(f"Full output from 'http://solver.planning.domains/solve':\n{err_msg_verbose}")
pattern = r"syntax error in line (\d+)"
hit = re.search(pattern, err_msg)
if hit:
line_number = int(hit.group(1))
else:
line_number = -1
return [Error(
file_name=problem,
error_type=ErrorType.IllegalProblemDescription,
check_id=self.id,
line_number=line_number,
fixes=[Fix(correct_string=err_msg, fix_code=FixCode.Alert)],
error_level=ErrorLevel.Error
)]
if "domain.pddl" in err_msg.split(" ")[0]:
self.logs.append(f"Full output from 'http://solver.planning.domains/solve':\n{err_msg_verbose}")
pattern = r"syntax error in line (\d+)"
hit = re.search(pattern, err_msg)
if hit:
line_number = int(hit.group(1))
else:
line_number = -1
return [Error(
file_name=domain,
error_type=ErrorType.IllegalDomainDescription,
check_id=self.id,
line_number=line_number,
fixes=[Fix(correct_string=err_msg, fix_code=FixCode.Alert)],
error_level=ErrorLevel.Error
)]
return []