Add test runner
This commit is contained in:
parent
4123959928
commit
b838be2440
1 changed files with 100 additions and 57 deletions
157
aoc
157
aoc
|
@ -7,7 +7,8 @@ import requests
|
|||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from io import StringIO
|
||||
from io import BufferedReader
|
||||
from typing import Optional
|
||||
|
||||
|
||||
CHALLENGES_DIR = "challenges"
|
||||
|
@ -26,7 +27,9 @@ RUNNERS = {
|
|||
|
||||
|
||||
def convert_to_camel_case(inp: str) -> str:
|
||||
parts = list(map(lambda x: x.lower(), filter(lambda x: len(x) != 0, inp.split(" "))))
|
||||
parts = list(
|
||||
map(lambda x: x.lower(), filter(lambda x: len(x) != 0, inp.split(" ")))
|
||||
)
|
||||
for i, st in enumerate(parts):
|
||||
if i == 0:
|
||||
continue
|
||||
|
@ -43,31 +46,8 @@ def load_credentials() -> dict[str, str]:
|
|||
return json.load(f)
|
||||
|
||||
|
||||
class Recorder(StringIO):
|
||||
def __init__(self, buf):
|
||||
self.out = buf
|
||||
super().__init__()
|
||||
|
||||
def write(self, data):
|
||||
self.out.write(data)
|
||||
super().write(data)
|
||||
|
||||
def flush(self):
|
||||
self.out.flush()
|
||||
super().flush()
|
||||
|
||||
def fileno(self):
|
||||
return self.out.fileno()
|
||||
|
||||
|
||||
def set_terminal_colour(*colours: str):
|
||||
colcodes = {
|
||||
"bold": "1",
|
||||
"italic": "3",
|
||||
"red": "31",
|
||||
"grey": "37",
|
||||
"reset": "0"
|
||||
}
|
||||
colcodes = {"bold": "1", "italic": "3", "red": "31", "green": "32", "grey": "37", "reset": "0"}
|
||||
for colour in colours:
|
||||
if colour not in colcodes:
|
||||
continue
|
||||
|
@ -75,11 +55,66 @@ def set_terminal_colour(*colours: str):
|
|||
sys.stdout.flush()
|
||||
|
||||
|
||||
known_runs = {}
|
||||
|
||||
|
||||
def run_command(args: list[str], stdin=None) -> tuple[int, str]:
|
||||
proc = subprocess.run(args, stdin=stdin, stdout=subprocess.PIPE)
|
||||
ah = hash("".join(args))
|
||||
sh = hash(stdin)
|
||||
|
||||
if (ah, sh) in known_runs:
|
||||
return known_runs[(ah, sh)]
|
||||
|
||||
kwargs = {}
|
||||
|
||||
if type(stdin) == str:
|
||||
kwargs["input"] = stdin.encode()
|
||||
else:
|
||||
kwargs["stdin"] = stdin
|
||||
|
||||
proc = subprocess.run(args, stdout=subprocess.PIPE, **kwargs)
|
||||
|
||||
known_runs[((ah, sh))] = (proc.returncode, proc.stdout)
|
||||
|
||||
return proc.returncode, proc.stdout
|
||||
|
||||
|
||||
def run_part(command: list[str], label: str, spec: dict[str, str|BufferedReader]):
|
||||
set_terminal_colour("grey")
|
||||
exit_status, buf_cont = run_command(command, stdin=spec.get("input"))
|
||||
set_terminal_colour("reset")
|
||||
|
||||
print(f"{label}: ", end="")
|
||||
|
||||
if exit_status != 0:
|
||||
set_terminal_colour("red")
|
||||
print(f"exited with a non-zero status code ({exit_status})")
|
||||
set_terminal_colour("reset")
|
||||
return
|
||||
|
||||
result_str = buf_cont.decode().strip()
|
||||
formatted_result_str = result_str if len(result_str) == len("".join(filter(lambda x: x.isalpha() or x.isdigit(), result_str))) else repr(result_str)
|
||||
|
||||
if result_str == "":
|
||||
set_terminal_colour("red")
|
||||
print("nothing outputted")
|
||||
set_terminal_colour("reset")
|
||||
return
|
||||
else:
|
||||
if (expected := spec.get("is")):
|
||||
if expected == result_str:
|
||||
set_terminal_colour("green")
|
||||
print(formatted_result_str)
|
||||
else:
|
||||
set_terminal_colour("red")
|
||||
print(formatted_result_str, end="")
|
||||
set_terminal_colour("grey")
|
||||
print(f" expected {repr(expected)}")
|
||||
set_terminal_colour("reset")
|
||||
else:
|
||||
print(formatted_result_str)
|
||||
|
||||
|
||||
class CLI(object):
|
||||
@staticmethod
|
||||
def init(year: int, day: int):
|
||||
|
@ -94,12 +129,17 @@ class CLI(object):
|
|||
|
||||
page_resp = requests.get(day_url)
|
||||
if page_resp.status_code == 404:
|
||||
print("Challenge page not found: has that day been released yet?", file=sys.stderr)
|
||||
print(
|
||||
"Challenge page not found: has that day been released yet?",
|
||||
file=sys.stderr,
|
||||
)
|
||||
raise SystemExit(1)
|
||||
|
||||
page_resp.raise_for_status()
|
||||
|
||||
matches = re.findall(r"--- Day \d{1,2}: ([\w \?!]+) ---", page_resp.content.decode())
|
||||
matches = re.findall(
|
||||
r"--- Day \d{1,2}: ([\w \?!]+) ---", page_resp.content.decode()
|
||||
)
|
||||
assert len(matches) >= 1, "must be able to discover at least one day title"
|
||||
day_title = matches[0]
|
||||
|
||||
|
@ -107,7 +147,11 @@ class CLI(object):
|
|||
|
||||
p = Path(CHALLENGES_DIR)
|
||||
p /= str(year)
|
||||
p /= str(day).zfill(2) + "-" + filter_for_filename(convert_to_camel_case(day_title))
|
||||
p /= (
|
||||
str(day).zfill(2)
|
||||
+ "-"
|
||||
+ filter_for_filename(convert_to_camel_case(day_title))
|
||||
)
|
||||
|
||||
os.makedirs(p)
|
||||
|
||||
|
@ -122,7 +166,8 @@ class CLI(object):
|
|||
# Download input and drop it in the challenge's directory
|
||||
|
||||
creds = load_credentials()
|
||||
input_resp = requests.get(day_url + "/input",
|
||||
input_resp = requests.get(
|
||||
day_url + "/input",
|
||||
cookies={"session": creds["session"]},
|
||||
headers={"User-Agent": creds["userAgent"]},
|
||||
)
|
||||
|
@ -134,13 +179,20 @@ class CLI(object):
|
|||
# Output the challenge's directory
|
||||
|
||||
print(p)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def exec(fpath: str):
|
||||
def run(fpath: str, test_only: bool = False, no_test: bool = False, select_part: Optional[int] = None):
|
||||
"""
|
||||
Execute a day's code
|
||||
"""
|
||||
|
||||
if test_only and no_test:
|
||||
print(f"Conflicting arguments (test-only and no-test both set)", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
if select_part:
|
||||
select_part = str(select_part)
|
||||
|
||||
# Stat file
|
||||
# Get file extension
|
||||
# Based on file extension, run
|
||||
|
@ -156,38 +208,29 @@ class CLI(object):
|
|||
if file_extension not in RUNNERS:
|
||||
print("No compatible runner found", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
else:
|
||||
print("Detected compatible runner")
|
||||
|
||||
challenge_dir = Path(os.path.dirname(fpath))
|
||||
input_file = open(challenge_dir / "input.txt", "rb")
|
||||
|
||||
cmd = RUNNERS[file_extension].copy()
|
||||
cmd.append(fpath)
|
||||
cmd.append("1")
|
||||
|
||||
# buf_stdout = Recorder(sys.stdout)
|
||||
# buf_stdout.write("hi\n")
|
||||
# completed_command = subprocess.run(cmd, stdin=input_file, stdout=buf_stdout, stderr=sys.stderr)
|
||||
# print(f"{completed_command.returncode=}")
|
||||
# buf_stdout.flush()
|
||||
# print(f"{buf_stdout.getvalue()=}")
|
||||
if test_only or not no_test:
|
||||
test_specs = json.load(open(challenge_dir / "tests.json"))
|
||||
|
||||
for part in ["1", "2"]:
|
||||
if select_part and select_part != part:
|
||||
continue
|
||||
for i, spec in enumerate(test_specs[part]):
|
||||
run_part(cmd + [part], f"Test {part}.{i+1}", spec)
|
||||
|
||||
if no_test or not test_only:
|
||||
if (select_part and select_part == "1") or not select_part:
|
||||
run_part(cmd + ["1"], "Run 1", {"input": input_file})
|
||||
input_file.seek(0)
|
||||
if (select_part and select_part == "2") or not select_part:
|
||||
run_part(cmd + ["2"], "Run 2", {"input": input_file})
|
||||
|
||||
set_terminal_colour("grey")
|
||||
exit_status, buf_cont = run_command(cmd, stdin=input_file)
|
||||
set_terminal_colour("reset")
|
||||
|
||||
if exit_status != 0:
|
||||
set_terminal_colour("red")
|
||||
print(f"Exited with a non-zero status code ({exit_status})")
|
||||
set_terminal_colour("reset")
|
||||
|
||||
if (r := buf_cont.decode().strip()) == "":
|
||||
set_terminal_colour("red")
|
||||
print("Nothing outputted")
|
||||
set_terminal_colour("reset")
|
||||
else:
|
||||
print("Output: " + r)
|
||||
|
||||
input_file.close()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue