/*
	Copyright (c) 2015-2016 Christopher Young
	Distributable under the terms of The "BSD New" License
	that can be found in the LICENSE file, herein included
	as part of this header.

	datalog.go: Log stratux data as it is received. Bucket data into timestamp time slots.
*/
package main
import (
2016-03-24 04:23:12 +00:00
"database/sql"
2016-04-01 22:25:53 +00:00
"errors"
2016-03-23 16:31:11 +00:00
"fmt"
2016-03-24 04:23:12 +00:00
_ "github.com/mattn/go-sqlite3"
2016-03-24 13:33:11 +00:00
"log"
"os"
2016-03-23 16:31:11 +00:00
"reflect"
2016-03-24 03:08:00 +00:00
"strconv"
2016-03-23 16:31:11 +00:00
"strings"
"time"
)
const (
	// LOG_TIMESTAMP_RESOLUTION is the width of one timestamp bucket. Rows
	// logged within the same bucket share a single "timestamp" table entry
	// (see checkTimestamp()).
	LOG_TIMESTAMP_RESOLUTION = 250 * time.Millisecond
)
// StratuxTimestamp is one timestamp bucket. Logged rows in other tables
// reference an entry of this type through their timestamp_id column.
type StratuxTimestamp struct {
	id                   int64     // Database row id; 0 until this entry has been inserted (see insertData()).
	Time_type_preference int       // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
	StratuxClock_value   time.Time // stratuxClock value at bucket creation.
	GPSClock_value       time.Time // The value of this is either from the GPS or extrapolated from the GPS via stratuxClock if pref is 1 or 2. It is time.Time{} if 0.
	PreferredTime_value  time.Time // Best available time according to Time_type_preference.
	StartupID            int64     // Links this timestamp to the "startup" table entry for the current run.
}
2016-04-22 20:10:56 +00:00
// 'startup' table creates a new entry each time the daemon is started. This keeps track of sequential starts, even if the
// timestamp is ambiguous (units with no GPS). This struct is just a placeholder for an empty table (other than primary key).
type StratuxStartup struct {
	id   int64  // Database row id (primary key).
	Fill string // Placeholder column so the table has a non-key column.
}
2016-05-02 05:03:21 +00:00
var dataLogStarted bool      // True while the dataLog() goroutine is running.
var dataLogReadyToWrite bool // True once the DB is open and the "startup" entry is written; gates the log*() senders.

var stratuxStartupID int64 // Database id of this run's "startup" table entry.

var dataLogTimestamps []StratuxTimestamp // In-memory list of timestamp buckets.

