adventOfCode/aoc
2024-12-07 02:05:09 +00:00

404 lines
11 KiB
Python

#!/usr/bin/env python3
import fire
import os
from pathlib import Path
import re
import requests
import sys
import json
import subprocess
from io import BufferedReader
from typing import Optional, Callable
from glob import glob
import time
from tqdm import tqdm
# Directory (relative to the working directory) under which challenges are
# stored, one sub-directory per year.
CHALLENGES_DIR = "challenges"

# Initial contents of a newly created challenge's tests.json (an empty object).
SAMPLE_TEST_JSON = "{}"

# Maps a lower-cased solution file extension to a (build, run) pair of command
# prefixes. Either element may be None:
#   * build is None  -> the run command is invoked directly on the source file.
#   * run is None    -> the build command's stdout is itself the executable.
#   * both present   -> the build output path is appended to the run command.
RUNNERS = {
    "py": (None, ["./runners/py.sh"]),
    "go": (["./runners/buildGo.sh"], None),
    "kt": (["./runners/buildKotlin.sh"], ["./runners/jar.sh"]),
}
def convert_to_camel_case(inp: str) -> str:
    """
    Convert a space-separated phrase to camelCase.

    Runs of spaces are collapsed, every word is lower-cased, and the first
    character of each word after the first is upper-cased before the words
    are concatenated. An input with no words yields an empty string.
    """
    words = [word.lower() for word in inp.split(" ") if word]
    capitalised_tail = [word[0].upper() + word[1:] for word in words[1:]]
    return "".join(words[:1] + capitalised_tail)
def filter_for_filename(inp: str) -> str:
    """
    Strip every character that is not a letter, a digit or a hyphen.
    """
    kept = [ch for ch in inp if ch.isalpha() or ch.isdigit() or ch == "-"]
    return "".join(kept)
def load_credentials() -> dict[str, str]:
    """
    Read and parse `credentials.json` from the current working directory.

    Returns the decoded JSON document. Raises FileNotFoundError when the file
    is absent and json.JSONDecodeError when it is not valid JSON.
    """
    raw_document = Path("credentials.json").read_text()
    return json.loads(raw_document)
def set_terminal_colour(*colours: str):
    """
    Write an ANSI escape sequence to stdout for each recognised style name.

    Recognised names are "bold", "italic", "red", "green", "grey" and
    "reset"; any other name is ignored. Stdout is flushed afterwards so the
    styling takes effect before whatever is printed next.
    """
    sgr_codes = {
        "bold": "1",
        "italic": "3",
        "red": "31",
        "green": "32",
        "grey": "37",
        "reset": "0",
    }

    for name in colours:
        code = sgr_codes.get(name)
        if code is not None:
            sys.stdout.write(f"\033[{code}m")
    sys.stdout.flush()
# Results of commands already executed through run_command(..., cache=True).
# Keys are (argument tuple, stdin) pairs; values are (return code, stdout bytes).
known_runs = {}


def _stdin_kwargs(stdin) -> dict:
    """Translate a stdin value into keyword arguments for subprocess.run."""
    if isinstance(stdin, str):
        # Strings are fed to the process as its complete, encoded input.
        return {"input": stdin.encode()}
    # Anything else (None, an open file, ...) is handed over as the stdin handle.
    return {"stdin": stdin}


def time_command(args: list[str], stdin=None) -> tuple[int, float]:
    """
    Run a command and measure how long it takes.

    Args:
        args: The command and its arguments.
        stdin: A string to send as the process's input, or a value accepted
            by subprocess.run's `stdin` parameter (e.g. an open file).

    Returns:
        The process's return code and the elapsed time in seconds. The
        process's stdout is captured and discarded.
    """
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() can.
    started = time.perf_counter()
    proc = subprocess.run(args, stdout=subprocess.PIPE, **_stdin_kwargs(stdin))
    elapsed = time.perf_counter() - started
    return proc.returncode, elapsed


