stratux/main/datalog.go

526 wiersze
16 KiB
Go
Czysty Zwykły widok Historia

2016-03-23 16:31:11 +00:00
/*
Copyright (c) 2015-2016 Christopher Young
Distributable under the terms of The "BSD New"" License
that can be found in the LICENSE file, herein included
as part of this header.
datalog.go: Log stratux data as it is received. Bucket data into timestamp time slots.
*/
package main
import (
"database/sql"
"errors"
2016-03-23 16:31:11 +00:00
"fmt"
_ "github.com/mattn/go-sqlite3"
"log"
"os"
2016-03-23 16:31:11 +00:00
"reflect"
"strconv"
2016-03-23 16:31:11 +00:00
"strings"
"time"
)
const (
LOG_TIMESTAMP_RESOLUTION = 250 * time.Millisecond
2016-03-23 16:31:11 +00:00
)
type StratuxTimestamp struct {
2016-04-01 12:36:10 +00:00
id int64
2016-03-24 05:14:48 +00:00
Time_type_preference int // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
StratuxClock_value time.Time
GPSClock_value time.Time // The value of this is either from the GPS or extrapolated from the GPS via stratuxClock if pref is 1 or 2. It is time.Time{} if 0.
2016-03-24 05:14:48 +00:00
PreferredTime_value time.Time
2016-03-23 16:31:11 +00:00
}
var dataLogTimestamps []StratuxTimestamp
var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
2016-03-23 16:31:11 +00:00
/*
checkTimestamp().
Verify that our current timestamp is within the LOG_TIMESTAMP_RESOLUTION bucket.
Returns false if the timestamp was changed, true if it is still valid.
This is where GPS timestamps are extrapolated, if the GPS data is currently valid.
2016-03-23 16:31:11 +00:00
*/
2016-03-23 16:31:11 +00:00
func checkTimestamp() bool {
thisCurTimestamp := dataLogCurTimestamp
if stratuxClock.Since(dataLogTimestamps[thisCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
// Extrapolate from GPS timestamp, if possible.
if isGPSClockValid() && thisCurTimestamp > 0 {
// Was the last timestamp either extrapolated or GPS time?
last_ts := dataLogTimestamps[thisCurTimestamp]
if last_ts.Time_type_preference == 1 || last_ts.Time_type_preference == 2 {
// Extrapolate via stratuxClock.
timeSinceLastTS := ts.StratuxClock_value.Sub(last_ts.StratuxClock_value) // stratuxClock ticks since last timestamp.
extrapolatedGPSTimestamp := last_ts.PreferredTime_value.Add(timeSinceLastTS)
// Re-set the preferred timestamp type to '2' (extrapolated time).
ts.Time_type_preference = 2
ts.PreferredTime_value = extrapolatedGPSTimestamp
ts.GPSClock_value = extrapolatedGPSTimestamp
}
}
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
2016-03-23 16:31:11 +00:00
return false
}
return true
}
2016-03-23 16:31:11 +00:00
type SQLiteMarshal struct {
FieldType string
Marshal func(v reflect.Value) string
2016-03-23 16:31:11 +00:00
}
func boolMarshal(v reflect.Value) string {
b := v.Bool()
if b {
return "1"
}
return "0"
2016-03-23 16:31:11 +00:00
}
func intMarshal(v reflect.Value) string {
return strconv.FormatInt(v.Int(), 10)
2016-03-23 16:31:11 +00:00
}
func uintMarshal(v reflect.Value) string {
return strconv.FormatUint(v.Uint(), 10)
}
func floatMarshal(v reflect.Value) string {
return strconv.FormatFloat(v.Float(), 'f', 10, 64)
}
func stringMarshal(v reflect.Value) string {
return v.String()
2016-03-23 16:31:11 +00:00
}
func notsupportedMarshal(v reflect.Value) string {
2016-03-23 16:31:11 +00:00
return ""
}
func structCanBeMarshalled(v reflect.Value) bool {
m := v.MethodByName("String")
if m.IsValid() && !m.IsNil() {
return true
}
return false
}
func structMarshal(v reflect.Value) string {
if structCanBeMarshalled(v) {
m := v.MethodByName("String")
in := make([]reflect.Value, 0)
ret := m.Call(in)
if len(ret) > 0 {
return ret[0].String()
}
}
2016-03-23 16:31:11 +00:00
return ""
}
var sqliteMarshalFunctions = map[string]SQLiteMarshal{
"bool": {FieldType: "INTEGER", Marshal: boolMarshal},
"int": {FieldType: "INTEGER", Marshal: intMarshal},
"uint": {FieldType: "INTEGER", Marshal: uintMarshal},
"float": {FieldType: "REAL", Marshal: floatMarshal},
"string": {FieldType: "TEXT", Marshal: stringMarshal},
"struct": {FieldType: "STRING", Marshal: structMarshal},
2016-03-23 16:31:11 +00:00
"notsupported": {FieldType: "notsupported", Marshal: notsupportedMarshal},
}
var sqlTypeMap = map[reflect.Kind]string{
reflect.Bool: "bool",
reflect.Int: "int",
reflect.Int8: "int",
reflect.Int16: "int",
reflect.Int32: "int",
reflect.Int64: "int",
reflect.Uint: "uint",
reflect.Uint8: "uint",
reflect.Uint16: "uint",
reflect.Uint32: "uint",
reflect.Uint64: "uint",
reflect.Uintptr: "notsupported",
reflect.Float32: "float",
reflect.Float64: "float",
reflect.Complex64: "notsupported",
reflect.Complex128: "notsupported",
reflect.Array: "notsupported",
reflect.Chan: "notsupported",
reflect.Func: "notsupported",
reflect.Interface: "notsupported",
reflect.Map: "notsupported",
reflect.Ptr: "notsupported",
reflect.Slice: "notsupported",
reflect.String: "string",
reflect.Struct: "struct",
2016-03-23 16:31:11 +00:00
reflect.UnsafePointer: "notsupported",
}
func makeTable(i interface{}, tbl string, db *sql.DB) {
2016-03-23 16:31:11 +00:00
val := reflect.ValueOf(i)
fields := make([]string, 0)
for i := 0; i < val.NumField(); i++ {
kind := val.Field(i).Kind()
fieldName := val.Type().Field(i).Name
sqlTypeAlias := sqlTypeMap[kind]
// Check that if the field is a struct that it can be marshalled.
if sqlTypeAlias == "struct" && !structCanBeMarshalled(val.Field(i)) {
continue
}
2016-03-23 16:31:11 +00:00
if sqlTypeAlias == "notsupported" || fieldName == "id" {
continue
}
sqlType := sqliteMarshalFunctions[sqlTypeAlias].FieldType
s := fieldName + " " + sqlType
fields = append(fields, s)
}
2016-03-24 05:14:48 +00:00
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" {
fields = append(fields, "timestamp_id INTEGER")
}
tblCreate := fmt.Sprintf("CREATE TABLE %s (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, %s)", tbl, strings.Join(fields, ", "))
_, err := db.Exec(tblCreate)
if err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
2016-03-23 16:31:11 +00:00
}
}
/*
bulkInsert().
Reads insertBatch and insertBatchIfs. This is called after a group of insertData() calls.
*/
func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
if _, ok := insertString[tbl]; !ok {
return nil, errors.New("no insert statement")
}
batchVals := insertBatchIfs[tbl]
numColsPerRow := len(batchVals[0])
maxRowBatch := int(999 / numColsPerRow) // SQLITE_MAX_VARIABLE_NUMBER = 999.
// log.Printf("table %s. %d cols per row. max batch %d\n", tbl, numColsPerRow, maxRowBatch)
for len(batchVals) > 0 {
// timeInit := time.Now()
2016-04-06 03:01:10 +00:00
i := int(0) // Variable number of rows per INSERT statement.
stmt := ""
vals := make([]interface{}, 0)
querySize := uint64(0) // Size of the query in bytes.
for len(batchVals) > 0 && i < maxRowBatch && querySize < 750000 { // Maximum of 1,000,000 bytes per query.
if len(stmt) == 0 { // The first set will be covered by insertString.
stmt = insertString[tbl]
querySize += uint64(len(insertString[tbl]))
} else {
addStr := ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
stmt += addStr
querySize += uint64(len(addStr))
}
2016-04-02 19:33:19 +00:00
for _, val := range batchVals[0] {
2016-04-02 19:40:25 +00:00
querySize += uint64(len(val.(string)))
2016-04-02 19:33:19 +00:00
}
vals = append(vals, batchVals[0]...)
batchVals = batchVals[1:]
i++
}
// log.Printf("inserting %d rows to %s. querySize=%d\n", i, tbl, querySize)
res, err = db.Exec(stmt, vals...)
// timeBatch := time.Since(timeInit) // debug
// log.Printf("SQLite: bulkInserted %d rows to %s. Took %f msec to build and insert query. querySize=%d\n", i, tbl, 1000*timeBatch.Seconds(), querySize) // debug
if err != nil {
2016-04-06 03:01:10 +00:00
log.Printf("sqlite INSERT error: '%s'\n", err.Error())
return
}
}
// Clear the buffers.
delete(insertString, tbl)
delete(insertBatchIfs, tbl)
return
}
/*
insertData().
Inserts an arbitrary struct into an SQLite table.
Inserts the timestamp first, if its 'id' is 0.
*/
// Cached 'VALUES' statements. Indexed by table name.
var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.
var insertBatchIfs map[string][][]interface{}
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
val := reflect.ValueOf(i)
keys := make([]string, 0)
values := make([]string, 0)
for i := 0; i < val.NumField(); i++ {
kind := val.Field(i).Kind()
fieldName := val.Type().Field(i).Name
sqlTypeAlias := sqlTypeMap[kind]
if sqlTypeAlias == "notsupported" || fieldName == "id" {
continue
}
v := sqliteMarshalFunctions[sqlTypeAlias].Marshal(val.Field(i))
keys = append(keys, fieldName)
values = append(values, v)
}
2016-03-24 05:14:48 +00:00
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" {
keys = append(keys, "timestamp_id")
if dataLogTimestamps[ts_num].id == 0 {
//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
// (dataLogTimestamps[ts_num].id = id).
insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
}
values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
2016-03-24 05:14:48 +00:00
}
if _, ok := insertString[tbl]; !ok {
// Prepare the statement.
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
insertString[tbl] = tblInsert
}
2016-03-24 05:14:48 +00:00
// Make the values slice into a slice of interface{}.
2016-03-24 05:14:48 +00:00
ifs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
ifs[i] = values[i]
}
insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
if tbl == "timestamp" { // Immediate insert always for "timestamp" table.
res, err := bulkInsert("timestamp", db) // Bulk insert of 1, always.
if err == nil {
id, err := res.LastInsertId()
if err == nil {
ts := dataLogTimestamps[ts_num]
ts.id = id
dataLogTimestamps[ts_num] = ts
}
return id
2016-03-24 21:26:59 +00:00
}
2016-03-24 05:14:48 +00:00
}
return 0
}
type DataLogRow struct {
tbl string
data interface{}
ts_num int64
2016-03-24 05:14:48 +00:00
}
var dataLogChan chan DataLogRow
2016-03-26 21:15:59 +00:00
var shutdownDataLog chan bool
2016-03-24 05:14:48 +00:00
var dataLogWriteChan chan DataLogRow
func dataLogWriter(db *sql.DB) {
dataLogWriteChan = make(chan DataLogRow, 10240)
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
2016-04-01 22:42:28 +00:00
writeTicker := time.NewTicker(10 * time.Second)
rowsQueuedForWrite := make([]DataLogRow, 0)
for {
select {
case r := <-dataLogWriteChan:
// Accept timestamped row.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
// for i := 0; i < 1000; i++ {
// logSituation()
// }
timeStart := stratuxClock.Time
nRows := len(rowsQueuedForWrite)
if globalSettings.DEBUG {
log.Printf("Writing %d rows\n", nRows)
}
// Write the buffered rows. This will block while it is writing.
// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
tblsAffected := make(map[string]bool)
// Start transaction.
tx, err := db.Begin()
if err != nil {
log.Printf("db.Begin() error: %s\n", err.Error())
break // from select {}
}
for _, r := range rowsQueuedForWrite {
tblsAffected[r.tbl] = true
insertData(r.data, r.tbl, db, r.ts_num)
}
// Do the bulk inserts.
for tbl, _ := range tblsAffected {
bulkInsert(tbl, db)
}
// Close the transaction.
tx.Commit()
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
timeElapsed := stratuxClock.Since(timeStart)
if globalSettings.DEBUG {
rowsPerSecond := float64(nRows) / float64(timeElapsed.Seconds())
log.Printf("Writing finished. %d rows in %.2f seconds (%.1f rows per second).\n", nRows, float64(timeElapsed.Seconds()), rowsPerSecond)
}
if timeElapsed.Seconds() > 10.0 {
2016-04-07 22:34:01 +00:00
log.Printf("WARNING! SQLite logging is behind. Last write took %.1f seconds.\n", float64(timeElapsed.Seconds()))
}
}
}
}
func dataLog() {
2016-03-24 16:13:08 +00:00
dataLogChan = make(chan DataLogRow, 10240)
2016-03-26 21:15:59 +00:00
shutdownDataLog = make(chan bool)
dataLogTimestamps = make([]StratuxTimestamp, 0)
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = 0
// Check if we need to create a new database.
createDatabase := false
if _, err := os.Stat(dataLogFile); os.IsNotExist(err) {
createDatabase = true
log.Printf("creating new database '%s'.\n", dataLogFile)
}
db, err := sql.Open("sqlite3", dataLogFile)
2016-03-24 05:14:48 +00:00
if err != nil {
log.Printf("sql.Open(): %s\n", err.Error())
2016-03-24 05:14:48 +00:00
}
defer db.Close()
2016-04-02 19:33:19 +00:00
_, err = db.Exec("PRAGMA journal_mode=WAL")
if err != nil {
log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
}
_, err = db.Exec("PRAGMA synchronous=OFF")
if err != nil {
log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
}
go dataLogWriter(db)
// Do we need to create the database?
if createDatabase {
makeTable(StratuxTimestamp{}, "timestamp", db)
makeTable(mySituation, "mySituation", db)
makeTable(globalStatus, "status", db)
makeTable(globalSettings, "settings", db)
makeTable(TrafficInfo{}, "traffic", db)
makeTable(msg{}, "messages", db)
makeTable(esmsg{}, "es_messages", db)
makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
}
2016-03-24 05:14:48 +00:00
for {
2016-03-26 21:15:59 +00:00
select {
case r := <-dataLogChan:
// When data is input, the first step is to timestamp it.
2016-03-26 21:15:59 +00:00
// Check if our time bucket has expired or has never been entered.
checkTimestamp()
// Mark the row with the current timestamp ID, in case it gets entered later.
r.ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
dataLogWriteChan <- r
2016-03-26 21:15:59 +00:00
case <-shutdownDataLog: // Received a message on the channel (anything). Graceful shutdown (defer statement).
return
}
2016-03-23 16:31:11 +00:00
}
}
/*
setDataLogTimeWithGPS().
Create a timestamp entry using GPS time.
*/
2016-03-24 21:26:59 +00:00
func setDataLogTimeWithGPS(sit SituationData) {
if isGPSClockValid() {
var ts StratuxTimestamp
2016-03-24 21:26:59 +00:00
// Piggyback a GPS time update from this update.
ts.id = 0
ts.Time_type_preference = 1 // gpsClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = sit.GPSTime
ts.PreferredTime_value = sit.GPSTime
dataLogTimestamps = append(dataLogTimestamps, ts)
dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
2016-03-24 21:26:59 +00:00
}
}
func logSituation() {
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "mySituation", data: mySituation}
}
}
func logStatus() {
dataLogChan <- DataLogRow{tbl: "status", data: globalStatus}
}
func logSettings() {
dataLogChan <- DataLogRow{tbl: "settings", data: globalSettings}
}
func logTraffic(ti TrafficInfo) {
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "traffic", data: ti}
}
}
func logMsg(m msg) {
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "messages", data: m}
}
}
func logESMsg(m esmsg) {
if globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "es_messages", data: m}
}
}
func logDump1090TermMessage(m Dump1090TermMessage) {
if globalSettings.DEBUG && globalSettings.ReplayLog {
dataLogChan <- DataLogRow{tbl: "dump1090_terminal", data: m}
}
}
func initDataLog() {
insertString = make(map[string]string)
insertBatchIfs = make(map[string][][]interface{})
go dataLog()
}