var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
2016-03-23 16:31:11 +00:00
/*
	checkTimestamp().
	 Verify that our current timestamp is within the LOG_TIMESTAMP_RESOLUTION bucket.
	 Returns false if the timestamp was changed, true if it is still valid.
	 This is where GPS timestamps are extrapolated, if the GPS data is currently valid.
*/
func checkTimestamp() bool {
	// Snapshot the current bucket index so we work against a consistent value.
	thisCurTimestamp := dataLogCurTimestamp
	if stratuxClock.Since(dataLogTimestamps[thisCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
		// Bucket has expired: create a new one, defaulting to stratuxClock time.
		var ts StratuxTimestamp
		ts.id = 0
		ts.Time_type_preference = 0 // stratuxClock.
		ts.StratuxClock_value = stratuxClock.Time
		ts.GPSClock_value = time.Time{}
		ts.PreferredTime_value = stratuxClock.Time

		// Extrapolate from GPS timestamp, if possible.
		if isGPSClockValid() && thisCurTimestamp > 0 {
			// Was the last timestamp either extrapolated or GPS time?
			last_ts := dataLogTimestamps[thisCurTimestamp]
			if last_ts.Time_type_preference == 1 || last_ts.Time_type_preference == 2 {
				// Extrapolate via stratuxClock.
				timeSinceLastTS := ts.StratuxClock_value.Sub(last_ts.StratuxClock_value) // stratuxClock ticks since last timestamp.
				extrapolatedGPSTimestamp := last_ts.PreferredTime_value.Add(timeSinceLastTS)

				// Re-set the preferred timestamp type to '2' (extrapolated time).
				ts.Time_type_preference = 2
				ts.PreferredTime_value = extrapolatedGPSTimestamp
				ts.GPSClock_value = extrapolatedGPSTimestamp
			}
		}

		// Append the new bucket and make it current.
		dataLogTimestamps = append(dataLogTimestamps, ts)
		dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)

		return false
	}
	return true
}
2016-03-24 04:23:12 +00:00
2016-03-23 16:31:11 +00:00
// SQLiteMarshal pairs an SQLite column type with the function that renders
// a reflected Go value into its string form for that column.
type SQLiteMarshal struct {
	FieldType string                       // SQLite column type, e.g. "INTEGER", "REAL", "TEXT".
	Marshal   func(v reflect.Value) string // Converts a reflected value into its string representation.
}
2016-03-24 03:08:00 +00:00
func boolMarshal ( v reflect . Value ) string {
b := v . Bool ( )
if b {
return "1"
}
return "0"
2016-03-23 16:31:11 +00:00
}
2016-03-24 03:08:00 +00:00
func intMarshal ( v reflect . Value ) string {
return strconv . FormatInt ( v . Int ( ) , 10 )
2016-03-23 16:31:11 +00:00
}
2016-03-24 03:08:00 +00:00
func uintMarshal ( v reflect . Value ) string {
return strconv . FormatUint ( v . Uint ( ) , 10 )
}
func floatMarshal ( v reflect . Value ) string {
return strconv . FormatFloat ( v . Float ( ) , 'f' , 10 , 64 )
}
func stringMarshal ( v reflect . Value ) string {
return v . String ( )
2016-03-23 16:31:11 +00:00
}
2016-03-24 03:08:00 +00:00
func notsupportedMarshal ( v reflect . Value ) string {
2016-03-23 16:31:11 +00:00
return ""
}
2016-03-24 13:33:11 +00:00
func structCanBeMarshalled ( v reflect . Value ) bool {
m := v . MethodByName ( "String" )
if m . IsValid ( ) && ! m . IsNil ( ) {
return true
}
return false
}
2016-03-24 03:08:00 +00:00
func structMarshal ( v reflect . Value ) string {
if structCanBeMarshalled ( v ) {
m := v . MethodByName ( "String" )
in := make ( [ ] reflect . Value , 0 )
ret := m . Call ( in )
if len ( ret ) > 0 {
return ret [ 0 ] . String ( )
}
}
2016-03-23 16:31:11 +00:00
return ""
}
// sqliteMarshalFunctions maps a type alias (see sqlTypeMap) to the SQLite
// column type and marshalling function used for fields of that alias.
var sqliteMarshalFunctions = map[string]SQLiteMarshal{
	"bool":         {FieldType: "INTEGER", Marshal: boolMarshal},
	"int":          {FieldType: "INTEGER", Marshal: intMarshal},
	"uint":         {FieldType: "INTEGER", Marshal: uintMarshal},
	"float":        {FieldType: "REAL", Marshal: floatMarshal},
	"string":       {FieldType: "TEXT", Marshal: stringMarshal},
	"struct":       {FieldType: "STRING", Marshal: structMarshal},
	"notsupported": {FieldType: "notsupported", Marshal: notsupportedMarshal},
}
// sqlTypeMap collapses reflect.Kind values into the small set of type
// aliases keyed by sqliteMarshalFunctions. Kinds marked "notsupported"
// are skipped when building tables and inserting rows.
var sqlTypeMap = map[reflect.Kind]string{
	reflect.Bool:          "bool",
	reflect.Int:           "int",
	reflect.Int8:          "int",
	reflect.Int16:         "int",
	reflect.Int32:         "int",
	reflect.Int64:         "int",
	reflect.Uint:          "uint",
	reflect.Uint8:         "uint",
	reflect.Uint16:        "uint",
	reflect.Uint32:        "uint",
	reflect.Uint64:        "uint",
	reflect.Uintptr:       "notsupported",
	reflect.Float32:       "float",
	reflect.Float64:       "float",
	reflect.Complex64:     "notsupported",
	reflect.Complex128:    "notsupported",
	reflect.Array:         "notsupported",
	reflect.Chan:          "notsupported",
	reflect.Func:          "notsupported",
	reflect.Interface:     "notsupported",
	reflect.Map:           "notsupported",
	reflect.Ptr:           "notsupported",
	reflect.Slice:         "notsupported",
	reflect.String:        "string",
	reflect.Struct:        "struct",
	reflect.UnsafePointer: "notsupported",
}
2016-03-24 04:23:12 +00:00
func makeTable ( i interface { } , tbl string , db * sql . DB ) {
2016-03-23 16:31:11 +00:00
val := reflect . ValueOf ( i )
fields := make ( [ ] string , 0 )
for i := 0 ; i < val . NumField ( ) ; i ++ {
kind := val . Field ( i ) . Kind ( )
fieldName := val . Type ( ) . Field ( i ) . Name
sqlTypeAlias := sqlTypeMap [ kind ]
2016-03-24 03:08:00 +00:00
// Check that if the field is a struct that it can be marshalled.
if sqlTypeAlias == "struct" && ! structCanBeMarshalled ( val . Field ( i ) ) {
continue
}
2016-03-23 16:31:11 +00:00
if sqlTypeAlias == "notsupported" || fieldName == "id" {
continue
}
sqlType := sqliteMarshalFunctions [ sqlTypeAlias ] . FieldType
s := fieldName + " " + sqlType
fields = append ( fields , s )
}
2016-03-24 05:14:48 +00:00
// Add the timestamp_id field to link up with the timestamp table.
2016-04-22 20:10:56 +00:00
if tbl != "timestamp" && tbl != "startup" {
2016-03-24 05:14:48 +00:00
fields = append ( fields , "timestamp_id INTEGER" )
}
tblCreate := fmt . Sprintf ( "CREATE TABLE %s (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, %s)" , tbl , strings . Join ( fields , ", " ) )
2016-04-22 20:10:56 +00:00
2016-03-24 05:14:48 +00:00
_ , err := db . Exec ( tblCreate )
if err != nil {
fmt . Printf ( "ERROR: %s\n" , err . Error ( ) )
2016-03-23 16:31:11 +00:00
}
}
2016-04-01 22:25:53 +00:00
/ *
bulkInsert ( ) .
Reads insertBatch and insertBatchIfs . This is called after a group of insertData ( ) calls .
* /
func bulkInsert ( tbl string , db * sql . DB ) ( res sql . Result , err error ) {
if _ , ok := insertString [ tbl ] ; ! ok {
return nil , errors . New ( "no insert statement" )
}
batchVals := insertBatchIfs [ tbl ]
2016-04-03 18:10:42 +00:00
numColsPerRow := len ( batchVals [ 0 ] )
maxRowBatch := int ( 999 / numColsPerRow ) // SQLITE_MAX_VARIABLE_NUMBER = 999.
// log.Printf("table %s. %d cols per row. max batch %d\n", tbl, numColsPerRow, maxRowBatch)
2016-04-01 22:25:53 +00:00
for len ( batchVals ) > 0 {
2016-04-10 05:21:13 +00:00
// timeInit := time.Now()
2016-04-06 03:01:10 +00:00
i := int ( 0 ) // Variable number of rows per INSERT statement.
2016-04-01 22:25:53 +00:00
stmt := ""
vals := make ( [ ] interface { } , 0 )
2016-04-03 18:10:42 +00:00
querySize := uint64 ( 0 ) // Size of the query in bytes.
for len ( batchVals ) > 0 && i < maxRowBatch && querySize < 750000 { // Maximum of 1,000,000 bytes per query.
2016-04-01 22:25:53 +00:00
if len ( stmt ) == 0 { // The first set will be covered by insertString.
stmt = insertString [ tbl ]
2016-04-02 19:57:05 +00:00
querySize += uint64 ( len ( insertString [ tbl ] ) )
2016-04-01 22:25:53 +00:00
} else {
2016-04-02 19:57:05 +00:00
addStr := ", (" + strings . Join ( strings . Split ( strings . Repeat ( "?" , len ( batchVals [ 0 ] ) ) , "" ) , "," ) + ")"
2016-04-03 18:10:42 +00:00
stmt += addStr
2016-04-02 19:57:05 +00:00
querySize += uint64 ( len ( addStr ) )
2016-04-01 22:25:53 +00:00
}
2016-04-02 19:33:19 +00:00
for _ , val := range batchVals [ 0 ] {
2016-04-02 19:40:25 +00:00
querySize += uint64 ( len ( val . ( string ) ) )
2016-04-02 19:33:19 +00:00
}
2016-04-01 22:25:53 +00:00
vals = append ( vals , batchVals [ 0 ] ... )
batchVals = batchVals [ 1 : ]
i ++
}
2016-04-02 19:57:05 +00:00
// log.Printf("inserting %d rows to %s. querySize=%d\n", i, tbl, querySize)
2016-04-01 22:25:53 +00:00
res , err = db . Exec ( stmt , vals ... )
2016-04-10 05:21:13 +00:00
// timeBatch := time.Since(timeInit) // debug
// log.Printf("SQLite: bulkInserted %d rows to %s. Took %f msec to build and insert query. querySize=%d\n", i, tbl, 1000*timeBatch.Seconds(), querySize) // debug
2016-04-01 22:25:53 +00:00
if err != nil {
2016-04-06 03:01:10 +00:00
log . Printf ( "sqlite INSERT error: '%s'\n" , err . Error ( ) )
2016-04-01 22:25:53 +00:00
return
}
}
// Clear the buffers.
delete ( insertString , tbl )
delete ( insertBatchIfs , tbl )
return
}
2016-04-01 19:42:46 +00:00
/*
	insertData().
	 Inserts an arbitrary struct into an SQLite table.
	 Inserts the timestamp first, if its 'id' is 0.
*/

