This repository has been archived on 2025-07-20. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
hacknotts23/feedprocessor/feed/process.go
AKP 70d0fdc809
Alter 8 files
Update `config.go`
Add `feed.go`
Add `messageTypes.go`
Add `process.go`
Update `main.go`
Update `structures.go`
Update `go.mod`
Update `go.sum`
2023-02-12 05:57:10 +00:00

118 lines
3.2 KiB
Go

package feed
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
)
func ProcessMessage(redisClient *redis.Client, rawMessage []byte) error {
hab := new(headerAndBody)
if err := json.Unmarshal(rawMessage, hab); err != nil {
return err
}
switch hab.Header.MsgType {
case MessageTypeActivation:
return processActivation(redisClient, hab.Body)
case MessageTypeMovement:
return processMovement(redisClient, hab.Body)
case MessageTypeCancellation:
return processCancellation(redisClient, hab.Body)
}
return nil
}
func processActivation(redisClient *redis.Client, body json.RawMessage) error {
am := new(activationMessage)
if err := json.Unmarshal(body, am); err != nil {
return err
}
if am.ScheduleSource == "V" {
// This is a VSTP schedule - cba to deal with that.
return nil
}
ts := new(dbTrainStatus)
ts.OriginTimestamp = am.TpOriginTimestamp
ts.ScheduleID = am.TrainUid
res := redisClient.HMSet(context.Background(), fmt.Sprintf("position:%s", am.TrainId), ts.ToMap())
return res.Err()
}
func processMovement(redisClient *redis.Client, body json.RawMessage) error {
mm := new(movementMessage)
if err := json.Unmarshal(body, mm); err != nil {
return err
}
if mm.CorrectionInd == "true" {
return nil
}
return nil
}
func processCancellation(redisClient *redis.Client, body json.RawMessage) error {
return nil
}
type headerAndBody struct {
Header struct {
MsgType string `json:"msg_type"`
OriginalDataSource string `json:"original_data_source"`
MsgQueueTimestamp string `json:"msg_queue_timestamp"`
SourceSystemId string `json:"source_system_id"`
} `json:"header"`
Body json.RawMessage `json:"body"`
}
type activationMessage struct {
ScheduleSource string `json:"schedule_source"`
ScheduleEndDate string `json:"schedule_end_date"`
TrainId string `json:"train_id"`
TpOriginTimestamp string `json:"tp_origin_timestamp"`
OriginDepTimestamp string `json:"origin_dep_timestamp"`
TrainServiceCode string `json:"train_service_code"`
TocId string `json:"toc_id"`
TrainUid string `json:"train_uid"`
TrainCallMode string `json:"train_call_mode"`
ScheduleType string `json:"schedule_type"`
ScheduleWttId string `json:"schedule_wtt_id"`
ScheduleStartDate string `json:"schedule_start_date"`
}
type movementMessage struct {
EventType string `json:"event_type"`
CorrectionInd string `json:"correction_ind"`
TrainTerminated string `json:"train_terminated"`
TrainId string `json:"train_id"`
VariationStatus string `json:"variation_status"`
LocStanox string `json:"loc_stanox"`
PlannedEventType string `json:"planned_event_type"`
NextReportStanox string `json:"next_report_stanox"`
}
type dbTrainStatus struct {
OriginTimestamp string
ScheduleID string // aka train uid
CurrentTIPLOC string
NextTIPLOC string
PreviousTIPLOC string
Running bool
}
func (dts *dbTrainStatus) ToMap() map[string]any {
return map[string]any{
"originTimestamp": dts.OriginTimestamp,
"scheduleID": dts.ScheduleID,
"currentTIPLOC": dts.CurrentTIPLOC,
"nextTIPLOC": dts.NextTIPLOC,
"previousTIPLOC": dts.PreviousTIPLOC,
"runningg": dts.Running,
}
}