This repository has been archived on 2025-07-20. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
analytics/datasette_plugin/analytics.py
AKP ff5ed1348c
Switch back to using proxy-based authentication
Signed-off-by: AKP <tom@tdpain.net>
2023-04-05 17:52:44 +01:00

187 lines
6.3 KiB
Python

import os
import random
import string
from datasette import hookimpl, Response
from dominate.tags import *
import yaml
# Substituted for the "{{#id}}" placeholder in script.js and used as the
# hx-target of the modal's "(close)" link.
MODAL_TARGET_ID = "modal-target"

# JavaScript template that lives next to this module; its placeholders are
# filled in by extra_body_script().
with open(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), "script.js"),
    encoding="utf-8",  # do not depend on the process locale
) as f:
    script_src = f.read()

# NOTE: config.yml is resolved relative to the current working directory.
with open("config.yml", encoding="utf-8") as f:
    # BaseLoader only builds plain strings, lists and dicts (every scalar is
    # loaded as a str), so no arbitrary objects are constructed here.
    # An empty file parses to None; fall back to an empty mapping so the
    # lookups below do not fail.
    configs = yaml.load(f, yaml.BaseLoader) or {}

# Name of the group an actor must belong to in order to use the editor.
# Defaults to "" when the setting is absent.
required_user_group = (
    configs.get("datasette", {}).get("edit", {}).get("requiredUserGroups", "")
)
def is_request_authed(request):
    """Return whether the actor attached to *request* passes the group check.

    Convenience wrapper around is_actor_authed() for callers that hold a
    request object rather than an actor.
    """
    actor = request.actor
    return is_actor_authed(actor)
def is_actor_authed(actor):
    """Return True if *actor* is a member of the configured required group.

    Args:
        actor: The actor dict for the current request, or None when the
            request is anonymous.

    Returns:
        False for a missing actor, for an actor without a usable "groups"
        entry, and whenever no required group has been configured;
        otherwise whether ``required_user_group`` is among the actor's
        groups.
    """
    if actor is None:
        return False
    if not required_user_group:
        # Fail closed: with no group configured the previous membership test
        # compared "" against the groups, which matched the [""] produced by
        # splitting an empty groups header and let everyone in.
        return False
    # Tolerate a missing or None "groups" entry instead of raising.
    groups = actor.get("groups") or ()
    return required_user_group in groups
@hookimpl
def actor_from_request(request):
    """Convert Authentik proxy-provider headers into a Datasette actor.

    The proxy provider is expected to authenticate every request and to
    forward the user's identity in ``x-authentik-*`` headers.

    Args:
        request: The incoming request; only its ``headers`` are read.

    Returns:
        A dict with "id", "name", "username", "email" and "groups" keys, or
        None when no ``x-authentik-uid`` header is present, so that such a
        request is not turned into an actor whose id is None.
    """
    rh = request.headers
    uid = rh.get("x-authentik-uid")
    if not uid:
        return None

    # Groups arrive as a single "|"-separated header. Drop empty entries so
    # that a missing or empty header yields [] rather than [""].
    raw_groups = rh.get("x-authentik-groups", "")
    groups = [group for group in raw_groups.split("|") if group]

    return {
        "id": uid,
        "name": rh.get("x-authentik-name"),
        "username": rh.get("x-authentik-username"),
        "email": rh.get("x-authentik-email"),
        "groups": groups,
    }
@hookimpl
def permission_allowed(actor, action):
    """Gate SQL execution and the debug pages behind the required group.

    Returns the result of is_actor_authed() for the "execute-sql",
    "permissions-debug" and "debug-menu" actions, and None for every other
    action.
    """
    gated_actions = ("execute-sql", "permissions-debug", "debug-menu")
    if action not in gated_actions:
        return None
    return is_actor_authed(actor)