// Cached 'VALUES' statements. Indexed by table name.
var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.

// insertBatchIfs holds the queued rows per table; each inner slice is one
// row's marshalled (string) values, flushed by bulkInsert().
var insertBatchIfs map[string][][]interface{}
2016-04-01 19:42:46 +00:00
2016-04-01 12:26:37 +00:00
// insertData marshals struct i and queues it for insertion into tbl. Rows for
// the "timestamp" and "startup" tables are flushed immediately and their new
// database id returned; all other rows stay queued (bulkInsert() flushes
// them later) and 0 is returned.
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
	val := reflect.ValueOf(i)

	keys := make([]string, 0)
	values := make([]string, 0)
	for i := 0; i < val.NumField(); i++ {
		kind := val.Field(i).Kind()
		fieldName := val.Type().Field(i).Name
		sqlTypeAlias := sqlTypeMap[kind]

		// Skip unsupported kinds and the internal primary key field.
		if sqlTypeAlias == "notsupported" || fieldName == "id" {
			continue
		}

		v := sqliteMarshalFunctions[sqlTypeAlias].Marshal(val.Field(i))
		keys = append(keys, fieldName)
		values = append(values, v)
	}

	// Add the timestamp_id field to link up with the timestamp table.
	if tbl != "timestamp" && tbl != "startup" {
		keys = append(keys, "timestamp_id")
		if dataLogTimestamps[ts_num].id == 0 {
			//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
			// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
			// (dataLogTimestamps[ts_num].id = id).
			dataLogTimestamps[ts_num].StartupID = stratuxStartupID
			insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
		}
		values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
	}

	if _, ok := insertString[tbl]; !ok {
		// Prepare the statement.
		tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
			strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
		insertString[tbl] = tblInsert
	}

	// Make the values slice into a slice of interface{}.
	ifs := make([]interface{}, len(values))
	for i := 0; i < len(values); i++ {
		ifs[i] = values[i]
	}

	insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)

	if tbl == "timestamp" || tbl == "startup" { // Immediate insert always for "timestamp" and "startup" table.
		res, err := bulkInsert(tbl, db) // Bulk insert of 1, always.
		if err == nil {
			id, err := res.LastInsertId()
			if err == nil && tbl == "timestamp" { // Special handling for timestamps. Update the timestamp ID.
				ts := dataLogTimestamps[ts_num]
				ts.id = id
				dataLogTimestamps[ts_num] = ts
			}
			return id
		}
	}
	return 0
}
// DataLogRow is one queued write: an arbitrary struct `data` destined for
// table `tbl`, tagged with the timestamp bucket index `ts_num`.
type DataLogRow struct {
	tbl    string
	data   interface{}
	ts_num int64
}
var dataLogChan chan DataLogRow     // Incoming rows from the log*() helpers; timestamped by dataLog().
var shutdownDataLog chan bool       // Signals dataLog() to exit (sent by dataLogWriter() on shutdown).
var shutdownDataLogWriter chan bool // Signals dataLogWriter() to stop (sent by closeDataLog()).

