Merge pull request #360 from cyoung/sqlite_logging

Sqlite logging.
pull/361/head
cyoung 2016-04-01 18:44:11 -04:00
commit 2d10744cfe
8 zmienionych plików z 247 dodań i 227 usunięć

Wyświetl plik

@ -12,6 +12,7 @@ package main
import (
"database/sql"
"errors"
"fmt"
_ "github.com/mattn/go-sqlite3"
"log"
@ -30,27 +31,48 @@ type StratuxTimestamp struct {
id int64
Time_type_preference int // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
StratuxClock_value time.Time
GPSClock_value time.Time
GPSClock_value time.Time // The value of this is either from the GPS or extrapolated from the GPS via stratuxClock if pref is 1 or 2. It is time.Time{} if 0.
PreferredTime_value time.Time
}
var dataLogTimestamp StratuxTimestamp // Current timestamp bucket.
var dataLogTimestamps map[int64]StratuxTimestamp
var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
/*
checkTimestamp().
Verify that our current timestamp is within the LOG_TIMESTAMP_RESOLUTION bucket.
Returns false if the timestamp was changed, true if it is still valid.
This is where GPS timestamps are extrapolated, if the GPS data is currently valid.
*/
//FIXME: time -> stratuxClock
func checkTimestamp() bool {
if stratuxClock.Since(dataLogTimestamp.StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
if stratuxClock.Since(dataLogTimestamps[dataLogCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
//FIXME: mutex.
dataLogTimestamp.id = 0
dataLogTimestamp.Time_type_preference = 0 // stratuxClock.
dataLogTimestamp.StratuxClock_value = stratuxClock.Time
dataLogTimestamp.GPSClock_value = time.Time{}
dataLogTimestamp.PreferredTime_value = stratuxClock.Time
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
// Extrapolate from GPS timestamp, if possible.
if isGPSClockValid() && dataLogCurTimestamp > 0 {
// Was the last timestamp either extrapolated or GPS time?
last_ts := dataLogTimestamps[dataLogCurTimestamp]
if last_ts.Time_type_preference == 1 || last_ts.Time_type_preference == 2 {
// Extrapolate via stratuxClock.
timeSinceLastTS := ts.StratuxClock_value.Sub(last_ts.StratuxClock_value) // stratuxClock ticks since last timestamp.
extrapolatedGPSTimestamp := last_ts.PreferredTime_value.Add(timeSinceLastTS)
// Re-set the preferred timestamp type to '2' (extrapolated time).
ts.Time_type_preference = 2
ts.PreferredTime_value = extrapolatedGPSTimestamp
ts.GPSClock_value = extrapolatedGPSTimestamp
}
}
dataLogCurTimestamp++
dataLogTimestamps[dataLogCurTimestamp] = ts
return false
}
@ -182,7 +204,56 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
}
}
func insertData(i interface{}, tbl string, db *sql.DB) int64 {
/*
bulkInsert().
Reads insertBatch and insertBatchIfs. This is called after a group of insertData() calls.
*/
func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
if _, ok := insertString[tbl]; !ok {
return nil, errors.New("no insert statement")
}
batchVals := insertBatchIfs[tbl]
for len(batchVals) > 0 {
i := int(0) // Maximum of 10 rows per INSERT statement.
stmt := ""
vals := make([]interface{}, 0)
for len(batchVals) > 0 && i < 10 {
if len(stmt) == 0 { // The first set will be covered by insertString.
stmt = insertString[tbl]
} else {
stmt += ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
}
vals = append(vals, batchVals[0]...)
batchVals = batchVals[1:]
i++
}
res, err = db.Exec(stmt, vals...)
if err != nil {
return
}
}
// Clear the buffers.
delete(insertString, tbl)
delete(insertBatchIfs, tbl)
return
}
/*
insertData().
Inserts an arbitrary struct into an SQLite table.
Inserts the timestamp first, if its 'id' is 0.
*/
// Cached 'VALUES' statements. Indexed by table name.
var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.
var insertBatchIfs map[string][][]interface{}
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
val := reflect.ValueOf(i)
keys := make([]string, 0)
@ -205,40 +276,97 @@ func insertData(i interface{}, tbl string, db *sql.DB) int64 {
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" {
keys = append(keys, "timestamp_id")
values = append(values, strconv.FormatInt(dataLogTimestamp.id, 10))
if dataLogTimestamps[ts_num].id == 0 {
//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
// (dataLogTimestamps[ts_num].id = id).
insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
}
values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
}
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
if _, ok := insertString[tbl]; !ok {
// Prepare the statement.
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
insertString[tbl] = tblInsert
}
// Make the values slice into a slice of interface{}.
ifs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
ifs[i] = values[i]
}
res, err := db.Exec(tblInsert, ifs...)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
}
id, err := res.LastInsertId()
if err == nil {
if tbl == "timestamp" {
dataLogTimestamp.id = id
insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
if tbl == "timestamp" { // Immediate insert always for "timestamp" table.
res, err := bulkInsert("timestamp", db) // Bulk insert of 1, always.
if err == nil {
id, err := res.LastInsertId()
if err == nil {
ts := dataLogTimestamps[ts_num]
ts.id = id
dataLogTimestamps[ts_num] = ts
}
return id
}
return id
}
return 0
}
type DataLogRow struct {
tbl string
data interface{}
tbl string
data interface{}
ts_num int64
}
var dataLogChan chan DataLogRow
var shutdownDataLog chan bool
func dataLogWriter() {
var dataLogWriteChan chan DataLogRow
func dataLogWriter(db *sql.DB) {
dataLogWriteChan = make(chan DataLogRow, 10240)
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
writeTicker := time.NewTicker(10 * time.Second)
rowsQueuedForWrite := make([]DataLogRow, 0)
for {
select {
case r := <-dataLogWriteChan:
// Accept timestamped row.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
// Write the buffered rows. This will block while it is writing.
// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
tblsAffected := make(map[string]bool)
// Start transaction.
tx, err := db.Begin()
if err != nil {
log.Printf("db.Begin() error: %s\n", err.Error())
break // from select {}
}
for _, r := range rowsQueuedForWrite {
tblsAffected[r.tbl] = true
insertData(r.data, r.tbl, db, r.ts_num)
}
// Do the bulk inserts.
for tbl, _ := range tblsAffected {
bulkInsert(tbl, db)
}
// Close the transaction.
tx.Commit()
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
}
}
}
func dataLog() {
dataLogChan = make(chan DataLogRow, 10240)
shutdownDataLog = make(chan bool)
dataLogTimestamps = make(map[int64]StratuxTimestamp, 0)
// Check if we need to create a new database.
createDatabase := false
@ -254,40 +382,59 @@ func dataLogWriter() {
}
defer db.Close()
go dataLogWriter(db)
// Do we need to create the database?
if createDatabase {
makeTable(dataLogTimestamp, "timestamp", db)
makeTable(StratuxTimestamp{}, "timestamp", db)
makeTable(mySituation, "mySituation", db)
makeTable(globalStatus, "status", db)
makeTable(globalSettings, "settings", db)
makeTable(TrafficInfo{}, "traffic", db)
makeTable(msg{}, "messages", db)
makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
}
for {
//FIXME: measure latency from here to end of block. Messages may need to be timestamped *before* executing everything here.
r := <-dataLogChan
// Check if our time bucket has expired or has never been entered.
if !checkTimestamp() || dataLogTimestamp.id == 0 {
insertData(dataLogTimestamp, "timestamp", db) // Updates dataLogTimestamp.id.
select {
case r := <-dataLogChan:
// When data is input, the first step is to timestamp it.
// Check if our time bucket has expired or has never been entered.
checkTimestamp()
// Mark the row with the current timestamp ID, in case it gets entered later.
r.ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
dataLogWriteChan <- r
case <-shutdownDataLog: // Received a message on the channel (anything). Graceful shutdown (defer statement).
return
}
insertData(r.data, r.tbl, db)
}
}
/*
setDataLogTimeWithGPS().
Create a timestamp entry using GPS time.
*/
func setDataLogTimeWithGPS(sit SituationData) {
if isGPSClockValid() {
//FIXME: mutex.
var ts StratuxTimestamp
// Piggyback a GPS time update from this update.
dataLogTimestamp.id = 0
dataLogTimestamp.Time_type_preference = 1 // gpsClock.
dataLogTimestamp.StratuxClock_value = stratuxClock.Time
dataLogTimestamp.GPSClock_value = sit.GPSTime
dataLogTimestamp.PreferredTime_value = sit.GPSTime
ts.id = 0
ts.Time_type_preference = 1 // gpsClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = sit.GPSTime
ts.PreferredTime_value = sit.GPSTime
dataLogCurTimestamp++
dataLogTimestamps[dataLogCurTimestamp] = ts
}
}
func logSituation() {
dataLogChan <- DataLogRow{tbl: "mySituation", data: mySituation}
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "mySituation", data: mySituation}
}
}
func logStatus() {
@ -299,9 +446,23 @@ func logSettings() {
}
func logTraffic(ti TrafficInfo) {
dataLogChan <- DataLogRow{tbl: "traffic", data: ti}
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "traffic", data: ti}
}
}
func logMsg(m msg) {
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "messages", data: m}
}
}
func logDump1090TermMessage(m Dump1090TermMessage) {
dataLogChan <- DataLogRow{tbl: "dump1090_terminal", data: m}
}
func initDataLog() {
go dataLogWriter()
insertString = make(map[string]string)
insertBatchIfs = make(map[string][][]interface{})
go dataLog()
}