@hookimpl
def extra_body_script(database, table, view_name, request, datasette):
    """Inject the config-editor script into the ``config`` table page.

    Nothing is injected unless the request passes the group check and the
    page being rendered is the table view of a table named "config".

    Returns:
        ``{"script": ...}`` holding script.js with its "{{#url}}" placeholder
        replaced by the editor's view URL and "{{#id}}" replaced by
        MODAL_TARGET_ID, or None when no script should be added.
    """
    if not is_request_authed(request):
        return None
    if view_name != "table" or table != "config":
        return None

    editor_url = datasette.urls.path(
        datasette.urls.table(database, table) + "/-/config-editor/view"
    )
    rendered = script_src.replace("{{#url}}", editor_url)
    rendered = rendered.replace("{{#id}}", MODAL_TARGET_ID)
    return {"script": rendered}
@hookimpl
def register_routes():
    """Return the (regex, handler) pairs for the config-editor endpoints.

    Each pattern captures ``database`` and ``table`` from the path; the edit
    route additionally captures the setting ``key``.
    """
    view_route = (
        r"^/(?P<database>.*)/(?P<table>.*)/-/config-editor/view$",
        Handlers.view_all,
    )
    close_route = (
        r"^/(?P<database>.*)/(?P<table>.*)/-/config-editor/close$",
        Handlers.close_modal,
    )
    edit_route = (
        r"^/(?P<database>.*)/(?P<table>.*)/-/config-editor/edit/(?P<key>.*)$",
        Handlers.edit_record,
    )
    return [view_route, close_route, edit_route]
def generate_modal(datasette, request, config_options):
    """Build the editor modal listing every config option.

    Args:
        datasette: Datasette instance, used to build URLs.
        request: Current request; its ``url_vars`` supply database and table.
        config_options: Sized iterable of rows exposing "id" and "value".

    Returns:
        The outer dominate ``div`` (a translucent backdrop) containing the
        white panel with a heading, a "(close)" link and one line per option.
    """
    url_vars = request.url_vars
    table_url = datasette.urls.table(url_vars["database"], url_vars["table"])

    # "(close)" link: asks the server to close the modal via htmx.
    close_link = a("(close)", href="#")
    close_link["hx-get"] = datasette.urls.path(table_url + "/-/config-editor/close")
    close_link["hx-target"] = "#" + MODAL_TARGET_ID

    # One span per option, separated (but not terminated) by line breaks.
    settings = p()
    last_index = len(config_options) - 1
    for index, option in enumerate(config_options):
        settings.add(
            generate_setting_span(datasette, request, option["id"], option["value"])
        )
        if index != last_index:
            settings.add(br())

    panel = div(style="padding: 15px; height: 280px; width: 570px; position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); background-color: white; border-radius: 0.5ch;")
    panel.add(h3("Edit values", close_link))
    panel.add(settings)

    backdrop = div(id="editor-modal", style="width: 100%; height: 100%; position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); background-color: rgba(0, 0, 0, 0.25);")
    backdrop.add(panel)
    return backdrop
def generate_setting_span(datasette, request, setting_id, value):
    """Build the read-only ``<span>`` for one setting with an "(edit)" link.

    The link issues an htmx GET to the setting's edit endpoint; the span
    targets itself ("this") for the swap.
    """
    url_vars = request.url_vars
    table_url = datasette.urls.table(url_vars["database"], url_vars["table"])

    edit_link = a("(edit)", href="#")
    edit_link["hx-get"] = datasette.urls.path(
        table_url + "/-/config-editor/edit/" + setting_id
    )

    wrapper = span(code(setting_id), ": ", code(value), edit_link)
    wrapper["hx-target"] = "this"
    return wrapper