var dataLogWriteChan chan DataLogRow // Timestamped rows queued for the periodic bulk write.
func dataLogWriter ( db * sql . DB ) {
dataLogWriteChan = make ( chan DataLogRow , 10240 )
2016-05-02 05:03:21 +00:00
shutdownDataLogWriter = make ( chan bool )
2016-04-01 18:05:23 +00:00
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
2016-04-01 22:42:28 +00:00
writeTicker := time . NewTicker ( 10 * time . Second )
2016-04-01 18:05:23 +00:00
rowsQueuedForWrite := make ( [ ] DataLogRow , 0 )
for {
select {
case r := <- dataLogWriteChan :
// Accept timestamped row.
rowsQueuedForWrite = append ( rowsQueuedForWrite , r )
case <- writeTicker . C :
2016-04-03 18:10:42 +00:00
// for i := 0; i < 1000; i++ {
// logSituation()
// }
timeStart := stratuxClock . Time
2016-04-10 05:21:13 +00:00
nRows := len ( rowsQueuedForWrite )
2016-05-03 03:50:42 +00:00
if globalSettings . DEBUG {
log . Printf ( "Writing %d rows\n" , nRows )
}
2016-04-01 19:42:46 +00:00
// Write the buffered rows. This will block while it is writing.
2016-04-01 22:25:53 +00:00
// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
tblsAffected := make ( map [ string ] bool )
2016-04-01 19:42:46 +00:00
// Start transaction.
tx , err := db . Begin ( )
if err != nil {
log . Printf ( "db.Begin() error: %s\n" , err . Error ( ) )
break // from select {}
}
2016-04-01 18:05:23 +00:00
for _ , r := range rowsQueuedForWrite {
2016-04-01 22:25:53 +00:00
tblsAffected [ r . tbl ] = true
2016-04-01 18:05:23 +00:00
insertData ( r . data , r . tbl , db , r . ts_num )
}
2016-04-01 22:25:53 +00:00
// Do the bulk inserts.
for tbl , _ := range tblsAffected {
bulkInsert ( tbl , db )
}
2016-04-01 19:42:46 +00:00
// Close the transaction.
tx . Commit ( )
2016-04-01 18:05:23 +00:00
rowsQueuedForWrite = make ( [ ] DataLogRow , 0 ) // Zero the queue.
2016-04-03 18:10:42 +00:00
timeElapsed := stratuxClock . Since ( timeStart )
2016-05-03 03:50:42 +00:00
if globalSettings . DEBUG {
rowsPerSecond := float64 ( nRows ) / float64 ( timeElapsed . Seconds ( ) )
log . Printf ( "Writing finished. %d rows in %.2f seconds (%.1f rows per second).\n" , nRows , float64 ( timeElapsed . Seconds ( ) ) , rowsPerSecond )
}
2016-04-03 18:10:42 +00:00
if timeElapsed . Seconds ( ) > 10.0 {
2016-04-07 22:34:01 +00:00
log . Printf ( "WARNING! SQLite logging is behind. Last write took %.1f seconds.\n" , float64 ( timeElapsed . Seconds ( ) ) )
2016-05-03 13:12:49 +00:00
dataLogCriticalErr := fmt . Errorf ( "WARNING! SQLite logging is behind. Last write took %.1f seconds.\n" , float64 ( timeElapsed . Seconds ( ) ) )
2016-05-03 13:05:54 +00:00
addSystemError ( dataLogCriticalErr )
2016-04-03 18:10:42 +00:00
}
2016-05-03 03:50:42 +00:00
case <- shutdownDataLogWriter : // Received a message on the channel to initiate a graceful shutdown, and to command dataLog() to shut down
log . Printf ( "datalog.go: dataLogWriter() received shutdown message with rowsQueuedForWrite = %d\n" , len ( rowsQueuedForWrite ) )
2016-05-02 05:03:21 +00:00
shutdownDataLog <- true
return
2016-04-01 18:05:23 +00:00
}
}
2016-05-03 03:50:42 +00:00
log . Printf ( "datalog.go: dataLogWriter() shutting down\n" )
2016-04-01 18:05:23 +00:00
}
func dataLog ( ) {
2016-05-02 05:03:21 +00:00
dataLogStarted = true
2016-05-03 03:50:42 +00:00
log . Printf ( "datalog.go: dataLog() started\n" )
2016-03-24 16:13:08 +00:00
dataLogChan = make ( chan DataLogRow , 10240 )
2016-03-26 21:15:59 +00:00
shutdownDataLog = make ( chan bool )
2016-04-10 05:21:13 +00:00
dataLogTimestamps = make ( [ ] StratuxTimestamp , 0 )
var ts StratuxTimestamp
ts . id = 0
ts . Time_type_preference = 0 // stratuxClock.
ts . StratuxClock_value = stratuxClock . Time
ts . GPSClock_value = time . Time { }
ts . PreferredTime_value = stratuxClock . Time
dataLogTimestamps = append ( dataLogTimestamps , ts )
dataLogCurTimestamp = 0
2016-03-24 13:33:11 +00:00
// Check if we need to create a new database.
createDatabase := false
2016-05-27 20:27:22 +00:00
if _ , err := os . Stat ( dataLogFilef ) ; os . IsNotExist ( err ) {
2016-03-24 13:33:11 +00:00
createDatabase = true
2016-05-27 20:27:22 +00:00
log . Printf ( "creating new database '%s'.\n" , dataLogFilef )
2016-03-24 13:33:11 +00:00
}
2016-05-27 20:27:22 +00:00
db , err := sql . Open ( "sqlite3" , dataLogFilef )
2016-03-24 05:14:48 +00:00
if err != nil {
2016-03-24 13:33:11 +00:00
log . Printf ( "sql.Open(): %s\n" , err . Error ( ) )
2016-03-24 05:14:48 +00:00
}
2016-05-02 05:03:21 +00:00
defer func ( ) {
db . Close ( )
dataLogStarted = false
2016-05-03 04:59:13 +00:00
//close(dataLogChan)
2016-05-27 20:27:22 +00:00
log . Printf ( "datalog.go: dataLog() has closed DB in %s\n" , dataLogFilef )
2016-05-02 05:03:21 +00:00
} ( )
2016-03-24 04:23:12 +00:00
2016-04-02 19:33:19 +00:00
_ , err = db . Exec ( "PRAGMA journal_mode=WAL" )
if err != nil {
log . Printf ( "db.Exec('PRAGMA journal_mode=WAL') err: %s\n" , err . Error ( ) )
}
_ , err = db . Exec ( "PRAGMA synchronous=OFF" )
if err != nil {
log . Printf ( "db.Exec('PRAGMA journal_mode=WAL') err: %s\n" , err . Error ( ) )
}
2016-05-03 03:50:42 +00:00
//log.Printf("Starting dataLogWriter\n") // REMOVE -- DEBUG
2016-04-01 18:05:23 +00:00
go dataLogWriter ( db )
2016-03-24 13:33:11 +00:00
// Do we need to create the database?
if createDatabase {
2016-04-01 12:26:37 +00:00
makeTable ( StratuxTimestamp { } , "timestamp" , db )
2016-03-24 13:33:11 +00:00
makeTable ( mySituation , "mySituation" , db )
makeTable ( globalStatus , "status" , db )
makeTable ( globalSettings , "settings" , db )
makeTable ( TrafficInfo { } , "traffic" , db )
2016-03-26 20:49:57 +00:00
makeTable ( msg { } , "messages" , db )
2016-04-13 05:21:05 +00:00
makeTable ( esmsg { } , "es_messages" , db )
2016-03-26 21:12:26 +00:00
makeTable ( Dump1090TermMessage { } , "dump1090_terminal" , db )
2016-06-08 00:10:17 +00:00
makeTable ( gpsPerfStats { } , "gps_attitude" , db )
2016-04-22 20:10:56 +00:00
makeTable ( StratuxStartup { } , "startup" , db )
2016-03-24 13:33:11 +00:00
}
2016-04-22 20:10:56 +00:00
// The first entry to be created is the "startup" entry.
stratuxStartupID = insertData ( StratuxStartup { } , "startup" , db , 0 )
2016-05-02 05:03:21 +00:00
dataLogReadyToWrite = true
2016-05-03 03:50:42 +00:00
//log.Printf("Entering dataLog read loop\n") //REMOVE -- DEBUG
2016-03-24 05:14:48 +00:00
for {
2016-03-26 21:15:59 +00:00
select {
case r := <- dataLogChan :
2016-04-01 18:05:23 +00:00
// When data is input, the first step is to timestamp it.
2016-03-26 21:15:59 +00:00
// Check if our time bucket has expired or has never been entered.
2016-04-01 12:26:37 +00:00
checkTimestamp ( )
// Mark the row with the current timestamp ID, in case it gets entered later.
r . ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
2016-04-01 18:05:23 +00:00
dataLogWriteChan <- r
2016-05-03 03:50:42 +00:00
case <- shutdownDataLog : // Received a message on the channel to complete a graceful shutdown (see the 'defer func()...' statement above).
log . Printf ( "datalog.go: dataLog() received shutdown message\n" )
2016-03-26 21:15:59 +00:00
return
2016-03-24 04:23:12 +00:00
}
2016-03-23 16:31:11 +00:00
}
2016-05-03 03:50:42 +00:00
log . Printf ( "datalog.go: dataLog() shutting down\n" )
2016-05-03 04:40:51 +00:00
close ( shutdownDataLog )
2016-03-23 16:31:11 +00:00
}
2016-03-24 03:08:00 +00:00
2016-04-01 12:26:37 +00:00
/ *
setDataLogTimeWithGPS ( ) .
Create a timestamp entry using GPS time .
* /
2016-03-24 21:26:59 +00:00
func setDataLogTimeWithGPS ( sit SituationData ) {
if isGPSClockValid ( ) {
2016-04-01 12:26:37 +00:00
var ts StratuxTimestamp
2016-03-24 21:26:59 +00:00
// Piggyback a GPS time update from this update.
2016-04-01 12:26:37 +00:00
ts . id = 0
ts . Time_type_preference = 1 // gpsClock.
ts . StratuxClock_value = stratuxClock . Time
ts . GPSClock_value = sit . GPSTime
ts . PreferredTime_value = sit . GPSTime
2016-04-10 05:21:13 +00:00
dataLogTimestamps = append ( dataLogTimestamps , ts )
dataLogCurTimestamp = int64 ( len ( dataLogTimestamps ) - 1 )
2016-03-24 21:26:59 +00:00
}
}
2016-05-03 03:50:42 +00:00
/*
	logSituation(), logStatus(), ... pass messages from other functions to the logging
	engine. These are only read into `dataLogChan` if the Replay Log is toggled on,
	and if the log system is ready to accept writes.
*/

