stratux/main/datalog.go

610 wiersze
19 KiB
Go

/*
Copyright (c) 2015-2016 Christopher Young
Distributable under the terms of The "BSD New"" License
that can be found in the LICENSE file, herein included
as part of this header.
datalog.go: Log stratux data as it is received. Bucket data into timestamp time slots.
*/
package main
import (
"database/sql"
"errors"
"fmt"
_ "github.com/mattn/go-sqlite3"
"log"
"os"
"reflect"
"strconv"
"strings"
"time"
)
const (
LOG_TIMESTAMP_RESOLUTION = 250 * time.Millisecond
)
type StratuxTimestamp struct {
id int64
Time_type_preference int // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
StratuxClock_value time.Time
GPSClock_value time.Time // The value of this is either from the GPS or extrapolated from the GPS via stratuxClock if pref is 1 or 2. It is time.Time{} if 0.
PreferredTime_value time.Time
StartupID int64
}
// 'startup' table creates a new entry each time the daemon is started. This keeps track of sequential starts, even if the
// timestamp is ambiguous (units with no GPS). This struct is just a placeholder for an empty table (other than primary key).
type StratuxStartup struct {
id int64
Fill string
}
var dataLogStarted bool
var dataLogReadyToWrite bool
//var dataLogInShutdown bool
var stratuxStartupID int64
var dataLogTimestamps []StratuxTimestamp
var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
/*
checkTimestamp().
Verify that our current timestamp is within the LOG_TIMESTAMP_RESOLUTION bucket.
Returns false if the timestamp was changed, true if it is still valid.
This is where GPS timestamps are extrapolated, if the GPS data is currently valid.
*/
func checkTimestamp() bool {
thisCurTimestamp := dataLogCurTimestamp
if stratuxClock.Since(dataLogTimestamps[thisCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
// Extrapolate from GPS timestamp, if possible.
if isGPSClockValid() && thisCurTimestamp > 0 {
// Was the last timestamp either extrapolated or GPS time?
last_ts := dataLogTimestamps[thisCurTimestamp]
if last_ts.Time_type_preference == 1 || last_ts.Time_type_preference == 2 {
// Extrapolate via stratuxClock.
timeSinceLastTS := ts.StratuxClock_value.Sub(last_ts.StratuxClock_value) // stratuxClock ticks since last timestamp.
extrapolatedGPSTimestamp := last_ts.PreferredTime_value.Add(timeSinceLastTS)
// Re-set the preferred timestamp type to '2' (extrapolated time).
ts.Time_type_preference = 2
ts.PreferredTime_value = extrapolatedGPSTimestamp
ts.GPSClock_value = extrapolatedGPSTimestamp
}
}
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
return false
}
return true
}
type SQLiteMarshal struct {
FieldType string
Marshal func(v reflect.Value) string
}
func boolMarshal(v reflect.Value) string {
b := v.Bool()
if b {
return "1"
}
return "0"
}
func intMarshal(v reflect.Value) string {
return strconv.FormatInt(v.Int(), 10)
}
func uintMarshal(v reflect.Value) string {
return strconv.FormatUint(v.Uint(), 10)
}
func floatMarshal(v reflect.Value) string {
return strconv.FormatFloat(v.Float(), 'f', 10, 64)
}
func stringMarshal(v reflect.Value) string {
return v.String()
}
func notsupportedMarshal(v reflect.Value) string {
return ""
}
func structCanBeMarshalled(v reflect.Value) bool {
m := v.MethodByName("String")
if m.IsValid() && !m.IsNil() {
return true
}
return false
}
func structMarshal(v reflect.Value) string {
if structCanBeMarshalled(v) {
m := v.MethodByName("String")
in := make([]reflect.Value, 0)
ret := m.Call(in)
if len(ret) > 0 {
return ret[0].String()
}
}
return ""
}
var sqliteMarshalFunctions = map[string]SQLiteMarshal{
"bool": {FieldType: "INTEGER", Marshal: boolMarshal},
"int": {FieldType: "INTEGER", Marshal: intMarshal},
"uint": {FieldType: "INTEGER", Marshal: uintMarshal},
"float": {FieldType: "REAL", Marshal: floatMarshal},
"string": {FieldType: "TEXT", Marshal: stringMarshal},
"struct": {FieldType: "STRING", Marshal: structMarshal},
"notsupported": {FieldType: "notsupported", Marshal: notsupportedMarshal},
}
var sqlTypeMap = map[reflect.Kind]string{
reflect.Bool: "bool",
reflect.Int: "int",
reflect.Int8: "int",
reflect.Int16: "int",
reflect.Int32: "int",
reflect.Int64: "int",
reflect.Uint: "uint",
reflect.Uint8: "uint",
reflect.Uint16: "uint",
reflect.Uint32: "uint",
reflect.Uint64: "uint",
reflect.Uintptr: "notsupported",
reflect.Float32: "float",
reflect.Float64: "float",
reflect.Complex64: "notsupported",
reflect.Complex128: "notsupported",
reflect.Array: "notsupported",
reflect.Chan: "notsupported",
reflect.Func: "notsupported",
reflect.Interface: "notsupported",
reflect.Map: "notsupported",
reflect.Ptr: "notsupported",
reflect.Slice: "notsupported",
reflect.String: "string",
reflect.Struct: "struct",
reflect.UnsafePointer: "notsupported",
}
func makeTable(i interface{}, tbl string, db *sql.DB) {
val := reflect.ValueOf(i)
fields := make([]string, 0)
for i := 0; i < val.NumField(); i++ {
kind := val.Field(i).Kind()
fieldName := val.Type().Field(i).Name
sqlTypeAlias := sqlTypeMap[kind]
// Check that if the field is a struct that it can be marshalled.
if sqlTypeAlias == "struct" && !structCanBeMarshalled(val.Field(i)) {
continue
}
if sqlTypeAlias == "notsupported" || fieldName == "id" {
continue
}
sqlType := sqliteMarshalFunctions[sqlTypeAlias].FieldType
s := fieldName + " " + sqlType
fields = append(fields, s)
}
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" && tbl != "startup" {
fields = append(fields, "timestamp_id INTEGER")
}
tblCreate := fmt.Sprintf("CREATE TABLE %s (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, %s)", tbl, strings.Join(fields, ", "))
_, err := db.Exec(tblCreate)
if err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
}
}
/*
bulkInsert().
Reads insertBatch and insertBatchIfs. This is called after a group of insertData() calls.
*/
func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
if _, ok := insertString[tbl]; !ok {
return nil, errors.New("no insert statement")
}
batchVals := insertBatchIfs[tbl]
numColsPerRow := len(batchVals[0])
maxRowBatch := int(999 / numColsPerRow) // SQLITE_MAX_VARIABLE_NUMBER = 999.
// log.Printf("table %s. %d cols per row. max batch %d\n", tbl, numColsPerRow, maxRowBatch)
for len(batchVals) > 0 {
// timeInit := time.Now()
i := int(0) // Variable number of rows per INSERT statement.
stmt := ""
vals := make([]interface{}, 0)
querySize := uint64(0) // Size of the query in bytes.
for len(batchVals) > 0 && i < maxRowBatch && querySize < 750000 { // Maximum of 1,000,000 bytes per query.
if len(stmt) == 0 { // The first set will be covered by insertString.
stmt = insertString[tbl]
querySize += uint64(len(insertString[tbl]))
} else {
addStr := ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
stmt += addStr
querySize += uint64(len(addStr))
}
for _, val := range batchVals[0] {
querySize += uint64(len(val.(string)))
}
vals = append(vals, batchVals[0]...)
batchVals = batchVals[1:]
i++
}
// log.Printf("inserting %d rows to %s. querySize=%d\n", i, tbl, querySize)
res, err = db.Exec(stmt, vals...)
// timeBatch := time.Since(timeInit) // debug
// log.Printf("SQLite: bulkInserted %d rows to %s. Took %f msec to build and insert query. querySize=%d\n", i, tbl, 1000*timeBatch.Seconds(), querySize) // debug
if err != nil {
log.Printf("sqlite INSERT error: '%s'\n", err.Error())
return
}
}
// Clear the buffers.
delete(insertString, tbl)
delete(insertBatchIfs, tbl)
return
}
/*
insertData().
Inserts an arbitrary struct into an SQLite table.
Inserts the timestamp first, if its 'id' is 0.
*/
// Cached 'VALUES' statements. Indexed by table name.
var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.
var insertBatchIfs map[string][][]interface{}
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
val := reflect.ValueOf(i)
keys := make([]string, 0)
values := make([]string, 0)
for i := 0; i < val.NumField(); i++ {
kind := val.Field(i).Kind()
fieldName := val.Type().Field(i).Name
sqlTypeAlias := sqlTypeMap[kind]
if sqlTypeAlias == "notsupported" || fieldName == "id" {
continue
}
v := sqliteMarshalFunctions[sqlTypeAlias].Marshal(val.Field(i))
keys = append(keys, fieldName)
values = append(values, v)
}
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" && tbl != "startup" {
keys = append(keys, "timestamp_id")
if dataLogTimestamps[ts_num].id == 0 {
//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
// (dataLogTimestamps[ts_num].id = id).
dataLogTimestamps[ts_num].StartupID = stratuxStartupID
insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
}
values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
}
if _, ok := insertString[tbl]; !ok {
// Prepare the statement.
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
insertString[tbl] = tblInsert
}
// Make the values slice into a slice of interface{}.
ifs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
ifs[i] = values[i]
}
insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
if tbl == "timestamp" || tbl == "startup" { // Immediate insert always for "timestamp" and "startup" table.
res, err := bulkInsert(tbl, db) // Bulk insert of 1, always.
if err == nil {
id, err := res.LastInsertId()
if err == nil && tbl == "timestamp" { // Special handling for timestamps. Update the timestamp ID.
ts := dataLogTimestamps[ts_num]
ts.id = id
dataLogTimestamps[ts_num] = ts
}
return id
}
}
return 0
}
type DataLogRow struct {
tbl string
data interface{}
ts_num int64
}
var dataLogChan chan DataLogRow
var shutdownDataLog chan bool
var shutdownDataLogWriter chan bool
var dataLogWriteChan chan DataLogRow
func dataLogWriter(db *sql.DB) {
dataLogWriteChan = make(chan DataLogRow, 10240)
shutdownDataLogWriter = make(chan bool)
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
writeTicker := time.NewTicker(10 * time.Second)
rowsQueuedForWrite := make([]DataLogRow, 0)
for {
select {
case r := <-dataLogWriteChan:
// Accept timestamped row.
//log.Printf("Accepting timestamped row from dataLogWriteChan\n")
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
// for i := 0; i < 1000; i++ {
// logSituation()
// }
timeStart := stratuxClock.Time
nRows := len(rowsQueuedForWrite)
//if globalSettings.DEBUG {
log.Printf("Writing %d rows\n", nRows)
//}
// Write the buffered rows. This will block while it is writing.
// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
tblsAffected := make(map[string]bool)
// Start transaction.
tx, err := db.Begin()
if err != nil {
log.Printf("db.Begin() error: %s\n", err.Error())
break // from select {}
}
for _, r := range rowsQueuedForWrite {
tblsAffected[r.tbl] = true
insertData(r.data, r.tbl, db, r.ts_num)
}
// Do the bulk inserts.
for tbl, _ := range tblsAffected {
bulkInsert(tbl, db)
}
// Close the transaction.
tx.Commit()
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
timeElapsed := stratuxClock.Since(timeStart)
//if globalSettings.DEBUG {
rowsPerSecond := float64(nRows) / float64(timeElapsed.Seconds())
log.Printf("Writing finished. %d rows in %.2f seconds (%.1f rows per second).\n", nRows, float64(timeElapsed.Seconds()), rowsPerSecond)
//}
if timeElapsed.Seconds() > 10.0 {
log.Printf("WARNING! SQLite logging is behind. Last write took %.1f seconds.\n", float64(timeElapsed.Seconds()))
}
case <-shutdownDataLogWriter: // Received a message on the channel (anything). Graceful shutdown (defer statement).
log.Printf("dataLogWriter() received shutdown message with len(dataLogWriteChan) = %d and rowsQueuedForWrite = %d\n", len(dataLogWriteChan), len(rowsQueuedForWrite))
shutdownDataLog <- true
return
}
}
log.Printf("dataLogWriter() shutting down\n")
}
func dataLog() {
dataLogStarted = true
//dataLogInShutdown = false
log.Printf("dataLog started\n")
dataLogChan = make(chan DataLogRow, 10240)
shutdownDataLog = make(chan bool)
dataLogTimestamps = make([]StratuxTimestamp, 0)
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = 0
// Check if we need to create a new database.
createDatabase := false
if _, err := os.Stat(dataLogFile); os.IsNotExist(err) {
createDatabase = true
log.Printf("creating new database '%s'.\n", dataLogFile)
}
db, err := sql.Open("sqlite3", dataLogFile)
if err != nil {
log.Printf("sql.Open(): %s\n", err.Error())
}
defer func() {
db.Close()
dataLogStarted = false
log.Printf("dataLog() dB is now closed\n")
}()
_, err = db.Exec("PRAGMA journal_mode=WAL")
if err != nil {
log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
}
_, err = db.Exec("PRAGMA synchronous=OFF")
if err != nil {
log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
}
log.Printf("Starting dataLogWriter\n")
go dataLogWriter(db)
// Do we need to create the database?
if createDatabase {
makeTable(StratuxTimestamp{}, "timestamp", db)
makeTable(mySituation, "mySituation", db)
makeTable(globalStatus, "status", db)
makeTable(globalSettings, "settings", db)
makeTable(TrafficInfo{}, "traffic", db)
makeTable(msg{}, "messages", db)
makeTable(esmsg{}, "es_messages", db)
makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
makeTable(StratuxStartup{}, "startup", db)
}
// The first entry to be created is the "startup" entry.
stratuxStartupID = insertData(StratuxStartup{}, "startup", db, 0)
dataLogReadyToWrite = true
log.Printf("Entering dataLog read loop\n")
for {
select {
case r := <-dataLogChan:
// When data is input, the first step is to timestamp it.
// Check if our time bucket has expired or has never been entered.
checkTimestamp()
// Mark the row with the current timestamp ID, in case it gets entered later.
r.ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
dataLogWriteChan <- r
case <-shutdownDataLog: // Received a message on the channel (anything). Graceful shutdown (defer statement).
//dataLogStarted = false // moved to defer statement
log.Printf("dataLog() received shutdown message\n")
return
}
}
log.Printf("dataLog() shutting down\n")
}
/*
setDataLogTimeWithGPS().
Create a timestamp entry using GPS time.
*/
func setDataLogTimeWithGPS(sit SituationData) {
if isGPSClockValid() {
var ts StratuxTimestamp
// Piggyback a GPS time update from this update.
ts.id = 0
ts.Time_type_preference = 1 // gpsClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = sit.GPSTime
ts.PreferredTime_value = sit.GPSTime
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
}
}
func logSituation() {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "mySituation", data: mySituation}
}
}
func logStatus() {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "status", data: globalStatus}
}
}
func logSettings() {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "settings", data: globalSettings}
}
}
func logTraffic(ti TrafficInfo) {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "traffic", data: ti}
}
}
func logMsg(m msg) {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "messages", data: m}
}
}
func logESMsg(m esmsg) {
if globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "es_messages", data: m}
}
}
func logDump1090TermMessage(m Dump1090TermMessage) {
if globalSettings.DEBUG && globalSettings.ReplayLog && dataLogReadyToWrite {
dataLogChan <- DataLogRow{tbl: "dump1090_terminal", data: m}
}
}
func initDataLog() {
log.Printf("dataLogStarted = %t. dataLogReadyToWrite = %t\n", dataLogStarted, dataLogReadyToWrite) //REMOVE -- DEBUG
insertString = make(map[string]string)
insertBatchIfs = make(map[string][][]interface{})
if globalSettings.ReplayLog {
go dataLog()
}
go dataLogWatchdog()
log.Printf("initDataLog complete.\n") //REMOVE -- DEBUG
}
// Watchdog function to control startup / shutdown of data logging subsystem
func dataLogWatchdog() {
for {
log.Printf("Watchdog loop begins. dataLogStarted = %t\n", dataLogStarted)
if !dataLogStarted && globalSettings.ReplayLog { // case 1: sqlite logging isn't running, and we want to start it
log.Printf("Watchdog wants to START logging.\n")
go dataLog()
} else if dataLogStarted && !globalSettings.ReplayLog { // case 2: sqlite logging is running, and we want to shut it down
log.Printf("Watchdog wants to STOP logging.\n")
closeDataLog()
}
log.Printf("Watchdog iterated.\n") //REMOVE -- DEBUG
time.Sleep(1 * time.Second)
log.Printf("Watchdog sleep over.\n") //REMOVE -- DEBUG
}
}
// Handler for graceful shutdown of data logging goroutines. Intended to be called by dataLogWatchdog() and gracefulShutdown()
func closeDataLog() {
log.Printf("closeDataLog(): dataLogStarted = %t\n", dataLogStarted) //REMOVE -- DEBUG
dataLogReadyToWrite = false // prevent any new messages from being sent down the channels
log.Printf("Shutting down SQLite data log\n")
shutdownDataLogWriter <- true
log.Printf("Waiting for signal from dataLog()") //REMOVE -- DEBUG
for dataLogStarted {
log.Printf("closeDataLog(): dataLogStarted = %t\n", dataLogStarted) //REMOVE -- DEBUG
time.Sleep(50 * time.Millisecond)
}
log.Printf("closeDataLog(): Finished wait. dataLogStarted = %t\n", dataLogStarted) //REMOVE -- DEBUG
log.Printf("closeDataLog() complete")
}