goatcounter-forwarder/main.go

250 lines
5.7 KiB
Go

package main
import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"math"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
)
var (
httpAddress = cmp.Or(os.Getenv("GCFWD_HTTP_ADDRESS"), "127.0.0.1:7000")
gcInstance = strings.TrimRight(os.Getenv("GCFWD_INSTANCE_ADDR"), "/")
gcToken = os.Getenv("GCFWD_TOKEN")
debug = false
)
// init validates the mandatory configuration, terminating the process with
// status 1 when the instance address or the token is missing, and then
// enables debug logging if GCFWD_DEBUG is set to any non-empty value.
func init() {
	switch {
	case gcInstance == "":
		slog.Error("GCFWD_INSTANCE_ADDR cannot be empty")
		os.Exit(1)
	case gcToken == "":
		slog.Error("GCFWD_TOKEN cannot be empty")
		os.Exit(1)
	}

	debug = os.Getenv("GCFWD_DEBUG") != ""
}
// main switches the default logger to debug level when debug mode is on, then
// runs the forwarder pipeline and exits with status 1 if it returns an error.
func main() {
	if debug {
		opts := &slog.HandlerOptions{Level: slog.LevelDebug}
		slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, opts)))
	}

	err := run()
	if err == nil {
		return
	}
	slog.Error("unhandled error", "error", err)
	os.Exit(1)
}
func run() error {
return submit(batch(httpServer()))
}
// Pageview is one hit as sent to GoatCounter inside the "hits" array of the
// count request. Field order is significant only in that it fixes the order
// of keys in the encoded JSON.
type Pageview struct {
	// CreatedAt is when the forwarder received the request.
	CreatedAt time.Time `json:"created_at"`
	// IP is the client address, taken from the X-Forwarded-For header.
	IP string `json:"ip"`
	// Path is the forwarded host followed by the original URL path.
	Path string `json:"path"`
	// Query is the raw (still-encoded) query string of the original URL.
	Query string `json:"query"`
	// Referrer is the value of the Referer header.
	Referrer string `json:"ref"`
	// UserAgent is the value of the User-Agent header.
	UserAgent string `json:"user_agent"`
}
// httpServer starts an HTTP listener on httpAddress in a background goroutine
// and returns the channel (buffer 512) on which it publishes one *Pageview per
// accepted request.
//
// Every request is answered with 200 regardless of outcome. The original
// request details are read from X-Forwarded-Uri, X-Forwarded-Host and
// X-Forwarded-For. NOTE(review): this presumably runs as a reverse-proxy
// forward-auth style hook; confirm against the deployment.
//
// The handler never blocks on the channel. If the queue is full (for example
// while the submitter is backing off from remote errors), the pageview is
// dropped with a warning so proxied traffic is not stalled by analytics.
//
// If the listener fails, the process exits with status 2.
func httpServer() chan *Pageview {
	submit := make(chan *Pageview, 512)
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Always report success: analytics problems must not affect the caller.
		defer w.WriteHeader(http.StatusOK)

		xfu := r.Header.Get("X-Forwarded-Uri")
		u, err := url.Parse(xfu)
		if err != nil {
			slog.Warn("received invalid URI from upstream", "uri", xfu)
			return
		}

		pv := &Pageview{
			CreatedAt: time.Now().UTC(),
			IP:        clientIP(r.Header.Get("X-Forwarded-For")),
			Path:      r.Header.Get("X-Forwarded-Host") + u.Path,
			Query:     u.RawQuery,
			Referrer:  r.Header.Get("Referer"),
			UserAgent: r.Header.Get("User-Agent"),
		}

		select {
		case submit <- pv:
		default:
			slog.Warn("pageview queue full, dropping pageview", "path", pv.Path)
		}
	})

	// The handler only inspects headers, so short timeouts are sufficient and
	// protect the listener from slow or stuck clients.
	srv := &http.Server{
		Addr:              httpAddress,
		Handler:           handler,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       2 * time.Minute,
	}

	go func() {
		slog.Info("http listener alive", "address", httpAddress)
		if err := srv.ListenAndServe(); err != nil {
			slog.Error("http server died", "error", err)
			os.Exit(2)
		}
	}()
	return submit
}

