#!/usr/bin/env python3 import fire import os from pathlib import Path import re import requests import sys import json import subprocess from io import BufferedReader from typing import Optional, Callable from glob import glob import time from tqdm import tqdm CHALLENGES_DIR = "challenges" SAMPLE_TEST_JSON = "{}" RUNNERS = { "py": (None, ["./runners/py.sh"]), "go": (["./runners/buildGo.sh"], None), } def convert_to_camel_case(inp: str) -> str: parts = list( map(lambda x: x.lower(), filter(lambda x: len(x) != 0, inp.split(" "))) ) for i, st in enumerate(parts): if i == 0: continue parts[i] = st[0].upper() + st[1:] return "".join(parts) def filter_for_filename(inp: str) -> str: return "".join(filter(lambda x: x.isalpha() or x.isdigit() or x == "-", inp)) def load_credentials() -> dict[str, str]: with open("credentials.json") as f: return json.load(f) def set_terminal_colour(*colours: str): colcodes = { "bold": "1", "italic": "3", "red": "31", "green": "32", "grey": "37", "reset": "0", } for colour in colours: if colour not in colcodes: continue sys.stdout.write(f"\033[{colcodes[colour]}m") sys.stdout.flush() known_runs = {} def time_command(args: list[str], stdin=None) -> tuple[int, float]: kwargs = {} if type(stdin) == str: kwargs["input"] = stdin.encode() else: kwargs["stdin"] = stdin st = time.time() proc = subprocess.run(args, stdout=subprocess.PIPE, **kwargs) dur = time.time() - st return proc.returncode, dur def run_command(args: list[str], stdin=None, cache=True) -> tuple[int, str]: if cache: ah = hash("".join(args)) sh = hash(stdin) if (ah, sh) in known_runs: return known_runs[(ah, sh)] kwargs = {} if type(stdin) == str: kwargs["input"] = stdin.encode() else: kwargs["stdin"] = stdin proc = subprocess.run(args, stdout=subprocess.PIPE, **kwargs) if cache: known_runs[((ah, sh))] = (proc.returncode, proc.stdout) return proc.returncode, proc.stdout def format_result(v: str) -> str: if len(v) == len( "".join(filter(lambda x: x.isalpha() or x.isdigit() or x == "-", v)) ): return v return repr(v) def run_part(command: list[str], label: str, spec: dict[str, str | BufferedReader]): set_terminal_colour("grey") exit_status, buf_cont = run_command(command, stdin=spec.get("input")) set_terminal_colour("reset") print(f"{label}: ", end="") if exit_status != 0: set_terminal_colour("red") print(f"exited with a non-zero status code ({exit_status})") set_terminal_colour("reset") return result_str = buf_cont.decode().strip() formatted_result_str = format_result(result_str) if result_str == "": set_terminal_colour("red") print("nothing outputted") set_terminal_colour("reset") return else: if expected := spec.get("is"): if expected == result_str: set_terminal_colour("green") print(f"pass", end="") set_terminal_colour("grey") print(f" ({formatted_result_str})") else: set_terminal_colour("red") print(f"fail", end="") set_terminal_colour("grey") print( f" (got {formatted_result_str}, expected {format_result(expected)})" ) set_terminal_colour("reset") else: print(formatted_result_str) def get_runner_command( file_name: str, ) -> tuple[list[str], Optional[Callable[[], None]]]: """ Builds a solution using `command` then returns a path to the executable. """ file_extension = file_name.split(".")[-1].lower() if file_extension not in RUNNERS: print("No compatible runner found", file=sys.stderr) raise SystemExit(1) (runner_build, runner_run) = RUNNERS[file_extension] if runner_build is None: if runner_run is not None: return runner_run + [file_name], None print(f"No build or run command specified for runner {file_extension}") raise SystemExit(1) if runner_run is not None and runner_build is not None: print( f"Build command and run command specified for {file_extension} - cannot determine path forwards." ) raise SystemExit(1) command = runner_build + [file_name] set_terminal_colour("grey") print("Building...", end="\r") set_terminal_colour("reset") exit_code, fpath = run_command(command, cache=False) if exit_code != 0: print(f"Failed to build: `{command}` returned exit code {exit_code}") raise SystemExit(1) fpstr = fpath.decode().strip() return [fpstr], lambda: os.unlink(fpstr) class CLI(object): @staticmethod def init(year: int, day: int): """ Initialise a day's AoC challenge """ # Load day's page to verify that it has been released and to get the # challenge title day_url = f"https://adventofcode.com/{year}/day/{day}" page_resp = requests.get(day_url) if page_resp.status_code == 404: print( "Challenge page not found: has that day been released yet?", file=sys.stderr, ) raise SystemExit(1) page_resp.raise_for_status() matches = re.findall( r"--- Day \d{1,2}: ([\w\- \?!]+) ---", page_resp.content.decode() ) assert len(matches) >= 1, "must be able to discover at least one day title" day_title = matches[0].replace("-", " ") # Work out the challenge's directory. p = Path(CHALLENGES_DIR) p /= str(year) p /= ( str(day).zfill(2) + "-" + filter_for_filename(convert_to_camel_case(day_title)) ) os.makedirs(p) # Drop a basic README and tests file with open(p / "README.md", "w") as f: f.write(f"# [Day {day}: {day_title}]({day_url})\n") with open(p / "tests.json", "w") as f: f.write(SAMPLE_TEST_JSON) # Download input and drop it in the challenge's directory creds = load_credentials() input_resp = requests.get( day_url + "/input", cookies={"session": creds["session"]}, headers={"User-Agent": creds["userAgent"]}, ) input_resp.raise_for_status() with open(p / "input.txt", "wb") as f: f.write(input_resp.content) # Output the challenge's directory print(p) @staticmethod def run( fpath: str, test_only: bool = False, no_test: bool = False, select_part: Optional[int] = None, ): """ Execute a day's code """ if test_only and no_test: print( f"Conflicting arguments (test-only and no-test both set)", file=sys.stderr, ) raise SystemExit(1) if select_part: select_part = str(select_part) try: os.stat(fpath) except FileNotFoundError: print(f"Could not stat {fpath}", file=sys.stderr) raise SystemExit(1) cmd, cleanup = get_runner_command(fpath) challenge_dir = Path(os.path.dirname(fpath)) input_file = open(challenge_dir / "input.txt", "rb") if test_only or not no_test: test_specs = json.load(open(challenge_dir / "tests.json")) for part in ["1", "2"]: if select_part and select_part != part: continue for i, spec in enumerate(test_specs.get(part, [])): run_part(cmd + [part], f"Test {part}.{i+1}", spec) if no_test or not test_only: if (select_part and select_part == "1") or not select_part: run_part(cmd + ["1"], "Run 1", {"input": input_file}) input_file.seek(0) if (select_part and select_part == "2") or not select_part: run_part(cmd + ["2"], "Run 2", {"input": input_file}) input_file.close() if cleanup is not None: cleanup() @staticmethod def bench(fpath: str, n: int = 100): try: os.stat(fpath) except FileNotFoundError: print(f"Could not stat {fpath}", file=sys.stderr) raise SystemExit(1) file_extension = fpath.split(".")[-1].lower() challenge_dir = Path(os.path.dirname(fpath)) input_file = open(challenge_dir / "input.txt", "rb") cmd, cleanup = get_runner_command(fpath) benchmark_file = ( Path(CHALLENGES_DIR) / challenge_dir.parts[1] / "benchmarks.jsonl" ) benchmark_fd = open(benchmark_file, "a") for part in ["1", "2"]: durs = [] r_c = cmd + [part] for _ in tqdm(range(n), ncols=0, leave=False, desc=f"Part {part}"): exit_status, run_duration = time_command(r_c, stdin=input_file) if exit_status != 0: set_terminal_colour("red") print(f"Exited with a non-zero status code ({exit_status})") set_terminal_colour("reset") return input_file.seek(0) durs.append(run_duration) mi, mx, avg = min(durs), max(durs), sum(durs) / len(durs) json.dump( { "day": int(challenge_dir.parts[-1].split("-")[0]), "part": int(part), "runner": file_extension, "min": mi, "max": mx, "avg": avg, "n": n, }, benchmark_fd, ) benchmark_fd.write("\n") print( f"Part {part}: min {round(mi, 4)} seconds, max {round(mx, 4)} seconds, avg {round(avg, 4)}" ) benchmark_fd.close() input_file.close() if cleanup is not None: cleanup() @staticmethod def addtest(year: int, day: int, part: int, output: str): """ Add a test to a challenge's test case file """ p = Path(CHALLENGES_DIR) p /= str(year) p /= glob(str(day).zfill(2) + "-*", root_dir=p)[0] p /= "tests.json" existing_tests = {} try: with open(p) as f: existing_tests = json.load(f) except FileNotFoundError: pass if (sp := str(part)) not in existing_tests: existing_tests[sp] = [] existing_tests[sp].append( { "is": str(output), "input": sys.stdin.read(), } ) with open(p, "w") as f: json.dump(existing_tests, f, indent=" ") if __name__ == "__main__": fire.Fire(CLI)