Merge pull request #144 from jamez70/replay_integration_uncompressed_logs

Switched logs back to uncompressed. Replay now supported with command…
pull/182/head
cyoung 2015-12-17 14:53:42 -05:00
commit 6b9a750ddc
1 zmienionych plików z 251 dodań i 74 usunięć

Wyświetl plik

@ -2,6 +2,7 @@ package main
import (
"bufio"
"flag"
"compress/gzip"
"encoding/hex"
"encoding/json"
@ -76,11 +77,18 @@ var Crc16Table [256]uint16
var mySituation SituationData
// File handles for replay logging.
var uatReplayWriter *gzip.Writer
var esReplayWriter *gzip.Writer
var gpsReplayWriter *gzip.Writer
var ahrsReplayWriter *gzip.Writer
var dump1090ReplayWriter *gzip.Writer
var uatReplayWriter *os.File
var uatReplayWriterGzip *gzip.Writer
var esReplayWriter *os.File
var esReplayWriterGzip *gzip.Writer
var gpsReplayWriter *os.File
var gpsReplayWriterGzip *gzip.Writer
var ahrsReplayWriter *os.File
var ahrsReplayWriterGzip *gzip.Writer
var dump1090ReplayWriter *os.File
var dump1090ReplayWriterGzip *gzip.Writer
var developerMode bool
type msg struct {
MessageClass uint
@ -111,11 +119,6 @@ type ADSBTower struct {
var ADSBTowers map[string]ADSBTower // Running list of all towers seen. (lat,lng) -> ADSBTower
func constructFilenames() {
// uatReplayLog = "/var/log/stratux-uat.log"
// esReplayLog = "/var/log/stratux-es.log"
// gpsReplayLog = "/var/log/stratux-gps.log"
// ahrsReplayLog = "/var/log/stratux-ahrs.log"
// dump1090ReplayLog = "/var/log/stratux-dump1090.log"
var fileIndexNumber uint
// First, create the log file directory if it does not exist
@ -143,11 +146,19 @@ func constructFilenames() {
}
fo.Sync()
fo.Close()
uatReplayLog = fmt.Sprintf("%s/%04d-uat.log.gz",logDirectory,fileIndexNumber)
esReplayLog = fmt.Sprintf("%s/%04d-es.log.gz",logDirectory,fileIndexNumber)
gpsReplayLog = fmt.Sprintf("%s/%04d-gps.log.gz",logDirectory,fileIndexNumber)
ahrsReplayLog = fmt.Sprintf("%s/%04d-ahrs.log.gz",logDirectory,fileIndexNumber)
dump1090ReplayLog = fmt.Sprintf("%s/%04d-dump1090.log.gz",logDirectory,fileIndexNumber)
if developerMode == true {
uatReplayLog = fmt.Sprintf("%s/%04d-uat.log",logDirectory,fileIndexNumber)
esReplayLog = fmt.Sprintf("%s/%04d-es.log",logDirectory,fileIndexNumber)
gpsReplayLog = fmt.Sprintf("%s/%04d-gps.log",logDirectory,fileIndexNumber)
ahrsReplayLog = fmt.Sprintf("%s/%04d-ahrs.log",logDirectory,fileIndexNumber)
dump1090ReplayLog = fmt.Sprintf("%s/%04d-dump1090.log",logDirectory,fileIndexNumber)
} else {
uatReplayLog = fmt.Sprintf("%s/%04d-uat.log.gz",logDirectory,fileIndexNumber)
esReplayLog = fmt.Sprintf("%s/%04d-es.log.gz",logDirectory,fileIndexNumber)
gpsReplayLog = fmt.Sprintf("%s/%04d-gps.log.gz",logDirectory,fileIndexNumber)
ahrsReplayLog = fmt.Sprintf("%s/%04d-ahrs.log.gz",logDirectory,fileIndexNumber)
dump1090ReplayLog = fmt.Sprintf("%s/%04d-dump1090.log.gz",logDirectory,fileIndexNumber)
}
}
// Construct the CRC table. Adapted from FAA ref above.
@ -551,22 +562,42 @@ func replayLog(msg string, msgclass int) {
if len(msg) == 0 { // Blank message.
return
}
var fp *os.File
var wt *gzip.Writer
switch msgclass {
case MSGCLASS_UAT:
wt = uatReplayWriter
case MSGCLASS_ES:
wt = esReplayWriter
case MSGCLASS_GPS:
wt = gpsReplayWriter
case MSGCLASS_AHRS:
wt = ahrsReplayWriter
case MSGCLASS_DUMP1090:
wt = dump1090ReplayWriter
}
if wt != nil {
s := makeReplayLogEntry(msg)
wt.Write([]byte(s))
if developerMode == true {
switch msgclass {
case MSGCLASS_UAT:
fp = uatReplayWriter
case MSGCLASS_ES:
fp = esReplayWriter
case MSGCLASS_GPS:
fp = gpsReplayWriter
case MSGCLASS_AHRS:
fp = ahrsReplayWriter
case MSGCLASS_DUMP1090:
fp = dump1090ReplayWriter
}
if fp != nil {
s := makeReplayLogEntry(msg)
fp.Write([]byte(s))
}
} else {
switch msgclass {
case MSGCLASS_UAT:
wt = uatReplayWriterGzip
case MSGCLASS_ES:
wt = esReplayWriterGzip
case MSGCLASS_GPS:
wt = gpsReplayWriterGzip
case MSGCLASS_AHRS:
wt = ahrsReplayWriterGzip
case MSGCLASS_DUMP1090:
wt = dump1090ReplayWriterGzip
}
if wt != nil {
s := makeReplayLogEntry(msg)
wt.Write([]byte(s))
}
}
}
@ -883,14 +914,26 @@ func replayMark(active bool) {
}
func openReplay(fn string) (*gzip.Writer, error) {
func openReplay(fn string) (*os.File, error) {
fp, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Printf("Failed to open log file '%s': %s\n", fn, err.Error())
return nil, err
} else {
timeFmt := "Mon Jan 2 15:04:05 -0700 MST 2006"
fmt.Fprintf(fp, "START,%s,%s\n", timeStarted.Format(timeFmt), time.Now().Format(timeFmt)) // Start time marker.
}
return fp, err
}
func openReplayGzip(fn string) (*gzip.Writer, error) {
fp, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Printf("Failed to open log file '%s': %s\n", fn, err.Error())
return nil, err
}
gzw := gzip.NewWriter(fp) //FIXME: Close() on the gzip.Writer will not close the underlying file.
timeFmt := "Mon Jan 2 15:04:05 -0700 MST 2006"
s := fmt.Sprintf("START,%s,%s\n", timeStarted.Format(timeFmt), time.Now().Format(timeFmt)) // Start time marker.
@ -913,10 +956,80 @@ func printStats() {
}
}
var uatReplayDone bool
func uatReplay(f *os.File, replaySpeed uint64) {
rdr := bufio.NewReader(f)
curTick := int64(0)
for {
buf, err := rdr.ReadString('\n')
if err != nil {
break
}
linesplit := strings.Split(buf, ",")
if len(linesplit) < 2 { // Blank line or invalid.
continue
}
if linesplit[0] == "START" { // Reset ticker, new start.
curTick = 0
} else { // If it's not "START", then it's a tick count.
i, err := strconv.ParseInt(linesplit[0], 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid tick: '%s'\n", linesplit[0])
continue
}
thisWait := (i - curTick) / int64(replaySpeed)
if thisWait >= 120000000000 { // More than 2 minutes wait, skip ahead.
fmt.Fprintf(os.Stderr, "UAT skipahead - %d seconds.\n", thisWait/1000000000)
} else {
time.Sleep(time.Duration(thisWait) * time.Nanosecond) // Just in case the units change.
}
p := strings.Trim(linesplit[1], " ;\r\n")
fmt.Printf("%s;\n", p)
buf := fmt.Sprintf("%s;\n", p)
o, msgtype := parseInput(buf)
if o != nil && msgtype != 0 {
relayMessage(msgtype, o)
}
curTick = i
}
}
uatReplayDone = true
}
func openReplayFile(fn string) *os.File {
f, err := os.Open(fn)
if err != nil {
fmt.Fprintf(os.Stderr, "error opening '%s': %s\n", fn, err.Error())
os.Exit(1)
return nil
}
return f
}
func main() {
// replayESFilename := flag.String("eslog", "none", "ES Log filename")
replayUATFilename := flag.String("uatlog", "none", "UAT Log filename")
develFlag := flag.Bool("developer", false, "Developer mode")
replayFlag := flag.Bool("replay", false, "Replay file flag")
replaySpeed := flag.Int("speed", 1, "Replay speed multiplier")
flag.Parse()
timeStarted = time.Now()
runtime.GOMAXPROCS(runtime.NumCPU()) // redundant with Go v1.5+ compiler
if *develFlag == true {
log.Printf("Developer mode flag true!\n")
developerMode=true
}
// Duplicate log.* output to debugLog.
fp, err := os.OpenFile(debugLog, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
@ -934,6 +1047,13 @@ func main() {
MsgLog = make([]msg, 0)
crcInit() // Initialize CRC16 table.
if *replayFlag == true {
// if (*replayESFilename == "none") || (*replayUATFilename == "none") {
// log.Fatal("Must specify both UAT and ES log files\n")
// }
log.Printf("Replay file %s\n", *replayUATFilename)
}
sdrInit()
initTraffic()
@ -944,41 +1064,77 @@ func main() {
// Set up the replay logs. Keep these files open in any case, even if replay logging is disabled.
// UAT replay log.
if uatwt, err := openReplay(uatReplayLog); err != nil {
globalSettings.ReplayLog = false
if developerMode == true {
if uatwt, err := openReplay(uatReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
uatReplayWriter = uatwt
defer uatReplayWriter.Close()
}
// 1090ES replay log.
if eswt, err := openReplay(esReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
esReplayWriter = eswt
defer esReplayWriter.Close()
}
// GPS replay log.
if gpswt, err := openReplay(gpsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
gpsReplayWriter = gpswt
defer gpsReplayWriter.Close()
}
// AHRS replay log.
if ahrswt, err := openReplay(ahrsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
ahrsReplayWriter = ahrswt
defer ahrsReplayWriter.Close()
}
// Dump1090 replay log.
if dump1090wt, err := openReplay(dump1090ReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
dump1090ReplayWriter = dump1090wt
defer dump1090ReplayWriter.Close()
}
} else {
uatReplayWriter = uatwt
defer uatReplayWriter.Close()
if uatwt, err := openReplayGzip(uatReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
uatReplayWriterGzip = uatwt
defer uatReplayWriterGzip.Close()
}
// 1090ES replay log.
if eswt, err := openReplayGzip(esReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
esReplayWriterGzip = eswt
defer esReplayWriterGzip.Close()
}
// GPS replay log.
if gpswt, err := openReplayGzip(gpsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
gpsReplayWriterGzip = gpswt
defer gpsReplayWriterGzip.Close()
}
// AHRS replay log.
if ahrswt, err := openReplayGzip(ahrsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
ahrsReplayWriterGzip = ahrswt
defer ahrsReplayWriterGzip.Close()
}
// Dump1090 replay log.
if dump1090wt, err := openReplayGzip(dump1090ReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
dump1090ReplayWriterGzip = dump1090wt
defer dump1090ReplayWriterGzip.Close()
}
}
// 1090ES replay log.
if eswt, err := openReplay(esReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
esReplayWriter = eswt
defer esReplayWriter.Close()
}
// GPS replay log.
if gpswt, err := openReplay(gpsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
gpsReplayWriter = gpswt
defer gpsReplayWriter.Close()
}
// AHRS replay log.
if ahrswt, err := openReplay(ahrsReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
ahrsReplayWriter = ahrswt
defer ahrsReplayWriter.Close()
}
// Dump1090 replay log.
if dump1090wt, err := openReplay(dump1090ReplayLog); err != nil {
globalSettings.ReplayLog = false
} else {
dump1090ReplayWriter = dump1090wt
defer dump1090ReplayWriter.Close()
}
// Mark the files (whether we're logging or not).
replayMark(globalSettings.ReplayLog)
@ -1000,16 +1156,37 @@ func main() {
reader := bufio.NewReader(os.Stdin)
for {
buf, err := reader.ReadString('\n')
if err != nil {
log.Printf("lost stdin.\n")
break
if *replayFlag == true {
f := openReplayFile(*replayUATFilename)
// if len(os.Args) >= 4 {
// i, err := strconv.ParseUint(os.Args[3], 10, 64)
// if err == nil {
// replaySpeed = i
// }
// }
playSpeed := uint64(*replaySpeed)
fmt.Fprintf(os.Stderr, "Replay speed: %dx\n", playSpeed)
go uatReplay(f, playSpeed)
// go esReplay(f2, playSpeed)
for {
time.Sleep(1 * time.Second)
if uatReplayDone {
//&& esDone {
return
}
}
o, msgtype := parseInput(buf)
if o != nil && msgtype != 0 {
relayMessage(msgtype, o)
} else {
for {
buf, err := reader.ReadString('\n')
if err != nil {
log.Printf("lost stdin.\n")
break
}
o, msgtype := parseInput(buf)
if o != nil && msgtype != 0 {
relayMessage(msgtype, o)
}
}
}
}