// clientIP extracts the originating client address from an X-Forwarded-For
// header value. The header may carry a comma-separated chain
// ("client, proxy1, proxy2"); only the first (left-most) entry is returned,
// with surrounding whitespace removed.
func clientIP(xff string) string {
	first, _, _ := strings.Cut(xff, ",")
	return strings.TrimSpace(first)
}
// maxBatchSize is the largest number of pageviews sent in one submission.
const maxBatchSize = 500

// batchTimeout is how often a partially-filled batch is flushed, and also the
// per-request timeout applied when a batch is submitted.
const batchTimeout = 30 * time.Second
// batch groups pageviews arriving on receive into batches of at most
// maxBatchSize and publishes them on the returned channel (buffer 32). A batch
// is emitted as soon as it is full, or when the batchTimeout timer fires while
// at least one pageview is queued.
//
// Every emitted slice has its own backing array that this goroutine never
// writes to again, so the consumer may keep a batch for as long as it needs
// (for example while retrying). If receive is closed, any queued pageviews are
// flushed and the returned channel is closed.
func batch(receive chan *Pageview) chan []*Pageview {
	ch := make(chan []*Pageview, 32)
	go func() {
		defer close(ch)

		queue := make([]*Pageview, 0, maxBatchSize)
		timer := time.NewTimer(batchTimeout)
		defer timer.Stop()

		// flush hands the current queue to the consumer and starts a fresh
		// one. Reusing the old backing array would overwrite batches that are
		// still buffered in ch or being submitted.
		flush := func() {
			ch <- queue
			queue = make([]*Pageview, 0, maxBatchSize)
		}

		for {
			select {
			case x, ok := <-receive:
				if !ok {
					if len(queue) != 0 {
						flush()
					}
					return
				}
				slog.Debug("processing pageview", "pageview", x)
				queue = append(queue, x)
				if len(queue) == maxBatchSize {
					slog.Debug("batch full, submitting")
					// Stop and drain before Reset so that a tick which fired in
					// the meantime cannot trigger a spurious early timeout
					// (required under pre-Go 1.23 timer semantics).
					if !timer.Stop() {
						select {
						case <-timer.C:
						default:
						}
					}
					timer.Reset(batchTimeout)
					flush()
				}
			case <-timer.C:
				// The tick was just received, so a plain Reset is safe here.
				timer.Reset(batchTimeout)
				if len(queue) != 0 {
					slog.Debug("batch timeout, submitting")
					flush()
				}
			}
		}
	}()
	return ch
}
// submit POSTs every batch received on receive to the GoatCounter count API
// (gcInstance + "/api/v0/count") as {"hits": [...]}, authenticating with
// gcToken. It returns nil once receive is closed and all batches were sent.
//
// Rate limiting: after each response, X-Rate-Limit-Remaining and
// X-Rate-Limit-Reset are recorded. When no requests remain, the next request
// waits until the advertised reset (counted from the previous request) has
// passed.
//
// Retries: each batch gets at most 4 attempts. A 429 waits for the rate-limit
// reset. A 5xx response or a transport error pauses 60s·√n after the n-th
// failed attempt (60s, ~84s, ~103s). Any other non-2xx status, a request that
// cannot be built, or running out of attempts returns an error and stops
// submission; batches still queued on receive are not sent.
func submit(receive chan []*Pageview) error {
	const maxAttempts = 4

	type requestBody struct {
		Hits []*Pageview `json:"hits"`
	}

	// backoffSeconds is the pause after the n-th consecutive failed attempt
	// caused by a 5xx response or a transport error.
	backoffSeconds := func(failedAttempts int) int {
		return int(60 * math.Sqrt(float64(failedAttempts)))
	}

	var (
		ratelimitRemaining       = 1 // the wait logic only triggers when this is 0
		ratelimitResetsInSeconds = 0
		lastRequestTime          = time.Now()
	)

	for x := range receive {
		slog.Debug("processing batch", "len(batch)", len(x), "batch", x)
		payloadData, err := json.Marshal(requestBody{Hits: x})
		if err != nil {
			return fmt.Errorf("marshal request body: %w", err)
		}

		var lastErr error
		submitted := false
		for attempt := 1; attempt <= maxAttempts && !submitted; attempt++ {
			if ratelimitRemaining == 0 {
				sinceLastRequest := time.Since(lastRequestTime)
				if ds := time.Duration(ratelimitResetsInSeconds) * time.Second; sinceLastRequest < ds {
					waitDuration := ds - sinceLastRequest
					slog.Debug("hit remote ratelimit, waiting", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds, "duration", waitDuration)
					time.Sleep(waitDuration)
				} else {
					slog.Debug("remote ratelimit already passed", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds)
				}
			}

			ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
			req, err := http.NewRequestWithContext(ctx, http.MethodPost, gcInstance+"/api/v0/count", bytes.NewReader(payloadData))
			if err != nil {
				cancel()
				return fmt.Errorf("create HTTP request: %w", err)
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", "Bearer "+gcToken)
			resp, body, err := doRequest(req)
			cancel()
			lastRequestTime = time.Now()

			if err != nil {
				// Transient network failures are treated like a 5xx response.
				lastErr = fmt.Errorf("do http request: %w", err)
				ratelimitRemaining = 0
				ratelimitResetsInSeconds = backoffSeconds(attempt)
				slog.Warn("http request to remote failed, retrying after pause", "pauseSeconds", ratelimitResetsInSeconds, "error", err)
				continue
			}

			if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Remaining")); err != nil {
				slog.Warn("unable to parse X-Rate-Limit-Remaining from remote", "error", err)
				ratelimitRemaining = 1 // ratelimit logic triggers when == 0, prevent it triggering ever
			} else {
				ratelimitRemaining = v
			}
			if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Reset")); err != nil {
				slog.Warn("unable to parse X-Rate-Limit-Reset from remote", "error", err)
			} else {
				ratelimitResetsInSeconds = v
			}

			slog.Debug("request completed", "status", resp.StatusCode)
			switch {
			case resp.StatusCode/100 == 2:
				slog.Debug("submission ok!")
				submitted = true
			case resp.StatusCode == http.StatusTooManyRequests:
				// Always wait before retrying, even if the remote did not send
				// usable rate-limit headers.
				lastErr = fmt.Errorf("remote responded with status %d", resp.StatusCode)
				ratelimitRemaining = 0
				ratelimitResetsInSeconds = max(ratelimitResetsInSeconds, 1)
				slog.Warn("429 received from remote, retrying after ratelimit reset", "resetsIn", ratelimitResetsInSeconds)
			case resp.StatusCode/100 == 5:
				lastErr = fmt.Errorf("remote responded with status %d", resp.StatusCode)
				ratelimitRemaining = 0
				ratelimitResetsInSeconds = backoffSeconds(attempt)
				slog.Warn("5xx error received from remote, retrying after pause", "pauseSeconds", ratelimitResetsInSeconds, "status", resp.StatusCode)
			default:
				return fmt.Errorf("unknown status code %d from remote: %q", resp.StatusCode, body)
			}
		}
		if !submitted {
			return fmt.Errorf("unable to submit request to remote in %d attempts: %w", maxAttempts, lastErr)
		}
	}
	return nil
}

// doRequest sends req with http.DefaultClient. It returns the response
// together with its body, which has already been read to EOF and closed so
// the transport can reuse the connection. Callers must use the returned bytes
// and must not read resp.Body. A failure while reading the body is only
// logged, because the status code and headers are already available.
func doRequest(req *http.Request) (*http.Response, []byte, error) {
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		slog.Warn("unable to read response body from remote", "error", err)
	}
	return resp, body.Bytes(), nil
}
// parseIntHeaderValue converts a header value to an int.
//
// An empty value yields the error "no value". A value that strconv.Atoi
// rejects yields an error quoting the offending text. On any error the
// returned int is 0.
func parseIntHeaderValue(val string) (int, error) {
	if val == "" {
		return 0, errors.New("no value")
	}

	n, err := strconv.Atoi(val)
	if err != nil {
		return 0, fmt.Errorf("invalid value %#v", val)
	}
	return n, nil
}