Initial commit
This commit is contained in:
commit
85c8bee34a
2 changed files with 253 additions and 0 deletions
3
go.mod
Normal file
3
go.mod
Normal file
|
@ -0,0 +1,3 @@
|
|||
module git.tdpain.net/codemicro/goatcounter-forwarder
|
||||
|
||||
go 1.23.3
|
250
main.go
Normal file
250
main.go
Normal file
|
@ -0,0 +1,250 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
httpAddress = cmp.Or(os.Getenv("GCFWD_HTTP_ADDRESS"), "127.0.0.1:7000")
|
||||
gcInstance = strings.TrimRight(os.Getenv("GCFWD_INSTANCE_ADDR"), "/")
|
||||
gcToken = os.Getenv("GCFWD_TOKEN")
|
||||
debug = false
|
||||
)
|
||||
|
||||
func init() {
|
||||
if gcInstance == "" {
|
||||
slog.Error("GCFWD_INSTANCE_ADDR cannot be empty")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if gcToken == "" {
|
||||
slog.Error("GCFWD_TOKEN cannot be empty")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
debug = os.Getenv("GCFWD_DEBUG") != ""
|
||||
}
|
||||
|
||||
func main() {
|
||||
if debug {
|
||||
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})))
|
||||
}
|
||||
|
||||
if err := run(); err != nil {
|
||||
slog.Error("unhandled error", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run() error {
|
||||
return submit(batch(httpServer()))
|
||||
}
|
||||
|
||||
type Pageview struct {
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
IP string `json:"ip"`
|
||||
Path string `json:"path"`
|
||||
Query string `json:"query"`
|
||||
Referrer string `json:"referrer"`
|
||||
UserAgent string `json:"user_agent"`
|
||||
}
|
||||
|
||||
func httpServer() chan *Pageview {
|
||||
submit := make(chan *Pageview, 512)
|
||||
|
||||
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
xfu := r.Header.Get("X-Forwarded-Uri")
|
||||
u, err := url.Parse(xfu)
|
||||
if err != nil {
|
||||
slog.Warn("received invalid URI from upstream", "uri", xfu)
|
||||
goto done
|
||||
}
|
||||
|
||||
submit <- &Pageview{
|
||||
CreatedAt: time.Now().UTC(),
|
||||
IP: r.Header.Get("X-Forwarded-For"),
|
||||
Path: r.Header.Get("X-Forwarded-Host") + u.Path,
|
||||
Query: u.RawQuery,
|
||||
Referrer: r.Header.Get("Referer"),
|
||||
UserAgent: r.Header.Get("User-Agent"),
|
||||
}
|
||||
done:
|
||||
w.WriteHeader(200)
|
||||
})
|
||||
|
||||
go func() {
|
||||
slog.Info("http listener alive", "address", httpAddress)
|
||||
if err := http.ListenAndServe(httpAddress, hf); err != nil {
|
||||
slog.Error("http server died", "error", err)
|
||||
os.Exit(2)
|
||||
}
|
||||
}()
|
||||
|
||||
return submit
|
||||
}
|
||||
|
||||
const (
|
||||
maxBatchSize = 500
|
||||
batchTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
func batch(receive chan *Pageview) chan []*Pageview {
|
||||
ch := make(chan []*Pageview, 32)
|
||||
|
||||
go func() {
|
||||
var (
|
||||
queue = make([]*Pageview, maxBatchSize)
|
||||
n = 0
|
||||
timer = time.NewTimer(batchTimeout)
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case x := <-receive:
|
||||
slog.Debug("processing pageview", "pageview", x)
|
||||
queue[n] = x
|
||||
n += 1
|
||||
if n == maxBatchSize {
|
||||
slog.Debug("batch full, submitting")
|
||||
_ = timer.Reset(batchTimeout)
|
||||
goto submit
|
||||
}
|
||||
case <-timer.C:
|
||||
_ = timer.Reset(batchTimeout)
|
||||
if n != 0 {
|
||||
slog.Debug("batch timeout, submitting")
|
||||
goto submit
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
submit:
|
||||
ch <- queue[:n]
|
||||
n = 0
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func submit(receive chan []*Pageview) error {
|
||||
type requestBody struct {
|
||||
Hits []*Pageview `json:"hits"`
|
||||
}
|
||||
|
||||
var (
|
||||
ratelimitRemaining = 1
|
||||
ratelimitResetsInSeconds = 0
|
||||
lastRequestTime = time.Now()
|
||||
)
|
||||
|
||||
for x := range receive {
|
||||
slog.Debug("processing batch", "len(batch)", len(x), "batch", x)
|
||||
|
||||
payloadData, err := json.Marshal(requestBody{
|
||||
Hits: x,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal request body: %w", err)
|
||||
}
|
||||
|
||||
attempts := 0
|
||||
retry:
|
||||
if attempts >= 4 {
|
||||
return fmt.Errorf("unable to submit request to remote in 4 attempts")
|
||||
}
|
||||
|
||||
if ratelimitRemaining == 0 {
|
||||
sinceLastRequest := time.Now().Sub(lastRequestTime)
|
||||
|
||||
if ds := time.Second * time.Duration(ratelimitResetsInSeconds); sinceLastRequest < ds {
|
||||
waitDuration := sinceLastRequest - ds
|
||||
slog.Debug("hit remote ratelimit, waiting", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds, "duration", waitDuration)
|
||||
time.Sleep(waitDuration)
|
||||
} else {
|
||||
slog.Debug("remote ratelimit already passed", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", gcInstance+"/api/v0/count", bytes.NewBuffer(payloadData))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("create HTTP request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+gcToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("do http request: %w", err)
|
||||
}
|
||||
cancel()
|
||||
|
||||
lastRequestTime = time.Now()
|
||||
|
||||
if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Remaining")); err != nil {
|
||||
slog.Warn("unable to parse X-Rate-Limit-Remaining from remote", "error", err)
|
||||
ratelimitRemaining = 1 // ratelimit logic triggers when == 0, prevent it triggering ever
|
||||
} else {
|
||||
ratelimitRemaining = v
|
||||
}
|
||||
|
||||
if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Reset")); err != nil {
|
||||
slog.Warn("unable to parse X-Rate-Limit-Reset from remote", "error", err)
|
||||
} else {
|
||||
ratelimitResetsInSeconds = v
|
||||
}
|
||||
|
||||
slog.Debug("request completed", "status", resp.StatusCode)
|
||||
|
||||
if resp.StatusCode/100 == 2 {
|
||||
// ok!
|
||||
slog.Debug("submission ok!")
|
||||
} else if resp.StatusCode == 429 {
|
||||
attempts += 1
|
||||
goto retry
|
||||
} else if resp.StatusCode/100 == 5 {
|
||||
attempts += 1
|
||||
|
||||
ratelimitRemaining = 0
|
||||
ratelimitResetsInSeconds = int(60 * math.Pow(float64(attempts), 1/2)) // durations are 60, ~84, ~108
|
||||
|
||||
slog.Warn("5xx error received from remote, retrying after pause", "pauseSeconds", ratelimitResetsInSeconds, "status", resp.StatusCode)
|
||||
|
||||
goto retry
|
||||
} else {
|
||||
return fmt.Errorf("unknown status code %d from remote", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseIntHeaderValue(val string) (int, error) {
|
||||
if val != "" {
|
||||
vi, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid value %#v", val)
|
||||
} else {
|
||||
return vi, err
|
||||
}
|
||||
} else {
|
||||
return 0, errors.New("no value")
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue