From 0db599df32bf1a5b059d28142a97d3969d30ede7 Mon Sep 17 00:00:00 2001 From: AKP Date: Wed, 5 Apr 2023 15:35:11 +0100 Subject: [PATCH] Add log prune worker Signed-off-by: AKP --- ingest/main.go | 3 +++ ingest/worker/worker.go | 59 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 ingest/worker/worker.go diff --git a/ingest/main.go b/ingest/main.go index d9f957d..8f52bee 100644 --- a/ingest/main.go +++ b/ingest/main.go @@ -4,6 +4,7 @@ import ( "github.com/codemicro/analytics/ingest/config" "github.com/codemicro/analytics/ingest/db" "github.com/codemicro/analytics/ingest/ingest" + "github.com/codemicro/analytics/ingest/worker" "github.com/rs/zerolog/log" "os" "os/signal" @@ -27,6 +28,8 @@ func run() error { return err } + worker.Start(database) + ig, err := ingest.Start(conf, database) if err != nil { return err diff --git a/ingest/worker/worker.go b/ingest/worker/worker.go new file mode 100644 index 0000000..48e7a49 --- /dev/null +++ b/ingest/worker/worker.go @@ -0,0 +1,59 @@ +package worker + +import ( + "context" + "github.com/codemicro/analytics/ingest/db" + "github.com/codemicro/analytics/ingest/db/models" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "time" +) + +const interval = time.Hour + +func Start(db *db.DB) { + ticker := time.NewTicker(interval) + logger := log.Logger.With().Str("location", "worker").Logger() + run(db, logger) + go func() { + for { + <-ticker.C + run(db, logger) + } + }() +} + +func run(db *db.DB, logger zerolog.Logger) { + logger.Info().Msg("running worker") + + tx, err := db.DB.Begin() + if err != nil { + logger.Err(err).Msg("unable to open session") + _ = tx.Rollback() + return + } + + _, err = tx.NewDelete().Model(&models.Request{}). + Where(`datetime() > datetime("time", (select value from config where id='prune_requests_after'))`). + Exec(context.Background()) + if err != nil { + logger.Err(err).Msg("failed to run request delete query") + _ = tx.Rollback() + return + } + + _, err = tx.NewDelete().Model(&models.Session{}). + Where(`(SELECT COUNT(*) FROM requests WHERE "session_id" = "session"."id") = 0`). + Exec(context.Background()) + if err != nil { + logger.Err(err).Msg("failed to run session delete query") + _ = tx.Rollback() + return + } + + if err := tx.Commit(); err != nil { + logger.Err(err).Msg("failed to commit transaction") + _ = tx.Rollback() + return + } +}