Wyświetl plik

@ -65,11 +65,8 @@ const (
MSGTYPE_BASIC_REPORT = 0x1E
MSGTYPE_LONG_REPORT = 0x1F
MSGCLASS_UAT = 0
MSGCLASS_ES = 1
MSGCLASS_GPS = 3
MSGCLASS_AHRS = 4
MSGCLASS_DUMP1090 = 5
MSGCLASS_UAT = 0
MSGCLASS_ES = 1
LON_LAT_RESOLUTION = float32(180.0 / 8388608.0)
TRACK_RESOLUTION = float32(360.0 / 256.0)
@ -77,12 +74,6 @@ const (
var maxSignalStrength int
var uatReplayLog string
var esReplayLog string
var gpsReplayLog string
var ahrsReplayLog string
var dump1090ReplayLog string
var stratuxBuild string
var stratuxVersion string
@ -102,13 +93,6 @@ type ReadCloser interface {
io.Closer
}
// File handles for replay logging.
var uatReplayWriter WriteCloser
var esReplayWriter WriteCloser
var gpsReplayWriter WriteCloser
var ahrsReplayWriter WriteCloser
var dump1090ReplayWriter WriteCloser
var developerMode bool
type msg struct {
@ -168,19 +152,6 @@ func constructFilenames() {
}
fo.Sync()
fo.Close()
if developerMode == true {
uatReplayLog = fmt.Sprintf("%s/%04d-uat.log", logDirectory, fileIndexNumber)
esReplayLog = fmt.Sprintf("%s/%04d-es.log", logDirectory, fileIndexNumber)
gpsReplayLog = fmt.Sprintf("%s/%04d-gps.log", logDirectory, fileIndexNumber)
ahrsReplayLog = fmt.Sprintf("%s/%04d-ahrs.log", logDirectory, fileIndexNumber)
dump1090ReplayLog = fmt.Sprintf("%s/%04d-dump1090.log", logDirectory, fileIndexNumber)
} else {
uatReplayLog = fmt.Sprintf("%s/%04d-uat.log.gz", logDirectory, fileIndexNumber)
esReplayLog = fmt.Sprintf("%s/%04d-es.log.gz", logDirectory, fileIndexNumber)
gpsReplayLog = fmt.Sprintf("%s/%04d-gps.log.gz", logDirectory, fileIndexNumber)
ahrsReplayLog = fmt.Sprintf("%s/%04d-ahrs.log.gz", logDirectory, fileIndexNumber)
dump1090ReplayLog = fmt.Sprintf("%s/%04d-dump1090.log.gz", logDirectory, fileIndexNumber)
}
}
// Construct the CRC table. Adapted from FAA ref above.
@ -765,51 +736,6 @@ func updateStatus() {
globalStatus.Clock = time.Now()
}
type ReplayWriter struct {
fp *os.File
}
func (r ReplayWriter) Write(p []byte) (n int, err error) {
return r.fp.Write(p)
}
func (r ReplayWriter) Close() error {
return r.fp.Close()
}
func makeReplayLogEntry(msg string) string {
return fmt.Sprintf("%d,%s\n", time.Since(timeStarted).Nanoseconds(), msg)
}
func replayLog(msg string, msgclass int) {
if !globalSettings.ReplayLog { // Logging disabled.
return
}
msg = strings.Trim(msg, " \r\n")
if len(msg) == 0 { // Blank message.
return
}
var fp WriteCloser
switch msgclass {
case MSGCLASS_UAT:
fp = uatReplayWriter
case MSGCLASS_ES:
fp = esReplayWriter
case MSGCLASS_GPS:
fp = gpsReplayWriter
case MSGCLASS_AHRS:
fp = ahrsReplayWriter
case MSGCLASS_DUMP1090:
fp = dump1090ReplayWriter
}
if fp != nil {
s := makeReplayLogEntry(msg)
fp.Write([]byte(s))
}
}
type WeatherMessage struct {
Type string
Location string
@ -840,8 +766,7 @@ func registerADSBTextMessageReceived(msg string) {
}
func parseInput(buf string) ([]byte, uint16) {
replayLog(buf, MSGCLASS_UAT) // Log the raw message.
//FIXME: We're ignoring all invalid format UAT messages (not sending to datalog).
x := strings.Split(buf, ";") // Discard everything after the first ';'.
s := x[0]
if len(s) == 0 {
@ -941,6 +866,7 @@ func parseInput(buf string) ([]byte, uint16) {
}
MsgLog = append(MsgLog, thisMsg)
logMsg(thisMsg)
return frame, msgtype
}
@ -1124,36 +1050,6 @@ func saveSettings() {
log.Printf("wrote settings.\n")
}
func replayMark(active bool) {
var t string
if !active {
t = fmt.Sprintf("PAUSE,%d\n", time.Since(timeStarted).Nanoseconds())
} else {
t = fmt.Sprintf("UNPAUSE,%d\n", time.Since(timeStarted).Nanoseconds())
}
if uatReplayWriter != nil {
uatReplayWriter.Write([]byte(t))
}
if esReplayWriter != nil {
esReplayWriter.Write([]byte(t))
}
if gpsReplayWriter != nil {
gpsReplayWriter.Write([]byte(t))
}
if ahrsReplayWriter != nil {
ahrsReplayWriter.Write([]byte(t))
}
if dump1090ReplayWriter != nil {
dump1090ReplayWriter.Write([]byte(t))
}
}
func openReplay(fn string, compressed bool) (WriteCloser, error) {
fp, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
@ -1263,32 +1159,13 @@ func openReplayFile(fn string) ReadCloser {
var stratuxClock *monotonic
var sigs = make(chan os.Signal, 1) // Signal catch channel (shutdown).
// Close replay log file handles.
func closeReplayLogs() {
if uatReplayWriter != nil {
uatReplayWriter.Close()
}
if esReplayWriter != nil {
esReplayWriter.Close()
}
if gpsReplayWriter != nil {
gpsReplayWriter.Close()
}
if ahrsReplayWriter != nil {
ahrsReplayWriter.Close()
}
if dump1090ReplayWriter != nil {
dump1090ReplayWriter.Close()
}
}
// Graceful shutdown.
func gracefulShutdown() {
// Shut down SDRs.
sdrKill()
//TODO: Any other graceful shutdown functions.
closeReplayLogs()
// Shut down data logging.
shutdownDataLog <- true
os.Exit(1)
}
@ -1365,45 +1242,6 @@ func main() {
//FIXME: Only do this if data logging is enabled.
initDataLog()
// Set up the replay logs. Keep these files open in any case, even if replay logging is disabled.
if uatwt, err := openReplay(uatReplayLog, !developerMode); err != nil {
globalSettings.ReplayLog = false
} else {
uatReplayWriter = uatwt
defer uatReplayWriter.Close()
}
// 1090ES replay log.
if eswt, err := openReplay(esReplayLog, !developerMode); err != nil {
globalSettings.ReplayLog = false
} else {
esReplayWriter = eswt
defer esReplayWriter.Close()
}
// GPS replay log.
if gpswt, err := openReplay(gpsReplayLog, !developerMode); err != nil {
globalSettings.ReplayLog = false
} else {
gpsReplayWriter = gpswt
defer gpsReplayWriter.Close()
}
// AHRS replay log.
if ahrswt, err := openReplay(ahrsReplayLog, !developerMode); err != nil {
globalSettings.ReplayLog = false
} else {
ahrsReplayWriter = ahrswt
defer ahrsReplayWriter.Close()
}
// Dump1090 replay log.
if dump1090wt, err := openReplay(dump1090ReplayLog, !developerMode); err != nil {
globalSettings.ReplayLog = false
} else {
dump1090ReplayWriter = dump1090wt
defer dump1090ReplayWriter.Close()
}
// Mark the files (whether we're logging or not).
replayMark(globalSettings.ReplayLog)
initRY835AI()

Wyświetl plik

@ -209,7 +209,6 @@ func handleSettingsSetRequest(w http.ResponseWriter, r *http.Request) {
v := val.(bool)
if v != globalSettings.ReplayLog { // Don't mark the files unless there is a change.
globalSettings.ReplayLog = v
replayMark(v)
}
case "PPM":
globalSettings.PPM = int(val.(float64))

13
main/plugin.go 100644
Wyświetl plik

@ -0,0 +1,13 @@
package main
import (
"time"
)
type StratuxPlugin struct {
InitFunc func() bool
ShutdownFunc func() bool
Name string
Clock time.Time
Input chan string
}

Wyświetl plik

@ -430,14 +430,16 @@ return is true if parse occurs correctly and position is valid.
*/
func processNMEALine(l string) bool {
func processNMEALine(l string) (sentenceUsed bool) {
mySituation.mu_GPS.Lock()
defer func() {
logSituation()
if sentenceUsed || globalSettings.DEBUG {
logSituation()
}
mySituation.mu_GPS.Unlock()
}()
replayLog(l, MSGCLASS_GPS)
l_valid, validNMEAcs := validateNMEAChecksum(l)
if !validNMEAcs {
log.Printf("GPS error. Invalid NMEA string: %s\n", l_valid) // remove log message once validation complete
@ -623,7 +625,6 @@ func processNMEALine(l string) bool {
x[8+6*i] // lock time, sec, 0-64
*/
return true
} else if x[1] == "04" { // clock message
// field 5 is UTC week (epoch = 1980-JAN-06). If this is invalid, do not parse date / time
utcWeek, err0 := strconv.Atoi(x[5])

Wyświetl plik

@ -46,6 +46,11 @@ var UATDev *UAT
// ESDev holds a 1090 MHz dongle object
var ESDev *ES
type Dump1090TermMessage struct {
Text string
Source string
}
func (e *ES) read() {
defer e.wg.Done()
log.Println("Entered ES read() ...")
@ -100,7 +105,8 @@ func (e *ES) read() {
default:
n, err := stdout.Read(stdoutBuf)
if err == nil && n > 0 {
replayLog(string(stdoutBuf[:n]), MSGCLASS_DUMP1090)
m := Dump1090TermMessage{Text: string(stdoutBuf[:n]), Source: "stdout"}
logDump1090TermMessage(m)
}
}
}
@ -114,7 +120,8 @@ func (e *ES) read() {
default:
n, err := stderr.Read(stderrBuf)
if err == nil && n > 0 {
replayLog(string(stderrBuf[:n]), MSGCLASS_DUMP1090)
m := Dump1090TermMessage{Text: string(stdoutBuf[:n]), Source: "stderr"}
logDump1090TermMessage(m)
}
}
}

Wyświetl plik

@ -188,6 +188,7 @@ func sendTrafficUpdates() {
// Send update to attached JSON client.
func registerTrafficUpdate(ti TrafficInfo) {
logTraffic(ti)
if !ti.Position_valid { // Don't send unless a valid position exists.
return
}
@ -555,8 +556,14 @@ func esListen() {
break
}
buf = strings.Trim(buf, "\r\n")
//log.Printf("%s\n", buf)
replayLog(buf, MSGCLASS_ES) // Log the raw message to nnnn-ES.log
// Log the message to the message counter in any case.
var thisMsg msg
thisMsg.MessageClass = MSGCLASS_ES
thisMsg.TimeReceived = stratuxClock.Time
thisMsg.Data = []byte(buf)
MsgLog = append(MsgLog, thisMsg)
logMsg(thisMsg)
var newTi *dump1090Data
err = json.Unmarshal([]byte(buf), &newTi)
@ -565,18 +572,11 @@ func esListen() {
continue
}
if (newTi.Icao_addr & 0xFF000000) != 0 { //24-bit overflow is used to signal heartbeat
if globalSettings.DEBUG && (newTi.Icao_addr&0xFF000000) != 0 { //24-bit overflow is used to signal heartbeat
log.Printf("No traffic last 60 seconds. Heartbeat message from dump1090: %s\n", buf)
continue
}
// Log the message to the message counter as a valid ES if it unmarshalles.
var thisMsg msg
thisMsg.MessageClass = MSGCLASS_ES
thisMsg.TimeReceived = stratuxClock.Time
thisMsg.Data = []byte(buf)
MsgLog = append(MsgLog, thisMsg)
icao := uint32(newTi.Icao_addr)
var ti TrafficInfo

Wyświetl plik

@ -3,4 +3,5 @@
rm -rf /root/stratux-update
mkdir -p /root/stratux-update
cd /root/stratux-update
service stratux stop
mv -f /var/log/stratux.sqlite /var/log/stratux.sqlite.`date +%s`