gzip stream replay logs.

Addresses (partially) replay log file size mentioned in #131.
CPU impact is minimal, can still process 45,000+ UAT messages per min.
pull/134/head
Christopher Young 2015-12-02 12:18:52 -05:00
rodzic f315ca5799
commit 0db686a14b
1 zmienionych plików z 49 dodań i 44 usunięć

Wyświetl plik

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"compress/gzip"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -72,11 +73,11 @@ var Crc16Table [256]uint16
var mySituation SituationData var mySituation SituationData
// File handles for replay logging. // File handles for replay logging.
var uatReplayfp *os.File var uatReplayWriter *gzip.Writer
var esReplayfp *os.File var esReplayWriter *gzip.Writer
var gpsReplayfp *os.File var gpsReplayWriter *gzip.Writer
var ahrsReplayfp *os.File var ahrsReplayWriter *gzip.Writer
var dump1090Replayfp *os.File var dump1090ReplayWriter *gzip.Writer
type msg struct { type msg struct {
MessageClass uint MessageClass uint
@ -506,22 +507,22 @@ func replayLog(msg string, msgclass int) {
if len(msg) == 0 { // Blank message. if len(msg) == 0 { // Blank message.
return return
} }
var fp *os.File var wt *gzip.Writer
switch msgclass { switch msgclass {
case MSGCLASS_UAT: case MSGCLASS_UAT:
fp = uatReplayfp wt = uatReplayWriter
case MSGCLASS_ES: case MSGCLASS_ES:
fp = esReplayfp wt = esReplayWriter
case MSGCLASS_GPS: case MSGCLASS_GPS:
fp = gpsReplayfp wt = gpsReplayWriter
case MSGCLASS_AHRS: case MSGCLASS_AHRS:
fp = ahrsReplayfp wt = ahrsReplayWriter
case MSGCLASS_DUMP1090: case MSGCLASS_DUMP1090:
fp = dump1090Replayfp wt = dump1090ReplayWriter
} }
if fp != nil { if wt != nil {
s := makeReplayLogEntry(msg) s := makeReplayLogEntry(msg)
fp.Write([]byte(s)) wt.Write([]byte(s))
} }
} }
@ -816,37 +817,41 @@ func replayMark(active bool) {
t = fmt.Sprintf("UNPAUSE,%d\n", time.Since(timeStarted).Nanoseconds()) t = fmt.Sprintf("UNPAUSE,%d\n", time.Since(timeStarted).Nanoseconds())
} }
if uatReplayfp != nil { if uatReplayWriter != nil {
uatReplayfp.Write([]byte(t)) uatReplayWriter.Write([]byte(t))
} }
if esReplayfp != nil { if esReplayWriter != nil {
esReplayfp.Write([]byte(t)) esReplayWriter.Write([]byte(t))
} }
if gpsReplayfp != nil { if gpsReplayWriter != nil {
gpsReplayfp.Write([]byte(t)) gpsReplayWriter.Write([]byte(t))
} }
if ahrsReplayfp != nil { if ahrsReplayWriter != nil {
ahrsReplayfp.Write([]byte(t)) ahrsReplayWriter.Write([]byte(t))
} }
if dump1090Replayfp != nil { if dump1090ReplayWriter != nil {
dump1090Replayfp.Write([]byte(t)) dump1090ReplayWriter.Write([]byte(t))
} }
} }
func openReplay(fn string) (*os.File, error) { func openReplay(fn string) (*gzip.Writer, error) {
ret, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) fp, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil { if err != nil {
log.Printf("Failed to open log file '%s': %s\n", fn, err.Error()) log.Printf("Failed to open log file '%s': %s\n", fn, err.Error())
} else { return nil, err
timeFmt := "Mon Jan 2 15:04:05 -0700 MST 2006"
fmt.Fprintf(ret, "START,%s,%s\n", timeStarted.Format(timeFmt), time.Now().Format(timeFmt)) // Start time marker.
} }
return ret, err
gzw := gzip.NewWriter(fp) //FIXME: Close() on the gzip.Writer will not close the underlying file.
timeFmt := "Mon Jan 2 15:04:05 -0700 MST 2006"
s := fmt.Sprintf("START,%s,%s\n", timeStarted.Format(timeFmt), time.Now().Format(timeFmt)) // Start time marker.
gzw.Write([]byte(s))
return gzw, err
} }
func printStats() { func printStats() {
@ -894,39 +899,39 @@ func main() {
// Set up the replay logs. Keep these files open in any case, even if replay logging is disabled. // Set up the replay logs. Keep these files open in any case, even if replay logging is disabled.
// UAT replay log. // UAT replay log.
if uatfp, err := openReplay(uatReplayLog); err != nil { if uatwt, err := openReplay(uatReplayLog); err != nil {
globalSettings.ReplayLog = false globalSettings.ReplayLog = false
} else { } else {
uatReplayfp = uatfp uatReplayWriter = uatwt
defer uatReplayfp.Close() defer uatReplayWriter.Close()
} }
// 1090ES replay log. // 1090ES replay log.
if esfp, err := openReplay(esReplayLog); err != nil { if eswt, err := openReplay(esReplayLog); err != nil {
globalSettings.ReplayLog = false globalSettings.ReplayLog = false
} else { } else {
esReplayfp = esfp esReplayWriter = eswt
defer esReplayfp.Close() defer esReplayWriter.Close()
} }
// GPS replay log. // GPS replay log.
if gpsfp, err := openReplay(gpsReplayLog); err != nil { if gpswt, err := openReplay(gpsReplayLog); err != nil {
globalSettings.ReplayLog = false globalSettings.ReplayLog = false
} else { } else {
gpsReplayfp = gpsfp gpsReplayWriter = gpswt
defer gpsReplayfp.Close() defer gpsReplayWriter.Close()
} }
// AHRS replay log. // AHRS replay log.
if ahrsfp, err := openReplay(ahrsReplayLog); err != nil { if ahrswt, err := openReplay(ahrsReplayLog); err != nil {
globalSettings.ReplayLog = false globalSettings.ReplayLog = false
} else { } else {
ahrsReplayfp = ahrsfp ahrsReplayWriter = ahrswt
defer ahrsReplayfp.Close() defer ahrsReplayWriter.Close()
} }
// Dump1090 replay log. // Dump1090 replay log.
if dump1090fp, err := openReplay(dump1090ReplayLog); err != nil { if dump1090wt, err := openReplay(dump1090ReplayLog); err != nil {
globalSettings.ReplayLog = false globalSettings.ReplayLog = false
} else { } else {
dump1090Replayfp = dump1090fp dump1090ReplayWriter = dump1090wt
defer dump1090Replayfp.Close() defer dump1090ReplayWriter.Close()
} }
// Mark the files (whether we're logging or not). // Mark the files (whether we're logging or not).