250 lines
5.7 KiB
Go
250 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"cmp"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"math"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
httpAddress = cmp.Or(os.Getenv("GCFWD_HTTP_ADDRESS"), "127.0.0.1:7000")
|
|
gcInstance = strings.TrimRight(os.Getenv("GCFWD_INSTANCE_ADDR"), "/")
|
|
gcToken = os.Getenv("GCFWD_TOKEN")
|
|
debug = false
|
|
)
|
|
|
|
func init() {
|
|
if gcInstance == "" {
|
|
slog.Error("GCFWD_INSTANCE_ADDR cannot be empty")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if gcToken == "" {
|
|
slog.Error("GCFWD_TOKEN cannot be empty")
|
|
os.Exit(1)
|
|
}
|
|
|
|
debug = os.Getenv("GCFWD_DEBUG") != ""
|
|
}
|
|
|
|
func main() {
|
|
if debug {
|
|
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})))
|
|
}
|
|
|
|
if err := run(); err != nil {
|
|
slog.Error("unhandled error", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func run() error {
|
|
return submit(batch(httpServer()))
|
|
}
|
|
|
|
// Pageview is one page hit captured from a forwarded request. It is encoded
// as an element of the "hits" array sent to the remote; the field order below
// determines the JSON key order, so keep it stable.
type Pageview struct {
	// CreatedAt is when the hit was received (httpServer records it in UTC).
	CreatedAt time.Time `json:"created_at"`
	// IP is the X-Forwarded-For header value, as received.
	IP string `json:"ip"`
	// Path is X-Forwarded-Host followed by the path of X-Forwarded-Uri.
	Path string `json:"path"`
	// Query is the raw query string of X-Forwarded-Uri.
	Query string `json:"query"`
	// Referrer is the Referer header value.
	Referrer string `json:"ref"`
	// UserAgent is the User-Agent header value.
	UserAgent string `json:"user_agent"`
}
|
|
|
|
func httpServer() chan *Pageview {
|
|
submit := make(chan *Pageview, 512)
|
|
|
|
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
xfu := r.Header.Get("X-Forwarded-Uri")
|
|
u, err := url.Parse(xfu)
|
|
if err != nil {
|
|
slog.Warn("received invalid URI from upstream", "uri", xfu)
|
|
goto done
|
|
}
|
|
|
|
submit <- &Pageview{
|
|
CreatedAt: time.Now().UTC(),
|
|
IP: r.Header.Get("X-Forwarded-For"),
|
|
Path: r.Header.Get("X-Forwarded-Host") + u.Path,
|
|
Query: u.RawQuery,
|
|
Referrer: r.Header.Get("Referer"),
|
|
UserAgent: r.Header.Get("User-Agent"),
|
|
}
|
|
done:
|
|
w.WriteHeader(200)
|
|
})
|
|
|
|
go func() {
|
|
slog.Info("http listener alive", "address", httpAddress)
|
|
if err := http.ListenAndServe(httpAddress, hf); err != nil {
|
|
slog.Error("http server died", "error", err)
|
|
os.Exit(2)
|
|
}
|
|
}()
|
|
|
|
return submit
|
|
}
|
|
|
|
// maxBatchSize is the largest number of pageviews grouped into one batch
// (and therefore one request to the remote).
const maxBatchSize = 500

