Add log prune worker
Signed-off-by: AKP <tom@tdpain.net>
This commit is contained in:
parent
f8b9c80ebd
commit
0db599df32
2 changed files with 62 additions and 0 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/codemicro/analytics/ingest/config"
|
||||
"github.com/codemicro/analytics/ingest/db"
|
||||
"github.com/codemicro/analytics/ingest/ingest"
|
||||
"github.com/codemicro/analytics/ingest/worker"
|
||||
"github.com/rs/zerolog/log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
@ -27,6 +28,8 @@ func run() error {
|
|||
return err
|
||||
}
|
||||
|
||||
worker.Start(database)
|
||||
|
||||
ig, err := ingest.Start(conf, database)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
59
ingest/worker/worker.go
Normal file
59
ingest/worker/worker.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/codemicro/analytics/ingest/db"
|
||||
"github.com/codemicro/analytics/ingest/db/models"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
const interval = time.Hour
|
||||
|
||||
func Start(db *db.DB) {
|
||||
ticker := time.NewTicker(interval)
|
||||
logger := log.Logger.With().Str("location", "worker").Logger()
|
||||
run(db, logger)
|
||||
go func() {
|
||||
for {
|
||||
<-ticker.C
|
||||
run(db, logger)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func run(db *db.DB, logger zerolog.Logger) {
|
||||
logger.Info().Msg("running worker")
|
||||
|
||||
tx, err := db.DB.Begin()
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("unable to open session")
|
||||
_ = tx.Rollback()
|
||||
return
|
||||
}
|
||||
|
||||
_, err = tx.NewDelete().Model(&models.Request{}).
|
||||
Where(`datetime() > datetime("time", (select value from config where id='prune_requests_after'))`).
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("failed to run request delete query")
|
||||
_ = tx.Rollback()
|
||||
return
|
||||
}
|
||||
|
||||
_, err = tx.NewDelete().Model(&models.Session{}).
|
||||
Where(`(SELECT COUNT(*) FROM requests WHERE "session_id" = "session"."id") = 0`).
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("failed to run session delete query")
|
||||
_ = tx.Rollback()
|
||||
return
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
logger.Err(err).Msg("failed to commit transaction")
|
||||
_ = tx.Rollback()
|
||||
return
|
||||
}
|
||||
}
|
Reference in a new issue