#!/usr/bin/env python3
import fire
import os
from pathlib import Path
import re
import requests
import sys
import json
import subprocess
from io import BufferedReader
from typing import Optional, Callable
from glob import glob
import time
from tqdm import tqdm


CHALLENGES_DIR = "challenges"
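# tests.json layout, as written by `addtest` and read by `run`: keys are the
# part numbers ("1"/"2"), each mapping to a list of cases of the form
# {"is": "<expected output>", "input": "<text piped to the solution on stdin>"}.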
SAMPLE_TEST_JSON = "{}"
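
# Maps a solution's file extension to a (build command, run command) pair,
# either of which may be None. A build script is expected to print the path of
# the built executable on stdout; the run command is invoked with the source
# file (or built artifact) followed by the part number, and the puzzle input
# is supplied on stdin (see get_runner_command and run_part).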
RUNNERS = {
    "py": (None, ["./runners/py.sh"]),
    "go": (["./runners/buildGo.sh"], None),
    "kt": (["./runners/buildKotlin.sh"], ["./runners/jar.sh"]),
}


def convert_to_camel_case(inp: str) -> str:
    parts = list(
        map(lambda x: x.lower(), filter(lambda x: len(x) != 0, inp.split(" ")))
    )
    for i, st in enumerate(parts):
        if i == 0:
            continue
        parts[i] = st[0].upper() + st[1:]
    return "".join(parts)


def filter_for_filename(inp: str) -> str:
    return "".join(filter(lambda x: x.isalpha() or x.isdigit() or x == "-", inp))


def load_credentials() -> dict[str, str]:
    with open("credentials.json") as f:
        return json.load(f)


def set_terminal_colour(*colours: str):
    colcodes = {
        "bold": "1",
        "italic": "3",
        "red": "31",
        "green": "32",
        "grey": "37",
        "reset": "0",
    }
    for colour in colours:
        if colour not in colcodes:
            continue
        sys.stdout.write(f"\033[{colcodes[colour]}m")
    sys.stdout.flush()

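# Cache of previous run_command invocations, keyed by hashes of the command and
# its stdin, so identical cacheable runs are only executed once.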
known_runs = {}


def time_command(args: list[str], stdin=None) -> tuple[int, float]:
    kwargs = {}

    if isinstance(stdin, str):
        kwargs["input"] = stdin.encode()
    else:
        kwargs["stdin"] = stdin

    st = time.time()
    proc = subprocess.run(args, stdout=subprocess.PIPE, **kwargs)
    dur = time.time() - st

    return proc.returncode, dur

def run_command(args: list[str], stdin=None, cache=True) -> tuple[int, bytes]:
    if cache:
        # Key the cache on the command and its stdin; hashing a tuple avoids
        # collisions between e.g. ["ab", "c"] and ["a", "bc"].
        ah = hash(tuple(args))
        sh = hash(stdin)

        if (ah, sh) in known_runs:
            return known_runs[(ah, sh)]

    kwargs = {}

    if isinstance(stdin, str):
        kwargs["input"] = stdin.encode()
    else:
        kwargs["stdin"] = stdin

    proc = subprocess.run(args, stdout=subprocess.PIPE, **kwargs)

    if cache:
        known_runs[(ah, sh)] = (proc.returncode, proc.stdout)

    return proc.returncode, proc.stdout

def format_result(v: str) -> str:
    # Quote the value if it contains anything other than letters, digits or
    # "-" so that whitespace and control characters are visible in the output.
    if len(v) == len(
        "".join(filter(lambda x: x.isalpha() or x.isdigit() or x == "-", v))
    ):
        return v
    return repr(v)


def run_part(command: list[str], label: str, spec: dict[str, str | BufferedReader]):
    set_terminal_colour("grey")
    exit_status, buf_cont = run_command(command, stdin=spec.get("input"))
    set_terminal_colour("reset")

    print(f"{label}: ", end="")

    if exit_status != 0:
        set_terminal_colour("red")
        print(f"exited with a non-zero status code ({exit_status})")
        set_terminal_colour("reset")
        return

    result_str = buf_cont.decode().strip()
    formatted_result_str = format_result(result_str)

    if result_str == "":
        set_terminal_colour("red")
        print("nothing outputted")
        set_terminal_colour("reset")
        return

    if expected := spec.get("is"):
        if expected == result_str:
            set_terminal_colour("green")
            print("pass", end="")
            set_terminal_colour("grey")
            print(f" ({formatted_result_str})")
        else:
            set_terminal_colour("red")
            print("fail", end="")
            set_terminal_colour("grey")
            print(
                f" (got {formatted_result_str}, expected {format_result(expected)})"
            )
        set_terminal_colour("reset")
    else:
        print(formatted_result_str)

def get_runner_command(
    file_name: str,
) -> tuple[list[str], Optional[Callable[[], None]]]:
    """
    Work out how to execute a solution, building it first if its runner has a
    build step. Returns the command to run plus an optional cleanup callback
    that removes the built artifact.
    """
    file_extension = file_name.split(".")[-1].lower()

    if file_extension not in RUNNERS:
        print("No compatible runner found", file=sys.stderr)
        raise SystemExit(1)

    (runner_build, runner_run) = RUNNERS[file_extension]

    if runner_build is None:
        if runner_run is not None:
            return runner_run + [file_name], None
        print(f"No build or run command specified for runner {file_extension}")
        raise SystemExit(1)

    # if runner_run is not None and runner_build is not None:
    #     print(
    #         f"Build command and run command specified for {file_extension} - cannot determine path forwards."
    #     )
    #     raise SystemExit(1)

    command = runner_build + [file_name]
    set_terminal_colour("grey")
    print("Building...", end="\r")
    set_terminal_colour("reset")
    exit_code, fpath = run_command(command, cache=False)
    if exit_code != 0:
        print(f"Failed to build: `{command}` returned exit code {exit_code}")
        raise SystemExit(1)
    fpstr = fpath.decode().strip()

    if runner_run is None:
        return [fpstr], lambda: os.unlink(fpstr)

    return runner_run + [fpstr], lambda: os.unlink(fpstr)

class CLI(object):
    @staticmethod
    def init(year: int, day: int):
        """
        Initialise a day's AoC challenge
        """

        # Load day's page to verify that it has been released and to get the
        # challenge title

        day_url = f"https://adventofcode.com/{year}/day/{day}"

        page_resp = requests.get(day_url)
        if page_resp.status_code == 404:
            print(
                "Challenge page not found: has that day been released yet?",
                file=sys.stderr,
            )
            raise SystemExit(1)

        page_resp.raise_for_status()

        matches = re.findall(
            r"--- Day \d{1,2}: ([\w\- \?!]+) ---", page_resp.content.decode()
        )
        assert len(matches) >= 1, "must be able to discover at least one day title"
        day_title = matches[0].replace("-", " ")

        # Work out the challenge's directory.

        p = Path(CHALLENGES_DIR)
        p /= str(year)
        p /= (
            str(day).zfill(2)
            + "-"
            + filter_for_filename(convert_to_camel_case(day_title))
        )

        os.makedirs(p)

        # Drop a basic README and tests file

        with open(p / "README.md", "w") as f:
            f.write(f"# [Day {day}: {day_title}]({day_url})\n")

        with open(p / "tests.json", "w") as f:
            f.write(SAMPLE_TEST_JSON)

        # Download input and drop it in the challenge's directory

        creds = load_credentials()
        input_resp = requests.get(
            day_url + "/input",
            cookies={"session": creds["session"]},
            headers={"User-Agent": creds["userAgent"]},
        )
        input_resp.raise_for_status()

        with open(p / "input.txt", "wb") as f:
            f.write(input_resp.content)

        # Output the challenge's directory

        print(p)

    @staticmethod
    def run(
        fpath: str,
        test_only: bool = False,
        no_test: bool = False,
        select_part: Optional[int] = None,
    ):
        """
        Execute a day's code
        """

        if test_only and no_test:
            print(
                "Conflicting arguments (test-only and no-test both set)",
                file=sys.stderr,
            )
            raise SystemExit(1)

        if select_part:
            select_part = str(select_part)

        try:
            os.stat(fpath)
        except FileNotFoundError:
            print(f"Could not stat {fpath}", file=sys.stderr)
            raise SystemExit(1)

        cmd, cleanup = get_runner_command(fpath)

        challenge_dir = Path(os.path.dirname(fpath))
        input_file = open(challenge_dir / "input.txt", "rb")

        if test_only or not no_test:
            test_specs = json.load(open(challenge_dir / "tests.json"))

            for part in ["1", "2"]:
                if select_part and select_part != part:
                    continue
                for i, spec in enumerate(test_specs.get(part, [])):
                    run_part(cmd + [part], f"Test {part}.{i+1}", spec)

        if no_test or not test_only:
            if (select_part and select_part == "1") or not select_part:
                run_part(cmd + ["1"], "Run 1", {"input": input_file})
                input_file.seek(0)
            if (select_part and select_part == "2") or not select_part:
                run_part(cmd + ["2"], "Run 2", {"input": input_file})

        input_file.close()

        if cleanup is not None:
            cleanup()

    @staticmethod
    def bench(fpath: str, n: int = 100):
        """
        Benchmark a day's code
        """

        try:
            os.stat(fpath)
        except FileNotFoundError:
            print(f"Could not stat {fpath}", file=sys.stderr)
            raise SystemExit(1)

        file_extension = fpath.split(".")[-1].lower()

        challenge_dir = Path(os.path.dirname(fpath))
        input_file = open(challenge_dir / "input.txt", "rb")

        cmd, cleanup = get_runner_command(fpath)

        # Results are appended to the per-year benchmarks.jsonl file
        # (challenges/<year>/benchmarks.jsonl).
        benchmark_file = (
            Path(CHALLENGES_DIR) / challenge_dir.parts[1] / "benchmarks.jsonl"
        )
        benchmark_fd = open(benchmark_file, "a")

        for part in ["1", "2"]:
            durs = []
            r_c = cmd + [part]
            for _ in tqdm(range(n), ncols=0, leave=False, desc=f"Part {part}"):
                exit_status, run_duration = time_command(r_c, stdin=input_file)
                if exit_status != 0:
                    set_terminal_colour("red")
                    print(f"Exited with a non-zero status code ({exit_status})")
                    set_terminal_colour("reset")
                    return
                input_file.seek(0)
                durs.append(run_duration)

            mi, mx, avg = min(durs), max(durs), sum(durs) / len(durs)

            json.dump(
                {
                    "day": int(challenge_dir.parts[-1].split("-")[0]),
                    "part": int(part),
                    "runner": file_extension,
                    "min": mi,
                    "max": mx,
                    "avg": avg,
                    "n": n,
                },
                benchmark_fd,
            )
            benchmark_fd.write("\n")

            print(
                f"Part {part}: min {round(mi, 4)} seconds, max {round(mx, 4)} seconds, avg {round(avg, 4)} seconds"
            )

        benchmark_fd.close()
        input_file.close()

        if cleanup is not None:
            cleanup()

    @staticmethod
    def addtest(year: int, day: int, part: int, output: str):
        """
        Add a test to a challenge's test case file
        """

        p = Path(CHALLENGES_DIR)
        p /= str(year)
        p /= glob(str(day).zfill(2) + "-*", root_dir=p)[0]
        p /= "tests.json"

        existing_tests = {}

        try:
            with open(p) as f:
                existing_tests = json.load(f)
        except FileNotFoundError:
            pass

        if (sp := str(part)) not in existing_tests:
            existing_tests[sp] = []

        existing_tests[sp].append(
            {
                "is": str(output),
                "input": sys.stdin.read(),
            }
        )

        with open(p, "w") as f:
            json.dump(existing_tests, f, indent=" ")

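# python-fire exposes the CLI class's static methods (init, run, bench,
# addtest) as subcommands of this script.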
if __name__ == "__main__":
    fire.Fire(CLI)