def generate_edit_span(datasette, request, setting_id, current_value):
    """Build the editable ``<span>`` for one setting.

    The span holds a text input pre-filled with *current_value*, a hidden
    input carrying the value of the "ds_csrftoken" cookie, and a "(save)"
    link that POSTs both inputs to the setting's edit endpoint via htmx.
    """
    # Random element ids let the save link name exactly these two inputs.
    value_input_id = "config-edit-" + generate_random_string(10)
    csrf_input_id = "config-edit-" + generate_random_string(10)

    value_input = input_(
        type="text", name="new_value", value=current_value, _id=value_input_id
    )
    csrf_input = input_(
        _id=csrf_input_id,
        type="hidden",
        name="csrftoken",
        value=request.cookies.get("ds_csrftoken", ""),
    )

    url_vars = request.url_vars
    table_url = datasette.urls.table(url_vars["database"], url_vars["table"])

    save_link = a("(save)", href="#")
    save_link["hx-post"] = datasette.urls.path(
        table_url + "/-/config-editor/edit/" + setting_id
    )
    save_link["hx-include"] = f"#{value_input_id},#{csrf_input_id}"

    wrapper = span(code(setting_id), ": ", value_input, save_link, csrf_input)
    wrapper["hx-target"] = "this"
    return wrapper
class Handlers:
    """Request handlers for the config-editor routes.

    Every handler first checks is_request_authed() and answers 403 when the
    check fails. All queries target the table named "config" in the database
    taken from the URL.
    """

    @staticmethod
    def not_authed_response():
        """Return the 403 response sent when the group check fails."""
        return Response(
            "Forbidden",
            status=403,
        )

    @staticmethod
    async def view_all(datasette, request):
        """Render the modal listing every row of the "config" table."""
        if not is_request_authed(request):
            return Handlers.not_authed_response()

        database = datasette.get_database(request.url_vars["database"])
        config_options = await database.execute(
            """SELECT "id", "value" FROM "config";"""
        )
        return Response.html(
            generate_modal(datasette, request, config_options).render(),
        )

    @staticmethod
    async def close_modal(request):
        """Answer 204 with an ``hx-refresh`` header set to "true"."""
        if not is_request_authed(request):
            return Handlers.not_authed_response()
        return Response(
            "",
            status=204,
            headers={"hx-refresh": "true"},
        )

    @staticmethod
    async def edit_record(datasette, request):
        """Show the edit form for one setting (GET) or save it (POST).

        Returns:
            403 when the group check fails, 405 for methods other than GET
            and POST, 404 when the key from the URL has no row in "config",
            400 when a POST carries no ``new_value`` field; otherwise an
            HTML fragment: the edit span for GET, the updated read-only span
            for POST.
        """
        if not is_request_authed(request):
            return Handlers.not_authed_response()

        # Reject unsupported methods before touching the database.
        if request.method not in ("GET", "POST"):
            return Response(
                "method not allowed",
                status=405,
                content_type="text/html",
            )

        setting_id = request.url_vars["key"]
        database = datasette.get_database(request.url_vars["database"])

        result = await database.execute(
            """SELECT "value" FROM "config" WHERE "id"=?;""", (setting_id,)
        )
        rows = list(result)
        if not rows:
            # Unknown key: answer 404 instead of failing with IndexError.
            return Response(
                "Not found",
                status=404,
            )

        if request.method == "GET":
            current_value = rows[0]["value"]
            return Response.html(
                generate_edit_span(datasette, request, setting_id, current_value).render()
            )

        # POST: persist the submitted value.
        form = await request.post_vars()
        new_value = form.get("new_value")
        if new_value is None:
            # Missing form field: answer 400 instead of failing with KeyError.
            return Response(
                "Missing new_value",
                status=400,
            )

        await database.execute_write(
            """UPDATE "config" SET "value" = ? WHERE "id" = ?""",
            (new_value, setting_id),
        )
        return Response.html(
            generate_setting_span(datasette, request, setting_id, new_value).render()
        )
def generate_random_string(n: int) -> str:
    """Return a string of *n* randomly chosen ASCII letters.

    Uses the non-cryptographic ``random`` module; in this file the result is
    only used to build HTML element ids.
    """
    alphabet = string.ascii_letters
    picked = random.choices(alphabet, k=n)
    return "".join(picked)