def run_command(args: list[str], stdin=None, cache=True) -> tuple[int, bytes]:
    """
    Run a command and capture its stdout, optionally memoising the result.

    Args:
        args: The command and its arguments.
        stdin: A string to send as the process's input, or a value accepted
            by subprocess.run's `stdin` parameter (e.g. an open file).
        cache: When true, a previous result for the same arguments and stdin
            is returned without re-running the command, and a fresh result
            is stored in `known_runs`.

    Returns:
        The process's return code and its raw (undecoded) stdout.
    """
    # Key on the actual argument tuple rather than a hash of the joined
    # arguments: joining loses the argument boundaries, so ["a", "b"] and
    # ["ab"] would otherwise share a cache entry.
    key = (tuple(args), stdin)
    if cache and key in known_runs:
        return known_runs[key]

    proc = subprocess.run(args, stdout=subprocess.PIPE, **_stdin_kwargs(stdin))
    result = (proc.returncode, proc.stdout)
    if cache:
        known_runs[key] = result
    return result
def format_result(v: str) -> str:
    """
    Prepare a result for display.

    A value made up solely of letters, digits and hyphens is returned as-is;
    anything else is returned as its `repr` so that whitespace and other
    special characters are visible.
    """
    is_plain = all(ch.isalpha() or ch.isdigit() or ch == "-" for ch in v)
    return v if is_plain else repr(v)
def run_part(command: list[str], label: str, spec: dict[str, str | BufferedReader]):
    """
    Execute one solution run and print a one-line report prefixed by `label`.

    `spec` may carry an "input" entry (passed to the command as stdin) and an
    "is" entry (the expected answer). With an expected answer the report is a
    coloured pass/fail verdict; without one the answer itself is printed. A
    non-zero exit status or empty output is reported in red.
    """
    # The terminal is switched to grey while the command runs and reset after.
    set_terminal_colour("grey")
    exit_status, raw_output = run_command(command, stdin=spec.get("input"))
    set_terminal_colour("reset")

    print(f"{label}: ", end="")

    if exit_status != 0:
        set_terminal_colour("red")
        print(f"exited with a non-zero status code ({exit_status})")
        set_terminal_colour("reset")
        return

    answer = raw_output.decode().strip()
    if answer == "":
        set_terminal_colour("red")
        print("nothing outputted")
        set_terminal_colour("reset")
        return

    displayed_answer = format_result(answer)

    expected = spec.get("is")
    if not expected:
        # Nothing to compare against: just show the answer.
        print(displayed_answer)
        return

    passed = expected == answer
    set_terminal_colour("green" if passed else "red")
    print("pass" if passed else "fail", end="")
    set_terminal_colour("grey")
    if passed:
        print(f" ({displayed_answer})")
    else:
        print(f" (got {displayed_answer}, expected {format_result(expected)})")
    set_terminal_colour("reset")
def get_runner_command(
    file_name: str,
) -> tuple[list[str], Optional[Callable[[], None]]]:
    """
    Work out the command that executes a solution file, building it first
    when its runner defines a build step.

    The runner is selected from `RUNNERS` by the text after the last "." in
    `file_name` (case-insensitive).

    Returns:
        A `(command, cleanup)` pair. `command` is the argument list to run
        (callers append the part number). `cleanup` is None when nothing was
        built; otherwise it is a callable that deletes the build output.

    Raises:
        SystemExit: with status 1 when no runner matches, the runner has
            neither a build nor a run command, the build fails, or the build
            does not report an output path.
    """
    file_extension = file_name.split(".")[-1].lower()
    if file_extension not in RUNNERS:
        print("No compatible runner found", file=sys.stderr)
        raise SystemExit(1)

    runner_build, runner_run = RUNNERS[file_extension]

    if runner_build is None:
        if runner_run is None:
            print(
                f"No build or run command specified for runner {file_extension}",
                file=sys.stderr,
            )
            raise SystemExit(1)
        # No build step: the run command is applied straight to the source.
        return runner_run + [file_name], None

    command = runner_build + [file_name]

    # The trailing carriage return lets the next line printed overwrite this
    # progress message.
    set_terminal_colour("grey")
    print("Building...", end="\r")
    set_terminal_colour("reset")

    exit_code, build_output = run_command(command, cache=False)
    if exit_code != 0:
        print(
            f"Failed to build: `{command}` returned exit code {exit_code}",
            file=sys.stderr,
        )
        raise SystemExit(1)

    # The build script reports the path of what it produced on stdout.
    artefact = build_output.decode().strip()
    if not artefact:
        print(
            f"Failed to build: `{command}` did not output a path",
            file=sys.stderr,
        )
        raise SystemExit(1)

    def cleanup() -> None:
        os.unlink(artefact)

    if runner_run is None:
        # The build output is directly executable.
        return [artefact], cleanup
    return runner_run + [artefact], cleanup