// isDataLogReady reports whether the log database is open and accepting writes.
func isDataLogReady() bool {
	return dataLogReadyToWrite
}
2016-03-24 13:33:11 +00:00
func logSituation ( ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-03-25 15:58:44 +00:00
dataLogChan <- DataLogRow { tbl : "mySituation" , data : mySituation }
}
2016-03-24 04:23:12 +00:00
}
2016-03-24 13:33:11 +00:00
func logStatus ( ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-05-01 20:02:56 +00:00
dataLogChan <- DataLogRow { tbl : "status" , data : globalStatus }
}
2016-03-24 13:33:11 +00:00
}
func logSettings ( ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-05-01 20:02:56 +00:00
dataLogChan <- DataLogRow { tbl : "settings" , data : globalSettings }
}
2016-03-24 13:33:11 +00:00
}
func logTraffic ( ti TrafficInfo ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-03-25 15:58:44 +00:00
dataLogChan <- DataLogRow { tbl : "traffic" , data : ti }
}
2016-03-24 13:33:11 +00:00
}
2016-03-24 04:23:12 +00:00
2016-03-26 20:49:57 +00:00
func logMsg ( m msg ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-03-26 20:49:57 +00:00
dataLogChan <- DataLogRow { tbl : "messages" , data : m }
}
}
2016-04-13 05:21:05 +00:00
func logESMsg ( m esmsg ) {
2016-05-03 13:05:54 +00:00
if globalSettings . ReplayLog && isDataLogReady ( ) {
2016-04-13 05:21:05 +00:00
dataLogChan <- DataLogRow { tbl : "es_messages" , data : m }
}
2016-06-08 00:10:17 +00:00
}
func logGPSAttitude ( gpsPerf gpsPerfStats ) {
if globalSettings . ReplayLog && isDataLogReady ( ) {
dataLogChan <- DataLogRow { tbl : "gps_attitude" , data : gpsPerf }
}
2016-04-13 05:21:05 +00:00
}
2016-03-26 21:12:26 +00:00
func logDump1090TermMessage ( m Dump1090TermMessage ) {
2016-05-03 13:05:54 +00:00
if globalSettings . DEBUG && globalSettings . ReplayLog && isDataLogReady ( ) {
2016-04-13 05:21:05 +00:00
dataLogChan <- DataLogRow { tbl : "dump1090_terminal" , data : m }
}
2016-03-26 21:12:26 +00:00
}
2016-03-24 13:33:11 +00:00
func initDataLog ( ) {
2016-05-03 03:50:42 +00:00
//log.Printf("dataLogStarted = %t. dataLogReadyToWrite = %t\n", dataLogStarted, dataLogReadyToWrite) //REMOVE -- DEBUG
2016-04-01 22:25:53 +00:00
insertString = make ( map [ string ] string )
insertBatchIfs = make ( map [ string ] [ ] [ ] interface { } )
2016-05-02 05:03:21 +00:00
go dataLogWatchdog ( )
2016-05-03 04:59:13 +00:00
2016-05-03 03:50:42 +00:00
//log.Printf("datalog.go: initDataLog() complete.\n") //REMOVE -- DEBUG
2016-05-02 05:03:21 +00:00
}
2016-05-03 03:50:42 +00:00
/ *
dataLogWatchdog ( ) : Watchdog function to control startup / shutdown of data logging subsystem .
Called by initDataLog as a goroutine . It iterates once per second to determine if
globalSettings . ReplayLog has toggled . If logging was switched from off to on , it starts
datalog ( ) as a goroutine . If the log is running and we want it to stop , it calls
closeDataLog ( ) to turn off the input channels , close the log , and tear down the dataLog
and dataLogWriter goroutines .
* /
2016-05-02 05:03:21 +00:00
func dataLogWatchdog ( ) {
for {
if ! dataLogStarted && globalSettings . ReplayLog { // case 1: sqlite logging isn't running, and we want to start it
2016-05-03 03:50:42 +00:00
log . Printf ( "datalog.go: Watchdog wants to START logging.\n" )
2016-05-02 05:03:21 +00:00
go dataLog ( )
} else if dataLogStarted && ! globalSettings . ReplayLog { // case 2: sqlite logging is running, and we want to shut it down
2016-05-03 03:50:42 +00:00
log . Printf ( "datalog.go: Watchdog wants to STOP logging.\n" )
2016-05-02 05:03:21 +00:00
closeDataLog ( )
}
2016-05-03 03:50:42 +00:00
//log.Printf("Watchdog iterated.\n") //REMOVE -- DEBUG
2016-05-02 05:03:21 +00:00
time . Sleep ( 1 * time . Second )
2016-05-03 03:50:42 +00:00
//log.Printf("Watchdog sleep over.\n") //REMOVE -- DEBUG
2016-05-02 05:03:21 +00:00
}
}
2016-05-03 03:50:42 +00:00
/*
	closeDataLog(): Handler for graceful shutdown of data logging goroutines. It is called by
		by dataLogWatchdog(), gracefulShutdown(), and by any other function (disk space monitor?)
		that needs to be able to shut down sqlite logging without corrupting data or blocking
		execution.

		This function turns off log message reads into the dataLogChan receiver, and sends a
		message to a quit channel ('shutdownDataLogWriter`) in dataLogWriter(). dataLogWriter()
		then sends a message to a quit channel to 'shutdownDataLog` in dataLog() to close *that*
		goroutine. That function sets dataLogStarted = false once the logfile is closed. By waiting
		for that signal, closeDataLog() won't exit until the log is safely written. This prevents
		data loss on shutdown.
*/

func closeDataLog() {
	dataLogReadyToWrite = false // prevent any new messages from being sent down the channels
	log.Printf("datalog.go: Starting data log shutdown\n")
	shutdownDataLogWriter <- true      // Ask dataLogWriter() to stop (it forwards the shutdown to dataLog()) ...
	defer close(shutdownDataLogWriter) // ... and close the channel so subsequent accidental writes don't stall execution
	log.Printf("datalog.go: Waiting for shutdown signal from dataLog()")
	// Poll until dataLog()'s deferred cleanup clears dataLogStarted,
	// i.e. until the database has actually been closed.
	for dataLogStarted {
		time.Sleep(50 * time.Millisecond)
	}
	log.Printf("datalog.go: Data log shutdown successful.\n")
}