// batchTimeout is how long a partially filled batch may wait before it is
// emitted anyway; submit also uses it as the per-request timeout.
const batchTimeout = 30 * time.Second
|
|
|
|
func batch(receive chan *Pageview) chan []*Pageview {
|
|
ch := make(chan []*Pageview, 32)
|
|
|
|
go func() {
|
|
var (
|
|
queue = make([]*Pageview, maxBatchSize)
|
|
n = 0
|
|
timer = time.NewTimer(batchTimeout)
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case x := <-receive:
|
|
slog.Debug("processing pageview", "pageview", x)
|
|
queue[n] = x
|
|
n += 1
|
|
if n == maxBatchSize {
|
|
slog.Debug("batch full, submitting")
|
|
_ = timer.Reset(batchTimeout)
|
|
goto submit
|
|
}
|
|
case <-timer.C:
|
|
_ = timer.Reset(batchTimeout)
|
|
if n != 0 {
|
|
slog.Debug("batch timeout, submitting")
|
|
goto submit
|
|
}
|
|
}
|
|
|
|
continue
|
|
submit:
|
|
ch <- queue[:n]
|
|
n = 0
|
|
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
func submit(receive chan []*Pageview) error {
|
|
type requestBody struct {
|
|
Hits []*Pageview `json:"hits"`
|
|
}
|
|
|
|
var (
|
|
ratelimitRemaining = 1
|
|
ratelimitResetsInSeconds = 0
|
|
lastRequestTime = time.Now()
|
|
)
|
|
|
|
for x := range receive {
|
|
slog.Debug("processing batch", "len(batch)", len(x), "batch", x)
|
|
|
|
payloadData, err := json.Marshal(requestBody{
|
|
Hits: x,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("marshal request body: %w", err)
|
|
}
|
|
|
|
attempts := 0
|
|
retry:
|
|
if attempts >= 4 {
|
|
return fmt.Errorf("unable to submit request to remote in 4 attempts")
|
|
}
|
|
|
|
if ratelimitRemaining == 0 {
|
|
sinceLastRequest := time.Now().Sub(lastRequestTime)
|
|
|
|
if ds := time.Second * time.Duration(ratelimitResetsInSeconds); sinceLastRequest < ds {
|
|
waitDuration := sinceLastRequest - ds
|
|
slog.Debug("hit remote ratelimit, waiting", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds, "duration", waitDuration)
|
|
time.Sleep(waitDuration)
|
|
} else {
|
|
slog.Debug("remote ratelimit already passed", "sinceLastRequest", sinceLastRequest, "resetsIn", ratelimitResetsInSeconds)
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), batchTimeout)
|
|
req, err := http.NewRequestWithContext(ctx, "POST", gcInstance+"/api/v0/count", bytes.NewBuffer(payloadData))
|
|
if err != nil {
|
|
cancel()
|
|
return fmt.Errorf("create HTTP request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Authorization", "Bearer "+gcToken)
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
cancel()
|
|
return fmt.Errorf("do http request: %w", err)
|
|
}
|
|
cancel()
|
|
|
|
lastRequestTime = time.Now()
|
|
|
|
if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Remaining")); err != nil {
|
|
slog.Warn("unable to parse X-Rate-Limit-Remaining from remote", "error", err)
|
|
ratelimitRemaining = 1 // ratelimit logic triggers when == 0, prevent it triggering ever
|
|
} else {
|
|
ratelimitRemaining = v
|
|
}
|
|
|
|
if v, err := parseIntHeaderValue(resp.Header.Get("X-Rate-Limit-Reset")); err != nil {
|
|
slog.Warn("unable to parse X-Rate-Limit-Reset from remote", "error", err)
|
|
} else {
|
|
ratelimitResetsInSeconds = v
|
|
}
|
|
|
|
slog.Debug("request completed", "status", resp.StatusCode)
|
|
|
|
if resp.StatusCode/100 == 2 {
|
|
// ok!
|
|
slog.Debug("submission ok!")
|
|
} else if resp.StatusCode == 429 {
|
|
attempts += 1
|
|
goto retry
|
|
} else if resp.StatusCode/100 == 5 {
|
|
attempts += 1
|
|
|
|
ratelimitRemaining = 0
|
|
ratelimitResetsInSeconds = int(60 * math.Pow(float64(attempts), 1/2)) // durations are 60, ~84, ~108
|
|
|
|
slog.Warn("5xx error received from remote, retrying after pause", "pauseSeconds", ratelimitResetsInSeconds, "status", resp.StatusCode)
|
|
|
|
goto retry
|
|
} else {
|
|
return fmt.Errorf("unknown status code %d from remote", resp.StatusCode)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// parseIntHeaderValue converts a header value to an int. An empty value yields
// the error "no value"; a non-integer value yields an error quoting the input.
// The returned int is 0 whenever an error is returned.
func parseIntHeaderValue(val string) (int, error) {
	if val == "" {
		return 0, errors.New("no value")
	}

	n, err := strconv.Atoi(val)
	if err != nil {
		return 0, fmt.Errorf("invalid value %#v", val)
	}
	return n, nil
}
|