Add watcher stuff
Signed-off-by: AKP <tom@tdpain.net>
This commit is contained in:
parent
c81382e603
commit
f6fc2d378b
2 changed files with 133 additions and 4 deletions
32
circuit-laundry-notifier/watcher.py
Normal file
32
circuit-laundry-notifier/watcher.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
from dataclasses import dataclass
|
||||
import multiprocessing
|
||||
import time
|
||||
from typing import *
|
||||
|
||||
|
||||
class Watcher:
|
||||
_queue: multiprocessing.Array
|
||||
_lock: multiprocessing.Lock
|
||||
|
||||
def __init__(self):
|
||||
self._lock = multiprocessing.Lock()
|
||||
self._queue = multiprocessing.Array(lock=False)
|
||||
|
||||
def start(self):
|
||||
multiprocessing.Process(
|
||||
target=self._worker, args=(self._queue, self._lock)
|
||||
).start()
|
||||
|
||||
@staticmethod
|
||||
def _worker(jobs: multiprocessing.Array, lock: multiprocessing.Lock):
|
||||
while True:
|
||||
time.sleep(60)
|
||||
|
||||
lock.acquire()
|
||||
|
||||
jobs.pop()
|
||||
|
||||
lock.release()
|
||||
|
||||
def register(self, ws: WatchState):
|
||||
self._queue.append(ws)
|
|
@ -1,6 +1,11 @@
|
|||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from threading import Lock
|
||||
from typing import *
|
||||
|
||||
import flask
|
||||
|
||||
from circuit_scraper import CircuitScraper, ScraperError
|
||||
from circuit_scraper import CircuitScraper, Machine, ScraperError, MachineState
|
||||
|
||||
|
||||
def new() -> flask.Flask:
|
||||
|
@ -9,11 +14,22 @@ def new() -> flask.Flask:
|
|||
app.add_url_rule(
|
||||
"/api/v1/machines/<site_id>", view_func=_api_get_machines, methods=["GET"]
|
||||
)
|
||||
app.add_url_rule(
|
||||
"/api/v1/machines/<site_id>/watch",
|
||||
view_func=_api_register_watcher,
|
||||
methods=["POST"],
|
||||
)
|
||||
app.add_url_rule(
|
||||
"/api/v1/watcher", view_func=_api_list_watched, methods=["GET"]
|
||||
)
|
||||
app.add_url_rule(
|
||||
"/api/v1/watcher/run", view_func=_api_run_watcher, methods=["POST"]
|
||||
)
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def _api_get_machines(site_id: str):
|
||||
def _api_get_machines(site_id: str) -> Union[any, Tuple[any, int]]:
|
||||
try:
|
||||
machines = CircuitScraper.get_site_machine_states(site_id)
|
||||
except ScraperError:
|
||||
|
@ -32,9 +48,90 @@ def _api_get_machines(site_id: str):
|
|||
machine.to_dict()
|
||||
for machine in sorted(
|
||||
machines,
|
||||
# It was annoying me that the machines weren't sorted by number. This sorts the machines by number, taking into
|
||||
# account the fact that some machine numbers include letters, which are quietly ignored.
|
||||
# It was annoying me that the machines weren't sorted by number. This sorts the machines by number,
|
||||
# taking into account the fact that some machine numbers include letters, which are quietly ignored.
|
||||
key=lambda x: int("".join(filter(lambda y: y.isdigit(), x.number))),
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class WatchState:
|
||||
site_id: str
|
||||
ntfy_topic: str
|
||||
machine_number: str
|
||||
|
||||
def _to_dict(self) -> Dict[str, str]:
|
||||
return {
|
||||
"site_id": self.site_id,
|
||||
"ntfy_topic": self.ntfy_topic,
|
||||
"machine_number": self.machine_number,
|
||||
}
|
||||
|
||||
|
||||
jobs: List[WatchState] = []
|
||||
job_lock = Lock()
|
||||
|
||||
|
||||
def _api_register_watcher(site_id: str) -> Union[any, Tuple[any, int]]:
|
||||
ntfy_topic = flask.request.form.get("ntfy_topic", str(uuid.uuid4()))
|
||||
machine_number = flask.request.form.get("machine_number")
|
||||
|
||||
if machine_number is None:
|
||||
return flask.jsonify({"ok": False, "message": "missing machine_number"}), 400
|
||||
|
||||
job_lock.acquire()
|
||||
jobs.append(WatchState(site_id, ntfy_topic, machine_number))
|
||||
job_lock.release()
|
||||
|
||||
return "", 204
|
||||
|
||||
|
||||
def _api_list_watched() -> Union[any, Tuple[any, int]]:
|
||||
job_lock.acquire()
|
||||
res = flask.jsonify(jobs)
|
||||
job_lock.release()
|
||||
return res
|
||||
|
||||
|
||||
def _api_run_watcher() -> Union[any, Tuple[any, int]]:
|
||||
global jobs, job_lock
|
||||
|
||||
job_lock.acquire()
|
||||
|
||||
seen_sites: Dict[str, List[Machine]] = {}
|
||||
completed_items: List[int] = []
|
||||
|
||||
for i, ws in enumerate(jobs):
|
||||
site_machines = seen_sites.get(ws.site_id)
|
||||
if site_machines is None:
|
||||
try:
|
||||
site_machines = CircuitScraper.get_site_machine_states(ws.site_id)
|
||||
seen_sites[ws.site_id] = site_machines
|
||||
except ScraperError:
|
||||
completed_items.append(i)
|
||||
continue
|
||||
|
||||
target_machine: Optional[Machine] = None
|
||||
for machine in site_machines:
|
||||
if machine.number.lower() == ws.machine_number.lower():
|
||||
target_machine = machine
|
||||
break
|
||||
|
||||
if target_machine is None:
|
||||
completed_items.append(i)
|
||||
continue
|
||||
|
||||
if target_machine.state == MachineState.Completed:
|
||||
# TODO: Notify ntfy
|
||||
print("DONE " + str(ws))
|
||||
completed_items.append(i)
|
||||
continue
|
||||
|
||||
for x in list(reversed(sorted(completed_items))):
|
||||
jobs.pop(x)
|
||||
|
||||
job_lock.release()
|
||||
|
||||
return "", 204
|
||||
|
|
Reference in a new issue