class CLI(object):
    """Sub-commands of the tool; each static method is one command."""

    @staticmethod
    def init(year: int, day: int):
        """
        Initialise a day's AoC challenge

        Creates the challenge directory with a README, an empty tests file
        and the puzzle input, then prints the directory's path.
        """
        # Load day's page to verify that it has been released and to get the
        # challenge title
        day_url = f"https://adventofcode.com/{year}/day/{day}"
        page_resp = requests.get(day_url)
        if page_resp.status_code == 404:
            print(
                "Challenge page not found: has that day been released yet?",
                file=sys.stderr,
            )
            raise SystemExit(1)
        page_resp.raise_for_status()

        title_match = re.search(
            r"--- Day \d{1,2}: ([\w\- \?!]+) ---", page_resp.content.decode()
        )
        # Explicit check rather than `assert`, which is skipped under -O.
        if title_match is None:
            print(
                "Could not find the day's title on the challenge page",
                file=sys.stderr,
            )
            raise SystemExit(1)
        day_title = title_match.group(1).replace("-", " ")

        # Work out the challenge's directory.
        p = Path(CHALLENGES_DIR)
        p /= str(year)
        p /= (
            str(day).zfill(2)
            + "-"
            + filter_for_filename(convert_to_camel_case(day_title))
        )
        # Raises FileExistsError if the day was already initialised, so
        # existing work is never overwritten.
        os.makedirs(p)

        # Drop a basic README and tests file
        with open(p / "README.md", "w") as f:
            f.write(f"# [Day {day}: {day_title}]({day_url})\n")
        with open(p / "tests.json", "w") as f:
            f.write(SAMPLE_TEST_JSON)

        # Download input and drop it in the challenge's directory
        creds = load_credentials()
        input_resp = requests.get(
            day_url + "/input",
            cookies={"session": creds["session"]},
            headers={"User-Agent": creds["userAgent"]},
        )
        input_resp.raise_for_status()
        with open(p / "input.txt", "wb") as f:
            f.write(input_resp.content)

        # Output the challenge's directory
        print(p)

    @staticmethod
    def run(
        fpath: str,
        test_only: bool = False,
        no_test: bool = False,
        select_part: Optional[int] = None,
    ):
        """
        Execute a day's code

        Runs the test cases from tests.json and then the real input from
        input.txt, both located next to the solution file. `test_only` and
        `no_test` restrict the run to one of the two; `select_part` restricts
        it to a single part.
        """
        if test_only and no_test:
            print(
                "Conflicting arguments (test-only and no-test both set)",
                file=sys.stderr,
            )
            raise SystemExit(1)

        if select_part:
            select_part = str(select_part)

        try:
            os.stat(fpath)
        except FileNotFoundError:
            print(f"Could not stat {fpath}", file=sys.stderr)
            raise SystemExit(1)

        cmd, cleanup = get_runner_command(fpath)
        challenge_dir = Path(os.path.dirname(fpath))

        try:
            # Both flags being set was rejected above, so "tests wanted" is
            # simply "not no_test" and "real run wanted" is "not test_only".
            if not no_test:
                with open(challenge_dir / "tests.json") as tests_file:
                    test_specs = json.load(tests_file)
                for part in ["1", "2"]:
                    if select_part and select_part != part:
                        continue
                    for i, spec in enumerate(test_specs.get(part, [])):
                        run_part(cmd + [part], f"Test {part}.{i+1}", spec)

            if not test_only:
                with open(challenge_dir / "input.txt", "rb") as input_file:
                    for part in ["1", "2"]:
                        if select_part and select_part != part:
                            continue
                        # The child process consumes the file, so rewind
                        # before handing it to each part.
                        input_file.seek(0)
                        run_part(cmd + [part], f"Run {part}", {"input": input_file})
        finally:
            # Remove any build output even if a run raised.
            if cleanup is not None:
                cleanup()

    @staticmethod
    def bench(fpath: str, n: int = 100):
        """
        Benchmark a day's code

        Runs each part `n` times against input.txt, prints the minimum,
        maximum and average duration, and appends one JSON line per part to
        the benchmarks.jsonl file of the challenge's year.
        """
        if n < 1:
            print("n must be at least 1", file=sys.stderr)
            raise SystemExit(1)

        try:
            os.stat(fpath)
        except FileNotFoundError:
            print(f"Could not stat {fpath}", file=sys.stderr)
            raise SystemExit(1)

        file_extension = fpath.split(".")[-1].lower()
        challenge_dir = Path(os.path.dirname(fpath))
        # NOTE(review): parts[1] presumes fpath is relative and shaped like
        # <CHALLENGES_DIR>/<year>/<day>/<file> - confirm against callers.
        benchmark_file = (
            Path(CHALLENGES_DIR) / challenge_dir.parts[1] / "benchmarks.jsonl"
        )

        cmd, cleanup = get_runner_command(fpath)
        try:
            with open(challenge_dir / "input.txt", "rb") as input_file, open(
                benchmark_file, "a"
            ) as benchmark_fd:
                for part in ["1", "2"]:
                    durs = []
                    r_c = cmd + [part]
                    for _ in tqdm(range(n), ncols=0, leave=False, desc=f"Part {part}"):
                        exit_status, run_duration = time_command(r_c, stdin=input_file)
                        if exit_status != 0:
                            set_terminal_colour("red")
                            print(f"Exited with a non-zero status code ({exit_status})")
                            set_terminal_colour("reset")
                            return
                        # Rewind so the next iteration sees the whole input.
                        input_file.seek(0)
                        durs.append(run_duration)

                    mi, mx, avg = min(durs), max(durs), sum(durs) / len(durs)

                    json.dump(
                        {
                            "day": int(challenge_dir.parts[-1].split("-")[0]),
                            "part": int(part),
                            "runner": file_extension,
                            "min": mi,
                            "max": mx,
                            "avg": avg,
                            "n": n,
                        },
                        benchmark_fd,
                    )
                    benchmark_fd.write("\n")

                    print(
                        f"Part {part}: min {round(mi, 4)} seconds, max {round(mx, 4)} seconds, avg {round(avg, 4)}"
                    )
        finally:
            # Runs on the early return above as well, so the build output is
            # always removed and both files are closed by the `with`.
            if cleanup is not None:
                cleanup()

    @staticmethod
    def addtest(year: int, day: int, part: int, output: str):
        """
        Add a test to a challenge's test case file

        The test's input is read from stdin; `output` is the expected answer.
        """
        p = Path(CHALLENGES_DIR)
        p /= str(year)

        day_dirs = glob(str(day).zfill(2) + "-*", root_dir=p)
        if not day_dirs:
            print(
                f"No challenge directory found for {year} day {day}",
                file=sys.stderr,
            )
            raise SystemExit(1)
        p /= day_dirs[0]
        p /= "tests.json"

        existing_tests = {}
        try:
            with open(p) as f:
                existing_tests = json.load(f)
        except FileNotFoundError:
            # No tests file yet: start from an empty set of tests.
            pass

        existing_tests.setdefault(str(part), []).append(
            {
                "is": str(output),
                "input": sys.stdin.read(),
            }
        )

        with open(p, "w") as f:
            json.dump(existing_tests, f, indent=" ")
# When executed as a script, hand the CLI class to fire, which builds the
# command-line interface from it.
if __name__ == "__main__":
    fire.Fire(CLI)